diff --git a/.gitignore b/.gitignore index 22038f2..20fbbdc 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ app/files/ app/pwfile.json app/dest app.log +app/__pycache__/ diff --git a/app/__pycache__/main.cpython-313.pyc b/app/__pycache__/main.cpython-313.pyc index ce6523c..34c6ffa 100644 Binary files a/app/__pycache__/main.cpython-313.pyc and b/app/__pycache__/main.cpython-313.pyc differ diff --git a/app/main.py b/app/main.py index 856eee7..f5a79f3 100644 --- a/app/main.py +++ b/app/main.py @@ -1,5 +1,5 @@ from typing import Annotated -from typing import List, Dict, Tuple +from typing import List, Dict, Tuple, Sequence from annotated_types import IsDigit from fastapi import FastAPI, File, HTTPException, UploadFile, Request, Form from fastapi.responses import FileResponse @@ -24,6 +24,8 @@ import datetime import logging +from starlette.types import HTTPExceptionHandler + log = logging.getLogger(__name__) logging.basicConfig(filename="app.log", level=logging.INFO) debug = log.debug @@ -62,6 +64,60 @@ GREETINGFILE = "./app/graphics/greeting.pdf" # for l in cur: # print(l) # locpaths = ["./VO_Mathematik_3.pdf"] # replace this with a database + + +def _sql_quarry( + cursor: mariadb.Cursor, + querry: str, + data: Tuple[str | int, ...] | int | str, + return_result, +) -> List: + datas: Tuple[str | int, ...] + if type(data) is str or type(data) is int: + datas = (data,) + elif type(data) is tuple: + datas = data + try: + cursor.execute(querry, datas) + if return_result: + return cursor.fetchall() + else: + return [] + except mariadb.Error as e: + error(f"Mariadb Error: {e}") + raise HTTPException( + status_code=500, detail="Somethings wrong with the database" + ) + + +def sql( + querry: str, data: Tuple[str | int, ...] | str | int, return_result: bool = True +) -> List[Tuple]: + cur = db.cursor(dictionary=False) + return _sql_quarry(cur, querry, data, return_result) + + +def sqlT( + querry: str, data: tuple[str | int, ...] | str | int, return_result: bool = True +) -> List[Dict]: + cur = db.cursor(dictionary=True) + return _sql_quarry(cur, querry, data, return_result) + + # datas:Tuple[str|int,...] + # if type(data) is str or type(data) is int: + # datas = (data,) + # else: + # datas = data + # try: + # cur.execute(querry, datas) + # return cur.fetchall() + # except mariadb.Error as e: + # error(f"Mariadb Error: {e}") + # raise HTTPException( + # status_code=500, detail="Somethings wrong with the database" + # ) + + @app.get("/") async def get_index(): """gives the Index.html file""" @@ -72,22 +128,26 @@ async def get_index(): async def get_file(file_id: str): """returns the file that cooorosponds with the given ID""" if file_id == "unsupported": - error("File is unsupported") + error("User uploadad unsupported file") return FileResponse(UNSUPPORTEDFILE) if file_id == "empty": - error("File Id empty") + error("User uploaded empty file") return FileResponse(EMPTYFILE) if file_id == "greeting": return FileResponse(GREETINGFILE) - cur = db.cursor() - try: - cur.execute("Select filename from FIP where id=?", (file_id,)) - except mariadb.Error as e: - print(f"Mariadb Error: {e}") - raise HTTPException( - status_code=500, detail="Somethings wrong with the database" - ) - filename = cur.fetchone()[0] + # cur = db.cursor() + # try: + res = sql("Select filename from FIP where id=?", (file_id,)) + if len(res) < 1: + error("File ID a user is trying to reach dose not exist") + raise HTTPException(status_code=404, detail="File dose ot exist") + filename = res[0][0] + # except mariadb.Error as e: + # error(f"Mariadb Error: {e}") + # raise HTTPException( + # status_code=500, detail="Somethings wrong with the database" + # ) + # filename = cur.fetchone()[0] return FileResponse(FILES_IN_PROGRESS + filename) @@ -96,42 +156,43 @@ async def search_lva( searchterm: str = "", pid: str | None = None, searchlim: int = 10 ) -> List[Dict[str, int | str]]: """returns the LVA for a search in the database""" - res = [] - zw = [] - cur = db.cursor(dictionary=True) + res: List[Dict[str, str | int]] = [] + zw: List[Dict[str, str | int]] = [] + # cur = db.cursor(dictionary=True) if await is_LVID(searchterm): - cur.execute( - "SELECT id,lvid,lvname FROM LVAs WHERE lvid LIKE ?", (searchterm + "%",) + res += sqlT( + "SELECT id,lvid,lvname FROM LVAs WHERE lvid LIKE ?", + (searchterm + "%",), ) - res = cur.fetchall() + # res = cur.fetchall() else: if pid is not None: - cur.execute( + res += sqlT( "SELECT LVAs.id,LVAs.lvid,LVAs.lvname FROM LVAs LEFT JOIN LPLink ON LVAs.id=LPLink.lid WHERE lvname like ? AND pid=?", (searchterm + "%", pid), ) - res += cur.fetchall() - cur.execute( + # res += cur.fetchall() + res += sqlT( "SELECT LVAs.id,LVAs.lvid,LVAs.lvname FROM LVAs LEFT JOIN LPLink ON LVAs.id=LPLink.lid WHERE lvname like ? AND pid=?", ("%" + searchterm + "%", pid), ) - res += cur.fetchall() - cur.execute( + # res += cur.fetchall() + zw += sqlT( "SELECT LVAs.id,LVAs.lvid,LVAs.lvname FROM LVAs LEFT JOIN LPLink ON LVAs.id=LPLink.lid WHERE pid=?", (pid,), ) - zw = cur.fetchall() + # zw = cur.fetchall() if searchterm != "": - cur.execute( + res += sqlT( "SELECT id,lvid,lvname FROM LVAs WHERE lvname LIKE ?", (searchterm + "%",), ) - res += cur.fetchall() - cur.execute( + # res += cur.fetchall() + res += sqlT( "SELECT id,lvid,lvname FROM LVAs WHERE lvname LIKE ?", ("%" + searchterm + "%",), ) - res += cur.fetchall() + # res += cur.fetchall() res = remove_duplicates(res + zw) if searchlim == 0: return res @@ -144,27 +205,28 @@ async def search_profs( searchterm: str = "", lid: int | None = None, searchlim: int = 10 ) -> List[Dict[str, str | int]]: """returns the Prof for a searchterm and LVA id""" - res = [] - zw = [] - cur = db.cursor(dictionary=True) + res: List[Dict[str, str | int]] = [] + zw: List[Dict[str, str | int]] = [] + # cur = db.cursor(dictionary=True) if lid is not None: # cur.execute("SELECT id FROM LVAs WHERE LVId=?", (lvid,)) # lid = cur.fetchall()[0]["id"] - cur.execute( + res += sqlT( "SELECT Profs.id,Profs.name FROM Profs LEFT JOIN LPLink ON Profs.id=LPLink.pid WHERE name like ? AND lid=?", ("%" + searchterm + "%", lid), ) - res = cur.fetchall() - cur.execute( + # res = cur.fetchall() + zw += sqlT( "SELECT Profs.id,Profs.name FROM Profs LEFT JOIN LPLink ON Profs.id=LPLink.pid WHERE name NOT like ? AND lid=?", ("%" + searchterm + "%", lid), ) - zw = cur.fetchall() + # zw = cur.fetchall() if searchterm != "": - cur.execute( - "SELECT id,name FROM Profs WHERE name LIKE ?", ("%" + searchterm + "%",) + res += sqlT( + "SELECT id,name FROM Profs WHERE name LIKE ?", + ("%" + searchterm + "%",), ) - res += cur.fetchall() + # res += cur.fetchall() res = remove_duplicates(res + zw) if searchlim == 0: return res @@ -185,26 +247,26 @@ async def search_subcats( """searches for avaliable subcatrgories in a specific LVA with a specific Prof(optional)""" res = [] rest = [] - cur = db.cursor(dictionary=True) + # cur = db.cursor(dictionary=True) if not (lid is None or pid is None or cat is None): # Rest is available # cur.execute("SELECT id FROM LVAs WHERE LVId=?", (lvid,)) # lid = cur.fetchall()[0]["id"] - cur.execute( + rest = sqlT( "SELECT id,name FROM SubCats WHERE lid=? AND pid=? AND cat=?", (lid, pid, cat), ) - rest = cur.fetchall() + # rest = cur.fetchall() if searchterm != "": # searchterm is available if not (lid is None or pid is None or cat is None): - cur.execute( + res = sqlT( "SELECT id,name FROM SubCats WHERE lid=? AND pid=? AND cat=? AND name LIKE ?", (lid, pid, cat, "%" + searchterm + "%"), ) - res = cur.fetchall() - cur.execute( + # res = cur.fetchall() + res += sqlT( "SELECT id,name FROM SubCats WHERE name LIKE ?", ("%" + searchterm + "%",) ) - res += cur.fetchall() + # res += cur.fetchall() res = remove_duplicates(res + rest) if searchlim == 0: return res @@ -264,30 +326,35 @@ async def create_upload_file(files: List[UploadFile], c2pdf: bool = True): filename = make_filename_unique(filename) locpath = FILES_IN_PROGRESS + filename # locpaths.append(locpath) - cur = db.cursor() - try: - cur.execute( - "Insert Into FIP (filename,filetype,initTimeStamp) Values(?,?,?)", - (filename, ft, str(datetime.datetime.now())), - ) - except mariadb.Error as e: - print(f"Error: {e}") - raise HTTPException( - status_code=500, detail="Somethings wrong with the database" - ) - try: - cur.execute("Select id From FIP where filename=?", (filename,)) - except mariadb.Error as e: - print(f"Error: {e}") - raise HTTPException( - status_code=500, detail="Somethings wrong with the database" - ) - id = cur.fetchone()[0] + # cur = db.cursor() + # try: + sql( + "INSERT INTO FIP (filename,filetype,initTimeStamp) Values(?,?,?)", + (filename, ft, str(datetime.datetime.now())), + return_result=False, + ) + # except mariadb.Error as e: + # print(f"Error: {e}") + # raise HTTPException( + # status_code=500, detail="Somethings wrong with the database" + # ) + # try: + db.commit() + sqlres = sql("SELECT id FROM FIP WHERE filename=?", (filename,)) + if len(sqlres) < 1: + error(f"FIP Entry with filename {filename} I just created dose not exist") + raise HTTPException(status_code=500, detail="Error with the Database") + id = sqlres[0][0] + # except mariadb.Error as e: + # print(f"Error: {e}") + # raise HTTPException( + # status_code=500, detail="Somethings wrong with the database" + # ) + # id = cur.fetchone()[0] if content is not None: with open(locpath, "wb") as f: f.write(content) # app.mount("/files", StaticFiles(directory="./app/files/"), name="files") - db.commit() fname = "".join(filename.split(".")[0:-1]) # ftype = filename.split(".")[-1] return { @@ -328,20 +395,24 @@ async def get_submission( ) rects_p = json.loads(rects) scales_p = json.loads(pagescales) - cur = db.cursor() - try: - cur.execute("Select filename from FIP where id=?", (fileId,)) - except mariadb.Error as e: - print(f"Mariadb Error: {e}") - raise HTTPException( - status_code=500, detail="Somethings wrong with the database" - ) - filepath = "./app/files/" + cur.fetchone()[0] + # cur = db.cursor() + # try: + res = sql("Select filename from FIP where id=?", (fileId,)) + if len(res) < 1: + error(f"Submited file ID {fileId} dose not exist in database") + raise HTTPException(status_code=400, detail="Submited file dose not exist.") + filepath = "./app/files/" + res[0][0] + # except mariadb.Error as e: + # print(f"Mariadb Error: {e}") + # raise HTTPException( + # status_code=500, detail="Somethings wrong with the database" + # ) + # filepath = "./app/files/" + cur.fetchone()[0] try: dest = make_savepath(lva, prof, stype, subcat, sem, ex_date, fname, ftype) except ValueError as e: error(f"Error creating savepath: f{e}") - raise HTTPException(status_code=400, detail=str(e)) + raise HTTPException(status_code=400, detail="Cannot create Savepath") await censor_pdf( filepath, dest, rects_p, scales_p, False if censor == "False" else True ) @@ -504,47 +575,47 @@ def make_savepath( def get_lvpath(lva: str) -> Tuple[int, str]: """returns the path in UNIZEUG from a LVA based on its LVID (or name) that may be within a string. It uses the path within the database. If there is no Entry with a fitting LVID in the database it creates a new LVA. Returns: (id,path)""" - cur = db.cursor() + # cur = db.cursor() lvid = re.search(r"[a-zA-Z0-9]{3}\.[a-zA-Z0-9]{3}", lva) if lvid is not None: - cur.execute( + res = sql( "SELECT id,lvpath FROM LVAs WHERE lvid=?", (lvid.group()[:3] + lvid.group()[4:],), ) - res = cur.fetchone() - if res is not None: - return res + # res = cur.fetchone() + if len(res) > 0: + return res[0] else: return makenew(lva, "LVAs") else: - cur.execute("SELECT id,lvpath FROM LVAs WHERE lvname=?", (lva,)) - res = cur.fetchone() - if res is not None: - return res + res = sql("SELECT id,lvpath FROM LVAs WHERE lvname=?", (lva,)) + # res = cur.fetchone() + if len(res) > 0: + return res[0] else: return makenew(lva, "LVAs") def get_profpath(prof: str, lid: int) -> Tuple[int, str]: """Generates the foldername for a prof based on his name. It searches the database for matches. Returns: (id,name)""" - cur = db.cursor() + # cur = db.cursor() prof = prof.replace("_", " ") - cur.execute("SELECT id,name FROM Profs WHERE name=?", (prof,)) - res = cur.fetchall() + res = sql("SELECT id,name FROM Profs WHERE name=?", (prof,)) + # res = cur.fetchall() print(res != []) if res is not None and res != []: ret = (res[0][0], res[0][1].replace(" ", "_")) - cur.execute("SELECT * FROM LPLink WHERE LId=? AND PId=?", (lid, ret[0])) - if cur.fetchall() is None: + # sql("SELECT * FROM LPLink WHERE LId=? AND PId=?", (lid, ret[0])) + if sql("SELECT * FROM LPLink WHERE LId=? AND PId=?", (lid, ret[0])) is None: linkLP(lid, ret[0]) return ret fname, lname = prof.split(" ") - cur.execute("SELECT id,name FROM Profs WHERE name like ?", (lname + " " + fname,)) - res = cur.fetchall() + res = sql("SELECT id,name FROM Profs WHERE name like ?", (lname + " " + fname,)) + # res = cur.fetchall() if res is not None and res != []: ret = (res[0][0], res[0][1].replace(" ", "_")) - cur.execute("SELECT * FROM LPLink WHERE LId=? AND PId=?", (lid, ret[0])) - if cur.fetchall() is None: + # sql("SELECT * FROM LPLink WHERE LId=? AND PId=?", (lid, ret[0])) + if sql("SELECT * FROM LPLink WHERE LId=? AND PId=?", (lid, ret[0])) is None: linkLP(lid, ret[0]) return ret ret = makenew(prof, "Profs") @@ -554,20 +625,20 @@ def get_profpath(prof: str, lid: int) -> Tuple[int, str]: def get_subcatpath(subcat: str, cat: int, pid: int, lid: int) -> Tuple[int, str]: """Generates the subcat path from a subcat name. Returns: (id,name)""" - cur = db.cursor() - cur.execute( + # cur = db.cursor() + res = sql( "SELECT id,name FROM SubCats WHERE LId=? AND PId=? AND cat=? AND name=?", (lid, pid, cat, subcat), ) - res = cur.fetchone() - if res is None: + # res = cur.fetchone() + if res == []: return makenew(subcat, "SubCats", LId=lid, PId=pid, cat=cat) - return res + return res[0] def makenew(input: str, table: str, **kwargs) -> Tuple[int, str]: """Generates new Entrys in the database for LVAs, Profs, SUBCATS. Returns: (id,name/path)""" - cur = db.cursor() + # cur = db.cursor() if table == "LVAs": lvaid = re.search(r"[a-zA-Z0-9]{3}\.[a-zA-Z0-9]{3}", input) if lvaid is None: @@ -575,12 +646,14 @@ def makenew(input: str, table: str, **kwargs) -> Tuple[int, str]: lvid = lvaid.group()[:3] + lvaid.group()[4:] lvname = re.sub(r"[_ -]*[a-zA-Z0-9]{3}\.[a-zA-Z0-9]{3}[_ -]*", "", input) lvpath = lvname + "_" + lvaid.group() - cur.execute( - "INSERT INTO LVAs(lvid,lvname,lvpath) VALUES(?,?,?)", (lvid, lvname, lvpath) + sql( + "INSERT INTO LVAs(lvid,lvname,lvpath) VALUES(?,?,?)", + (lvid, lvname, lvpath), + return_result=False, ) - cur.execute("SELECT id,lvpath FROM LVAs WHERE lvid=?", (lvid,)) + # cur.execute("SELECT id,lvpath FROM LVAs WHERE lvid=?", (lvid,)) db.commit() - return cur.fetchone() + return sql("SELECT id,lvpath FROM LVAs WHERE lvid=?", (lvid,))[0] querry = "INSERT INTO " + table + "(name" values = [input] nvals = 0 @@ -589,10 +662,14 @@ def makenew(input: str, table: str, **kwargs) -> Tuple[int, str]: querry += "," + k nvals += 1 querry += ") VALUES(?" + nvals * ",?" + ")" - cur.execute(querry, tuple(values)) - cur.execute("SELECT id,name FROM " + table + " WHERE name=?", (input,)) - res = cur.fetchone() + sql(querry, tuple(values), return_result=False) + sqlres = sql("SELECT id,name FROM " + table + " WHERE name=?", (input,)) db.commit() + if len(sqlres) < 1: + error(f"Entry into {table} with name {input}, I just created dose not exist") + raise HTTPException(status_code=500, detail="Error with Database") + res = sqlres[0] + # res = cur.fetchone() if table == "Profs": return (res[0], res[1].replace(" ", "_")) return res @@ -600,8 +677,8 @@ def makenew(input: str, table: str, **kwargs) -> Tuple[int, str]: def linkLP(lid: int, pid: int): """declares that a Prof (id in database) offers a LVA (id in database)""" - cur = db.cursor() - cur.execute("INSERT INTO LPLink(LId,PId) VALUES(?,?)", (lid, pid)) + # cur = db.cursor() + sql("INSERT INTO LPLink(LId,PId) VALUES(?,?)", (lid, pid), return_result=False) db.commit() @@ -659,9 +736,9 @@ def filename_to_pdf(filename: str) -> str: def make_filename_unique(filename: str, idx: int | None = None) -> str: """makes sure, there are no duplicate filenames in the temporary folder""" - cur = db.cursor() - cur.execute("SELECT id FROM FIP WHERE filename=?", (filename,)) - res = cur.fetchall() + # cur = db.cursor() + res = sql("SELECT id FROM FIP WHERE filename=?", (filename,)) + # res = cur.fetchall() if res is not None and len(res) > 0: farr = filename.split(".") if len(farr) > 1: