123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269 |
- from flask import ( Flask, render_template, request, abort, redirect,
- send_from_directory, url_for, session, flash, get_flashed_messages)
- from flask_session import Session
- import openai
- from dotenv import load_dotenv
- from markdown import markdown
- import requests
- from glob import glob
- import json
- import os
- import sys
- import time
- import shutil
- import re
- import secrets
-
- RE_VID = re.compile("""\[video ["']([^['"]*?)["']\]""")
- TEMPLATE_VID = """<video controls class="centered"><source src="{}" type="video/{}"></video>"""
-
- def preprocess_content(defs):
- md = defs.get("markdown", "")
- if md:
- content = markdown(md)
- content = RE_VID.sub(lambda m: TEMPLATE_VID.format(m.group(1), m.group(1).split('.')[-1]), content)
- else:
- content = ""
- img = defs.get("image", {})
- match img.get("placement", "none"):
- case "above":
- content = """<img src="{source}" alt="{alt}" class="centered w-{width}"/><br/>\n""".format(**img)+content
- case "left":
- content = """<img src="{source}" alt="{alt}" class="float-left w-{width}"/><br/>\n""".format(**img)+content
- case "right":
- content = """<img src="{source}" alt="{alt}" class="float-right w-{width}"/><br/>\n""".format(**img)+content
- case "below":
- content = content+"""<br/>\n<img src="{source}" alt="{alt}" class="centered w-{width}"/><br/>\n""".format(**img)
- if defs.get("use_soundtrack"):
- content = """<audio loop data-autoplay><source src="{}" type="audio/{}"></audio>\n""".format(
- defs["soundtrack"], defs["soundtrack"].split(".")[-1])+content
- return content
-
- def preprocess_payload(payload):
- for column in payload.get("columns", []):
- column["content"] = preprocess_content(column)
- for slide in column.get("slides", []):
- slide["content"] = preprocess_content(slide)
-
- def deref_schema(val):
- "Deref all `$ref` entries in a json-schema"
- if type(val)==type([]):
- return [deref_schema(item) for item in val]
- elif type(val)==type({}):
- val = dict(val)
- if "$ref" in val:
- ref = val["$ref"]
- if ref.startswith("#"):
- pass # stub. Currently we don't have internal references...
- elif ref.startswith("/"):
- ref = url_for("home", _external=True)+ref[1:]
- val.update(requests.get(ref).json())
- del val["$ref"]
- for key in val:
- val[key] = deref_schema(val[key])
- return val
- else:
- return val
-
-
- def file2json(path):
- "returns json string without line breaks and indent"
- return json.dumps(json.load(open(path)))
-
- def make_prompt():
- json_schema = json.dumps(schema())
- json_example = file2json("static/chat-example.json")
- return render_template("prompt.txt", schema=json_schema, example=json_example)
-
- load_dotenv()
- openai.organization = "org-GFWgNyt7NSKpCv6GhzXYZTpi"
-
- application = Flask(__name__)
- application.config["SECRET_KEY"] = secrets.token_hex()
- application.config["SESSION_TYPE"] = 'filesystem'
- Session(application)
-
- @application.route("/", methods=['GET', 'POST'])
- def home():
- is_initial = False
- if "messages" not in session:
- session["messages"] = [
- {"role": "system", "content": make_prompt()},
- {"role": "assistant", "content": file2json("static/initial-chat.json")}
- ]
- session["history"] = []
- is_initial = True
- prompt = ""
- if request.method=='POST' or is_initial:
- if is_initial:
- session["messages"] += [{"role": "system", "content": "User wishes to begin. Remember to find out how they feel before offering a ceremony or other action."}]
- else:
- prompt = request.form["prompt"].strip()
- if prompt:
- session["messages"] += [{"role": "user", "content": prompt}]
- else:
- session["messages"] += [{"role": "system", "content": "User has hit enter. Please continue."}]
- payload = None
- while not payload:
- reply = openai.ChatCompletion.create(model=os.environ["MODEL_NAME"], messages=session["messages"])
- message = dict(reply["choices"][0]["message"])
- session["messages"] += [message]
- try:
- payload = json.loads(message["content"])
- payload["prompt"] = prompt.replace('"', '\"')
- payload["content"] = preprocess_content(payload)
- session["history"] += [dict(payload)] # shallow copy so history doesn't get added later on
- except Exception as e:
- print(repr(e))
- session["messages"] += [{"role": "system", "content": "reply was ignored because it's not a valid json or doesn't comply with the schema. user is unaware. do not apologize to them"}]
- print("=====")
- print(reply["choices"][0]["message"]["content"])
- print("-----")
- print(payload)
- print("=====")
- else:
- payload = json.loads(session["messages"][-1]["content"])
- payload["content"] = preprocess_content(payload)
- payload["history"] = list(reversed(session["history"][:-1]))
- return render_template("chat.html", **payload)
-
- @application.get("/reset")
- def reset():
- for key in ["messages", "history"]:
- if key in session:
- del session[key]
- return redirect(url_for('home'))
-
- @application.get("/schema.json")
- def schema():
- "return deref_schema of `chat.schema.json` file's content"
- return deref_schema(json.load(open("static/chat.schema.json")))
-
- @application.get("/messages.json")
- def dump_messages():
- if "messages" not in session:
- session["messages"] = [
- {"role": "system", "content": make_prompt()},
- {"role": "assistant", "content": file2json("static/initial-chat.json")}
- ]
- return session["messages"]
-
- @application.get("/slides")
- def slides():
- "Legacy multy-slide format"
- payload = json.load(open("static/slides.json"))
- preprocess_payload(payload)
- return render_template("slides.html", generate_indices=False, **payload)
-
- @application.post("/update")
- def update():
- "Legacy multy-slide format"
- shutil.copy(
- "static/slides.json",
- time.strftime(
- "archive/slides-%Y-%m-%d-%H.%M.%S.json",
- time.localtime()))
- payload = request.get_json()
- print(type(payload))
- json.dump(payload, sys.stdout, indent=4)
- json.dump(payload, open("static/slides.json", "w"), indent=4)
- return {"status": "success"}
-
- @application.post("/update-chat")
- def update_chat():
- shutil.copy(
- "static/chat.json",
- time.strftime(
- "archive/chat-%Y-%m-%d-%H.%M.%S.json",
- time.localtime()))
- payload = request.get_json()
- print(type(payload))
- json.dump(payload, sys.stdout, indent=4)
- json.dump(payload, open("static/chat.json", "w"), indent=4)
- return {"status": "success"}
-
- @application.get("/enum/<topic>")
- def choices(topic):
- if topic in ["img", "bg", "bg-video", "audio"]:
- choices = glob("static/media/{}/*.*".format(topic))
- titles = [os.path.basename(c) for c in choices]
- try:
- descdir = json.load(open("static/media/{}.json".format(topic)))
- except FileNotFoundError:
- descdir = {}
- return {
- "type": "string",
- "enum": choices,
- "options": {
- "enum_titles": [descdir.get(t, t.rsplit(".", 1)[0]) for t in titles]
- }
- }
- abort(404)
-
- @application.route("/save", methods=['GET', 'POST'])
- def save():
- if request.method=="POST":
- filename = request.form["filename"].rsplit("/",1)[-1]
- if filename:
- path = "archive/{}.json".format(filename)
- is_overwrite = os.path.isfile(path)
- moment = {
- key: session.get(key, [])
- for key in ["messages", "history"]
- }
- print(moment)
- json.dump(moment, open(path,"w"), indent=4)
- if is_overwrite:
- flash("Successfully overwritten {}.json".format(filename))
- else:
- flash("Successfully saved to {}.json".format(filename))
- else:
- flash("Invalid filename. Save aborted.", "error")
- return redirect(url_for("home"))
- else:
- return render_template("save.html",
- suggestion=time.strftime(
- "moment-%Y-%m-%d-%H.%M.%S"),
- files = [os.path.basename(path).rsplit(".",1)[0] for path in glob("archive/*.json")])
-
- @application.route("/load", methods=['GET', 'POST'])
- def load():
- if request.method=="POST":
- filename = request.form["filename"]
- try:
- moment = json.load(open("archive/{}.json".format(filename)))
- session["messages"] = moment["messages"]
- session["history"] = moment["history"]
- except Exception as e:
- flash(repr(e))
- return redirect(url_for("home")+"#/oldest")
- else:
- return render_template("load.html",
- files = [os.path.basename(path).rsplit(".",1)[0] for path in glob("archive/*.json")])
-
- @application.get("/chat-editor")
- def chat_editor():
- return render_template("chat-editor.html")
-
- @application.get("/editor")
- def editor():
- "Legacy multy-slide format"
- return render_template("editor.html")
-
- @application.route("/img", methods=['GET', 'POST'])
- def image():
- if request.method=='GET':
- src = "static/img/marble-question-mark.png"
- alt = "A question mark"
- prompt = ""
- else:
- prompt = request.form["prompt"]
- alt = prompt
- response = openai.Image.create(prompt=prompt, n=1, size="1024x1024")
- src = response['data'][0]['url']
- return render_template("image.html", src=src, alt=alt, prompt=prompt)
-
- @application.route("/favicon.ico")
- def favicon():
- return send_from_directory(os.path.join(application.root_path, "static"),
- "favicon.ico", mimetype="image/vnd.microsoft.icon")
|