"""Flask application for submitting and viewing memes.

Users authenticate through an OpenID Connect provider (authorization-code
flow); authenticated users may upload an image, which is stored in the
configured upload folder and recorded as a ``Meme`` row.
"""

from functools import wraps
import html
import json
import os
from urllib.parse import urlencode
import uuid

import requests
from flask import Flask, g, request, redirect, url_for, abort
from flask import jsonify, render_template, flash, session
from flask import send_from_directory, make_response
from flask_session import Session
from werkzeug.utils import secure_filename

from suchwow.models import Meme, db


app = Flask(__name__)
app.config.from_envvar("FLASK_SECRETS")
app.secret_key = app.config["SECRET_KEY"]
Session(app)

# NOTE(review): each OIDC_* setting is indexed with [0], so the configured
# values are presumably sequences (e.g. one-element tuples) -- confirm against
# the file referenced by FLASK_SECRETS.
OPENID_URL = app.config["OIDC_URL"][0]
OPENID_CLIENT_ID = app.config["OIDC_CLIENT_ID"][0]
OPENID_CLIENT_SECRET = app.config["OIDC_CLIENT_SECRET"][0]
OPENID_REDIRECT_URI = "http://localhost:5000/auth"

# Seconds to wait for the identity provider; without a timeout a stalled
# provider would block the worker handling the request indefinitely.
OPENID_TIMEOUT = 10


def allowed_file(filename):
    """Return True if *filename* has an extension listed in ALLOWED_EXTENSIONS."""
    return "." in filename and \
        filename.rsplit(".", 1)[1].lower() in app.config["ALLOWED_EXTENSIONS"]


def login_required(f):
    """Decorate a view so unauthenticated visitors are redirected to /login.

    A visitor counts as authenticated when ``session["auth"]`` is present and
    truthy (it is set by ``auth`` and reset to ``None`` by ``logout``).
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not session.get("auth"):
            return redirect(url_for("login"))
        return f(*args, **kwargs)
    return decorated_function


@app.route("/")
def index():
    """Render the landing page."""
    return render_template("index.html")


# The URL variable is required: the view takes ``id`` and ``submit`` builds
# links with ``url_for("view", id=...)``.
@app.route("/meme/<id>")
def view(id):
    """Render the page for the meme with the given id, or a 404 message."""
    if not Meme.filter(id=id):
        return "no meme there brah", 404
    meme = Meme.get(Meme.id == id)
    return render_template("view.html", meme=meme)


@app.route("/uploads/<filename>")
def uploaded_file(filename):
    """Serve a previously uploaded file from the upload folder."""
    return send_from_directory(app.config['UPLOAD_FOLDER'], filename)


@app.route("/submit", methods=["GET", "POST"])
@login_required
def submit():
    """Show the upload form (GET) or store an uploaded meme (POST).

    On a successful upload the file is saved under UPLOAD_FOLDER, a ``Meme``
    row is created, and the user is redirected to the new meme's page.
    Invalid submissions flash a message and show the form again.
    """
    if request.method == "POST":
        # check if the post request has the file part
        if "file" not in request.files:
            flash("No file part")
            return redirect(request.url)
        file = request.files["file"]
        # if the user does not select a file, the browser still
        # submits an empty part without a filename
        if file.filename == "":
            flash("No selected file")
            return redirect(request.url)
        if file and allowed_file(file.filename):
            filename = secure_filename(file.filename)
            save_path = os.path.join(app.config["UPLOAD_FOLDER"], filename)
            # NOTE(review): two uploads whose sanitised names are equal are
            # written to the same path, so the later one replaces the earlier
            # image -- consider making stored names unique.
            file.save(save_path)
            meme = Meme(
                title=request.form.get('title'),
                submitter=session["auth"]["preferred_username"],
                image_name=filename,
            )
            meme.save()
            return redirect(url_for("view", id=meme.id))
        # Tell the user why nothing happened instead of silently
        # re-rendering the form.
        flash("File type not allowed")
    return render_template("submit.html")


@app.route("/login")
def login():
    """Start the authorization-code flow by redirecting to the provider.

    A random ``state`` value is stored in the session so that ``auth`` can
    check that the callback belongs to this login attempt.
    """
    state = uuid.uuid4().hex
    session["auth_state"] = state
    # urlencode percent-escapes the values (notably the redirect URI).
    query = urlencode({
        "client_id": OPENID_CLIENT_ID,
        "redirect_uri": OPENID_REDIRECT_URI,
        "response_type": "code",
        "state": state,
    })
    return redirect(f"{OPENID_URL}/auth?{query}")


def _openid_post(url, **kwargs):
    """POST to the identity provider and decode the JSON reply.

    Returns ``(payload, None)`` on success and ``(None, error_response)`` on
    failure, where ``error_response`` is a ``(body, status)`` tuple that a
    view can return directly.
    """
    try:
        resp = requests.post(url, timeout=OPENID_TIMEOUT, **kwargs)
        resp.raise_for_status()
        return resp.json(), None
    except requests.HTTPError as err:
        # The provider answered with an error status: relay its body.
        return None, (err.response.content, 500)
    except (requests.RequestException, ValueError):
        # No usable response (connection failure, timeout, invalid JSON).
        return None, ("identity provider request failed", 500)


@app.route("/auth/")
def auth():
    """Handle the provider callback and establish the login session.

    Validates the callback parameters and the ``state`` value, exchanges the
    authorization code for an access token, fetches the user profile with
    that token, stores the profile in ``session["auth"]`` and redirects to
    the index page. Any failure yields a plain-text error response.
    """
    missing = [
        name for name in ("state", "session_state", "code")
        if name not in request.args
    ]
    if missing:
        return f"missing parameter(s): {', '.join(missing)}", 400

    # verify state; pop it so one state value cannot be replayed
    expected_state = session.pop("auth_state", None)
    if not expected_state:
        return "session error", 500
    if request.args["state"] != expected_state:
        return "attack detected :)", 500

    # with this authorization code we can fetch an access token
    token_data, error = _openid_post(
        f"{OPENID_URL}/token",
        data={
            "grant_type": "authorization_code",
            "code": request.args["code"],
            "redirect_uri": OPENID_REDIRECT_URI,
            "client_id": OPENID_CLIENT_ID,
            "client_secret": OPENID_CLIENT_SECRET,
            "state": request.args["state"],
        },
    )
    if error is not None:
        return error

    # token_type is compared case-insensitively ("bearer" / "Bearer").
    if (
        not isinstance(token_data, dict)
        or "access_token" not in token_data
        or str(token_data.get("token_type", "")).lower() != "bearer"
    ):
        return "unexpected token response", 500
    access_token = token_data["access_token"]

    # fetch user information with the access token
    user_profile, error = _openid_post(
        f"{OPENID_URL}/userinfo",
        headers={"Authorization": f"Bearer {access_token}"},
    )
    if error is not None:
        return error

    # user can now visit the views guarded by login_required
    session["auth"] = user_profile
    return redirect(url_for("index"))


@app.route("/debug")
@login_required
def debug():
    """Show the stored user profile of the logged-in user as an HTML page."""
    # Escape the JSON because it is embedded in an HTML response.
    profile = html.escape(
        json.dumps(session["auth"], indent=4, sort_keys=True)
    )
    logout_url = url_for("logout")
    return f"""
<h3>We are logged in!</h3>
<pre>{profile}</pre>
<p><a href="{logout_url}">Logout</a></p>
"""


@app.route("/logout")
def logout():
    """Clear the stored user profile and return to the index page."""
    session["auth"] = None
    return redirect(url_for("index"))


@app.errorhandler(404)
def not_found(error):
    """Return a JSON body for unknown URLs."""
    return make_response(jsonify({"error": "Page not found"}), 404)


@app.cli.command('dbinit')
def dbinit():
    """CLI command ``dbinit``: create the database table for ``Meme``."""
    db.create_tables([Meme])


if __name__ == "__main__":
    app.run()