forked from lza_menace/suchwow
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
131 lines
3.5 KiB
131 lines
3.5 KiB
import uuid
|
|
import json
|
|
import requests
|
|
from flask import Flask, request, redirect, url_for
|
|
from flask import render_template, flash, session
|
|
from flask import send_from_directory, make_response
|
|
from flask import jsonify
|
|
from flask_session import Session
|
|
from suchwow.models import Post, Profile, Comment, Notification, db
|
|
from suchwow.routes import auth, comment, post, profile
|
|
from suchwow.utils.decorators import login_required
|
|
|
|
|
|
app = Flask(__name__)
|
|
app.config.from_envvar("FLASK_SECRETS")
|
|
app.secret_key = app.config["SECRET_KEY"]
|
|
Session(app)
|
|
|
|
OPENID_URL = app.config["OIDC_URL"]
|
|
OPENID_CLIENT_ID = app.config["OIDC_CLIENT_ID"]
|
|
OPENID_CLIENT_SECRET = app.config["OIDC_CLIENT_SECRET"]
|
|
OPENID_REDIRECT_URI = "http://localhost:5000/auth"
|
|
|
|
app.register_blueprint(post.bp)
|
|
|
|
|
|
@app.route("/")
|
|
def index():
|
|
page = request.args.get("page", "1")
|
|
if page.isdigit() is False:
|
|
flash("Wow, wtf hackerman. Cool it.")
|
|
page = 1
|
|
|
|
posts = Post.select().order_by(Post.timestamp).paginate(int(page), 10)
|
|
return render_template("index.html", posts=posts, page=page)
|
|
|
|
@app.route("/uploads/<path:filename>")
|
|
def uploaded_file(filename):
|
|
return send_from_directory(app.config["UPLOAD_FOLDER"], filename)
|
|
|
|
@app.route("/login")
|
|
def login():
|
|
state = uuid.uuid4().hex
|
|
session["auth_state"] = state
|
|
url = f"{OPENID_URL}/auth?" \
|
|
f"client_id={OPENID_CLIENT_ID}&" \
|
|
f"redirect_uri={OPENID_REDIRECT_URI}&" \
|
|
f"response_type=code&" \
|
|
f"state={state}"
|
|
|
|
if app.debug:
|
|
debug_login()
|
|
return redirect(url_for("index"))
|
|
else:
|
|
return redirect(url)
|
|
|
|
|
|
@app.route("/auth/")
|
|
def auth():
|
|
# todo
|
|
assert "state" in request.args
|
|
assert "session_state" in request.args
|
|
assert "code" in request.args
|
|
|
|
# verify state
|
|
if not session.get("auth_state"):
|
|
return "session error", 500
|
|
if request.args["state"] != session["auth_state"]:
|
|
return "attack detected :)", 500
|
|
|
|
# with this authorization code we can fetch an access token
|
|
url = f"{OPENID_URL}/token"
|
|
data = {
|
|
"grant_type": "authorization_code",
|
|
"code": request.args["code"],
|
|
"redirect_uri": OPENID_REDIRECT_URI,
|
|
"client_id": OPENID_CLIENT_ID,
|
|
"client_secret": OPENID_CLIENT_SECRET,
|
|
"state": request.args["state"]
|
|
}
|
|
try:
|
|
resp = requests.post(url, data=data)
|
|
resp.raise_for_status()
|
|
except:
|
|
return resp.content, 500
|
|
|
|
data = resp.json()
|
|
assert "access_token" in data
|
|
assert data.get("token_type") == "bearer"
|
|
access_token = data["access_token"]
|
|
|
|
# fetch user information with the access token
|
|
url = f"{OPENID_URL}/userinfo"
|
|
|
|
try:
|
|
resp = requests.post(url, headers={"Authorization": f"Bearer {access_token}"})
|
|
resp.raise_for_status()
|
|
user_profile = resp.json()
|
|
except:
|
|
return resp.content, 500
|
|
|
|
# user can now visit /secret
|
|
session["auth"] = user_profile
|
|
return redirect(url_for("index"))
|
|
|
|
|
|
@app.route("/debug")
|
|
@login_required
|
|
def debug():
|
|
return f"""
|
|
<h3>We are logged in!</h3>
|
|
<pre>{json.dumps(session["auth"], indent=4, sort_keys=True)}</pre><br>
|
|
<a href="/logout">Logout</a>
|
|
"""
|
|
|
|
@app.route("/logout")
|
|
def logout():
|
|
session["auth"] = None
|
|
return redirect(url_for("index"))
|
|
|
|
@app.errorhandler(404)
|
|
def not_found(error):
|
|
return make_response(jsonify({"error": "Page not found"}), 404)
|
|
|
|
@app.cli.command("dbinit")
|
|
def dbinit():
|
|
db.create_tables([Post, Profile, Comment, Notification])
|
|
|
|
if __name__ == "__main__":
|
|
app.run()
|