You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
This repo is archived. You can view files and clone it, but cannot push or open issues/pull-requests.
suchwow/suchwow/app.py

198 lines
5.8 KiB

from functools import wraps
import uuid
import os
import json
import requests
from flask import Flask, request, redirect, url_for, abort
from flask import jsonify, render_template, flash, session
from flask import send_from_directory, make_response
from flask_session import Session
from werkzeug.utils import secure_filename
from suchwow.models import Post, Profile, Comment, Notification, db
# Application object; everything below configures it at import time.
app = Flask(__name__)
# Load settings from the file whose path is given in the FLASK_SECRETS
# environment variable.
app.config.from_envvar("FLASK_SECRETS")
app.secret_key = app.config["SECRET_KEY"]
# Attach the Flask-Session extension to the app.
Session(app)
# OpenID Connect settings used by the /login and /auth routes.
OPENID_URL = app.config["OIDC_URL"]
OPENID_CLIENT_ID = app.config["OIDC_CLIENT_ID"]
OPENID_CLIENT_SECRET = app.config["OIDC_CLIENT_SECRET"]
# NOTE(review): hard-coded to a local development address; presumably this
# must match the redirect URI registered with the provider -- confirm.
OPENID_REDIRECT_URI = "http://localhost:5000/auth"
def allowed_file(filename):
    """Return True when *filename* has an extension listed in the app config.

    The extension is the text after the last dot, compared in lower case
    against ``app.config["ALLOWED_EXTENSIONS"]``. Names without a dot are
    rejected.
    """
    if "." not in filename:
        return False
    extension = filename.rsplit(".", 1)[-1].lower()
    return extension in app.config["ALLOWED_EXTENSIONS"]
def debug_login():
    """Store a canned "debuguser" identity in the session under "auth"."""
    fake_auth = dict(
        state="active",
        session_state="debug",
        code="abcdefg",
        preferred_username="debuguser",
        debug=True,
    )
    session["auth"] = fake_auth
def login_required(f):
    """Decorate a view so that it only runs for an authenticated session.

    When the app runs in debug mode the debug identity is installed and the
    view is always called. Otherwise a session without a truthy "auth" entry
    is redirected to the login route.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        if app.debug:
            # Debug mode: log in as the canned user, then fall through.
            debug_login()
        elif not session.get("auth"):
            return redirect(url_for("login"))
        return f(*args, **kwargs)

    return guarded_view
@app.route("/")
def index():
    """Render one page of posts, ten per page, ordered by timestamp.

    The page comes from the ``page`` query parameter (default ``"1"``).
    Anything that is not a non-negative integer triggers a flash message
    and falls back to the first page.
    """
    page = request.args.get("page", "1")
    try:
        # str.isdigit() alone is not enough: it accepts characters such as
        # "²" that int() rejects, and int() may also refuse over-long digit
        # strings. Either case used to surface as an unhandled ValueError.
        page_number = int(page) if page.isdigit() else None
    except ValueError:
        page_number = None
    if page_number is None:
        flash("Wow, wtf hackerman. Cool it.")
        page = page_number = 1
    posts = Post.select().order_by(Post.timestamp).paginate(page_number, 10)
    # The template receives the same value as before: the raw query string
    # when it was valid, or the integer 1 after a fallback.
    return render_template("index.html", posts=posts, page=page)
@app.route("/post/<id>")
def read_post(id):
    """Render the post with the given id, or a plain-text notice if absent."""
    # Guard first: bail out when no post matches the requested id.
    if not Post.filter(id=id):
        return "no meme there brah"
    post = Post.get(Post.id == id)
    return render_template("post.html", post=post)
@app.route("/uploads/<path:filename>")
def uploaded_file(filename):
    """Serve *filename* from the configured upload folder."""
    upload_dir = app.config["UPLOAD_FOLDER"]
    return send_from_directory(upload_dir, filename)
@app.route("/post/create", methods=["GET", "POST"])
@login_required
def create_post():
    """Show the post creation form (GET) or create a post from it (POST).

    A POST must carry a non-empty ``title`` field and a ``file`` upload whose
    extension passes ``allowed_file``. On success the file is saved into the
    upload folder, a ``Post`` row is stored and the client is redirected to
    the new post. Validation failures flash a message and redirect back to
    this form.
    """
    if request.method == "POST":
        post_title = request.form.get("title")
        # check if the post request has the file part
        if "file" not in request.files:
            flash("You didn't upload a caliente meme, bro! You're fuckin up!")
            return redirect(request.url)
        file = request.files["file"]
        # if user does not select file, browser also
        # submit an empty part without filename
        if file.filename == "":
            flash("You didn't upload a caliente meme, bro! You're fuckin up!")
            return redirect(request.url)
        # Truthiness instead of `is ""`: identity comparison against a
        # literal is unreliable, and a missing title (None) must be rejected
        # as well.
        if not post_title:
            flash("You didn't give your meme a spicy title, bro! You're fuckin up!")
            return redirect(request.url)
        if file and allowed_file(file.filename):
            filename = secure_filename(file.filename)
            # NOTE(review): an upload whose sanitized name matches an
            # existing file presumably overwrites it -- confirm.
            save_path = os.path.join(app.config["UPLOAD_FOLDER"], filename)
            file.save(save_path)
            # gen wallet
            post = Post(
                title=post_title,
                subtitle=request.form.get("subtitle", ""),
                submitter=session["auth"]["preferred_username"],
                image_name=filename,
                account_index=0,
                address_index=0,
            )
            post.save()
            return redirect(url_for("read_post", id=post.id))
        # A disallowed extension falls through and re-renders the form
        # without a flash message.
    return render_template("create_post.html")
@app.route("/login")
def login():
    """Start a login: remember a fresh state value and redirect the browser.

    In debug mode the canned debug identity is installed and the browser is
    sent to the index page. Otherwise it is sent to the provider's
    authorization endpoint with the client id, redirect URI and state.
    """
    state = uuid.uuid4().hex
    session["auth_state"] = state

    if app.debug:
        debug_login()
        return redirect(url_for("index"))

    query = "&".join([
        f"client_id={OPENID_CLIENT_ID}",
        f"redirect_uri={OPENID_REDIRECT_URI}",
        "response_type=code",
        f"state={state}",
    ])
    return redirect(f"{OPENID_URL}/auth?{query}")
@app.route("/auth/")
def auth():
    """Handle the provider's redirect and complete the login.

    Steps:
      1. Check that ``state``, ``session_state`` and ``code`` are present and
         that ``state`` matches the value stored by the login route.
      2. Exchange the authorization code for an access token.
      3. Fetch the user profile with that token and store it in
         ``session["auth"]``.

    Returns a redirect to the index page on success, or a
    ``(body, status)`` tuple describing the failure.
    """
    # Explicit validation instead of `assert`: assertions are stripped when
    # Python runs with -O, and a missing parameter is a client error.
    missing = [
        name for name in ("state", "session_state", "code")
        if name not in request.args
    ]
    if missing:
        return f"missing parameter(s): {', '.join(missing)}", 400

    # verify state
    if not session.get("auth_state"):
        return "session error", 500
    if request.args["state"] != session["auth_state"]:
        return "attack detected :)", 500

    # with this authorization code we can fetch an access token
    # NOTE(review): no timeout is passed to requests.post here or below.
    url = f"{OPENID_URL}/token"
    data = {
        "grant_type": "authorization_code",
        "code": request.args["code"],
        "redirect_uri": OPENID_REDIRECT_URI,
        "client_id": OPENID_CLIENT_ID,
        "client_secret": OPENID_CLIENT_SECRET,
        "state": request.args["state"],
    }
    try:
        resp = requests.post(url, data=data)
    except requests.RequestException:
        # No response object exists when the request itself failed.
        return "identity provider unreachable", 500
    if not resp.ok:
        return resp.content, 500
    try:
        token = resp.json()
    except ValueError:
        return "invalid token response", 500
    if not isinstance(token, dict) or "access_token" not in token:
        return "invalid token response", 500
    # The token type is compared case-insensitively ("Bearer" == "bearer").
    if str(token.get("token_type", "")).lower() != "bearer":
        return "unsupported token type", 500
    access_token = token["access_token"]

    # fetch user information with the access token
    url = f"{OPENID_URL}/userinfo"
    try:
        resp = requests.post(
            url, headers={"Authorization": f"Bearer {access_token}"}
        )
    except requests.RequestException:
        # Never fall back to the previous response here: it is the token
        # response and echoing it would leak the access token.
        return "identity provider unreachable", 500
    if not resp.ok:
        return resp.content, 500
    try:
        user_profile = resp.json()
    except ValueError:
        return resp.content, 500

    # the profile marks the session as logged in for login_required views
    session["auth"] = user_profile
    return redirect(url_for("index"))
@app.route("/debug")
@login_required
def debug():
    """Return a small HTML page showing the session's "auth" data as JSON."""
    auth_json = json.dumps(session["auth"], indent=4, sort_keys=True)
    return (
        "\n<h3>We are logged in!</h3>\n"
        f"<pre>{auth_json}</pre><br>\n"
        '<a href="/logout">Logout</a>\n'
    )
@app.route("/logout")
def logout():
    """Clear the session's "auth" entry and go back to the index page."""
    session["auth"] = None
    home = url_for("index")
    return redirect(home)
@app.errorhandler(404)
def not_found(error):
    """Answer 404 errors with a JSON body instead of an HTML page."""
    payload = jsonify({"error": "Page not found"})
    return make_response(payload, 404)
@app.cli.command("dbinit")
def dbinit():
    # CLI command "dbinit": create the tables for every model used here.
    # (Kept as a comment, not a docstring, so the command's help is unchanged.)
    models = [Post, Profile, Comment, Notification]
    db.create_tables(models)
# Start the application server only when this file is executed directly.
if __name__ == "__main__":
    app.run()