123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350 |
- from flask import ( Flask, render_template, request, abort, redirect,
- send_from_directory, url_for, session, flash, get_flashed_messages)
- from flask_session import Session
- import openai
- from dotenv import load_dotenv
- from markdown import markdown
- import requests
- from glob import glob
- import json
- import os
- import sys
- import time
- import shutil
- import re
- import secrets
-
- RE_VID = re.compile("""\[video ["']([^['"]*?)["']\]""")
- TEMPLATE_VID = """<video controls class="centered"><source src="{}" type="video/{}"></video>"""
-
- def preprocess_content(defs):
- md = defs.get("markdown", "")
- if md:
- content = markdown(md)
- content = RE_VID.sub(lambda m: TEMPLATE_VID.format(m.group(1), m.group(1).split('.')[-1]), content)
- else:
- content = ""
- img = defs.get("image", {})
- match img.get("placement", "none"):
- case "above":
- content = """<img src="{source}" alt="{alt}" class="centered w-{width}"/><br/>\n""".format(**img)+content
- case "left":
- content = """<img src="{source}" alt="{alt}" class="float-left w-{width}"/><br/>\n""".format(**img)+content
- case "right":
- content = """<img src="{source}" alt="{alt}" class="float-right w-{width}"/><br/>\n""".format(**img)+content
- case "below":
- content = content+"""<br/>\n<img src="{source}" alt="{alt}" class="centered w-{width}"/><br/>\n""".format(**img)
- if defs.get("use_soundtrack"):
- content = """<audio loop data-autoplay><source src="{}" type="audio/{}"></audio>\n""".format(
- defs["soundtrack"], defs["soundtrack"].split(".")[-1])+content
- return content
-
- def preprocess_payload(payload):
- for column in payload.get("columns", []):
- column["content"] = preprocess_content(column)
- for slide in column.get("slides", []):
- slide["content"] = preprocess_content(slide)
-
- RE_YOUTUBE = re.compile("https://(?:www.)?youtube.com/watch\?v=(?P<code>[^/?]*)")
- RE_YOUTU_BE = re.compile("https://(?:www.)?youtu.be/(?P<code>[^/?]*)")
- RE_VIMEO = re.compile("https://(?:www.)?vimeo.com/(?P<code>[^/?]*)")
- def preprocess_embed(e):
- res = {}
- url = e.get("url")
- if not url:
- return None
- for key in ["id", "description", "proportions"]:
- val = e.get(key)
- if not val:
- return None
- res[key] = val
- m = RE_YOUTUBE.search(url) or RE_YOUTU_BE.search(url)
- if m:
- res["type"] = "youtube"
- res["code"] = m.group("code")
- else:
- m = RE_VIMEO.search(url)
- if m:
- res["type"] = "vimeo"
- res["code"] = m.group("code")
- else:
- return None
- return res
-
- def get_all_embeds():
- embeds = json.load(open("static/media/video-embed.json"))
- embeds = [preprocess_embed(e) for e in embeds]
- embeds = [e for e in embeds if e]
- return embeds
-
- def deref_schema(val):
- "Deref all `$ref` entries in a json-schema"
- if type(val)==type([]):
- return [deref_schema(item) for item in val]
- elif type(val)==type({}):
- val = dict(val)
- if "$ref" in val:
- ref = val["$ref"]
- if ref.startswith("#"):
- pass # stub. Currently we don't have internal references...
- elif ref.startswith("/"):
- ref = url_for("home", _external=True)+ref[1:]
- val.update(requests.get(ref).json())
- del val["$ref"]
- for key in val:
- val[key] = deref_schema(val[key])
- return val
- else:
- return val
-
-
- def file2json(path):
- "returns json string without line breaks and indent"
- return json.dumps(json.load(open(path)))
-
- def make_prompt(username):
- user = open("static/users/{}.txt".format(username)).read()
- json_schema = json.dumps(schema())
- json_example = file2json("static/chat-example.json")
- img_enum = json.dumps(choices("img"))
- return render_template("prompt.txt", user=user, schema=json_schema, example=json_example, img_enum=img_enum)
-
- load_dotenv()
- openai.organization = "org-GFWgNyt7NSKpCv6GhzXYZTpi"
-
- application = Flask(__name__)
- application.config["SECRET_KEY"] = secrets.token_hex()
- application.config["SESSION_TYPE"] = 'filesystem'
- Session(application)
-
- @application.route("/", methods=['GET', 'POST'])
- def home():
- is_initial = False
- if "messages" not in session:
- if "user" not in session:
- session["user"] = "odelia" # terrible kludge
- session["messages"] = [
- {"role": "system", "content": make_prompt(session["user"])},
- {"role": "assistant", "content": file2json("static/initial-chat.json")}
- ]
- session["history"] = []
- is_initial = True
- prompt = ""
- if request.method=='POST' or is_initial:
- if is_initial:
- session["messages"] += [{"role": "system", "content": "User wishes to begin. Remember to find out how they feel before offering a ceremony or other action."}]
- else:
- prompt = request.form["prompt"].strip()
- if prompt:
- session["messages"] += [{"role": "user", "content": prompt}]
- else:
- session["messages"] += [{"role": "system", "content": "User has hit enter. Please continue."}]
- payload = None
- while not payload:
- reply = openai.ChatCompletion.create(model=os.environ["MODEL_NAME"], messages=session["messages"])
- message = dict(reply["choices"][0]["message"])
- session["messages"] += [message]
- try:
- payload = json.loads(message["content"])
- payload["prompt"] = prompt.replace('"', '\"')
- payload["content"] = preprocess_content(payload)
- if payload.get("use_video_content"):
- embed_id = payload.get("video_content")
- if embed_id:
- embeds = get_all_embeds()
- embeds = [e for e in embeds if e["id"]==embed_id]
- if embeds:
- payload["embed"] = embeds[0]
- session["history"] += [dict(payload)] # shallow copy so history doesn't get added later on
- except Exception as e:
- print(repr(e))
- session["messages"] += [{"role": "system", "content": "reply was ignored because it's not valid JSON. User is unaware. Do NOT apologize to them. Simply rephrase your previous answer as valid JSON that complies with the schema in the initial system prompt of this chat. It is followed by an example you can learn from."}]
- print("=====")
- print(reply["choices"][0]["message"]["content"])
- print("-----")
- print(payload)
- print("=====")
- else:
- payload = json.loads(session["messages"][-1]["content"])
- payload["content"] = preprocess_content(payload)
- payload["history"] = list(reversed(session["history"][:-1]))
- return render_template("chat.html", **payload)
-
- @application.get("/reset/<string:user>")
- def reset(user):
- if not os.path.isfile("static/users/{}.txt".format(user)):
- flash("user {} not found".format(user), 'error')
- return redirect(url_for('home'))
- for key in ["messages", "history"]:
- if key in session:
- del session[key]
- session["user"] = user
- return redirect(url_for('home'))
-
- @application.get("/schema.json")
- def schema():
- "return deref_schema of `chat.schema.json` file's content"
- return deref_schema(json.load(open("static/chat.schema.json")))
-
- @application.get("/slides")
- def slides():
- "Legacy multi-slide format"
- payload = json.load(open("static/slides.json"))
- preprocess_payload(payload)
- return render_template("slides.html", generate_indices=False, **payload)
-
- @application.get("/example")
- def example():
- payload = json.load(open("static/chat-example.json"))
- payload["content"] = preprocess_content(payload)
- if payload.get("use_video_content"):
- embed_id = payload.get("video_content")
- if embed_id:
- embeds = get_all_embeds()
- embeds = [e for e in embeds if e["id"]==embed_id]
- if embeds:
- payload["embed"] = embeds[0]
- return render_template("example.html", **payload)
-
- @application.get("/embeds")
- def embeds():
- embeds = get_all_embeds()
- return render_template("embeds.html", embeds=embeds)
-
- @application.post("/update")
- def update():
- "Legacy multi-slide format"
- shutil.copy(
- "static/slides.json",
- time.strftime(
- "editor-archive/slides-%Y-%m-%d-%H.%M.%S.json",
- time.localtime()))
- payload = request.get_json()
- print(type(payload))
- json.dump(payload, sys.stdout, indent=4)
- json.dump(payload, open("static/slides.json", "w"), indent=4)
- return {"status": "success"}
-
- @application.post("/update-embeds")
- def update_embeds():
- shutil.copy(
- "static/media/video-embed.json",
- time.strftime(
- "editor-archive/embeds-%Y-%m-%d-%H.%M.%S.json",
- time.localtime()))
- payload = request.get_json()
- print(type(payload))
- json.dump(payload, sys.stdout, indent=4)
- json.dump(payload, open("static/media/video-embed.json", "w"), indent=4)
- return {"status": "success"}
-
- @application.post("/update-chat")
- def update_chat():
- shutil.copy(
- "static/chat-example.json",
- time.strftime(
- "editor-archive/chat-%Y-%m-%d-%H.%M.%S.json",
- time.localtime()))
- payload = request.get_json()
- print(type(payload))
- json.dump(payload, sys.stdout, indent=4)
- json.dump(payload, open("static/chat-example.json", "w"), indent=4)
- return {"status": "success"}
-
- @application.get("/enum/<topic>")
- def choices(topic):
- if topic in ["img", "bg", "bg-video", "audio"]:
- choices = glob("static/media/{}/*.*".format(topic))
- titles = [os.path.basename(c) for c in choices]
- try:
- descdir = json.load(open("static/media/{}.json".format(topic)))
- except FileNotFoundError:
- descdir = {}
- return {
- "type": "string",
- "enum": choices,
- "options": {
- "enum_titles": [descdir.get(t, t.rsplit(".", 1)[0]) for t in titles]
- }
- }
- abort(404)
-
- @application.get("/embeds_enum")
- def embeds_enum():
- embeds = get_all_embeds()
- return {
- "enum": [e["id"] for e in embeds],
- "options": { "enum_titles": [e["description"] for e in embeds] }
- }
-
- @application.route("/save", methods=['GET', 'POST'])
- def save():
- if request.method=="POST":
- filename = request.form["filename"].rsplit("/",1)[-1]
- if filename:
- path = "archive/{}.json".format(filename)
- is_overwrite = os.path.isfile(path)
- moment = {
- key: session.get(key, [])
- for key in ["messages", "history"]
- }
- print(moment)
- json.dump(moment, open(path,"w"), indent=4)
- if is_overwrite:
- flash("Successfully overwritten {}.json".format(filename))
- else:
- flash("Successfully saved to {}.json".format(filename))
- else:
- flash("Invalid filename. Save aborted.", "error")
- return redirect(url_for("home"))
- else:
- return render_template("save.html",
- suggestion=time.strftime(
- "moment-%Y-%m-%d-%H.%M.%S"),
- files = [os.path.basename(path).rsplit(".",1)[0] for path in glob("archive/*.json")])
-
- @application.route("/load", methods=['GET', 'POST'])
- def load():
- if request.method=="POST":
- filename = request.form["filename"]
- try:
- moment = json.load(open("archive/{}.json".format(filename)))
- session["messages"] = moment["messages"]
- session["history"] = moment["history"]
- except Exception as e:
- flash(repr(e))
- return redirect(url_for("home")+"#/oldest")
- else:
- return render_template("load.html",
- files = [os.path.basename(path).rsplit(".",1)[0] for path in glob("archive/*.json")])
-
- @application.get("/embed-editor")
- def embed_editor():
- return render_template("embed-editor.html")
-
- @application.get("/chat-editor")
- def chat_editor():
- return render_template("chat-editor.html")
-
- @application.get("/editor")
- def editor():
- "Legacy multi-slide format"
- return render_template("editor.html")
-
- @application.route("/img", methods=['GET', 'POST'])
- def image():
- if request.method=='GET':
- src = "static/img/marble-question-mark.png"
- alt = "A question mark"
- prompt = ""
- else:
- prompt = request.form["prompt"]
- alt = prompt
- response = openai.Image.create(prompt=prompt, n=1, size="1024x1024")
- src = response['data'][0]['url']
- return render_template("image.html", src=src, alt=alt, prompt=prompt)
-
- @application.route("/favicon.ico")
- def favicon():
- return send_from_directory(os.path.join(application.root_path, "static"),
- "favicon.ico", mimetype="image/vnd.microsoft.icon")
|