from typing import Annotated
from typing import List, Dict, Tuple, Sequence
from starlette.responses import StreamingResponse
from annotated_types import IsDigit
from fastapi import FastAPI, File, HTTPException, Path, UploadFile, Request, Form
from fastapi.responses import FileResponse
import asyncio
from fastapi.staticfiles import StaticFiles
import pymupdf
import json
import re
import os
import mariadb
import filetype
import logging
import pathlib
from pathlib import Path  # NOTE: deliberately shadows fastapi.Path for filesystem use
from starlette.types import HTTPExceptionHandler

log = logging.getLogger(__name__)
logging.basicConfig(
    filename=os.environ.get("APP_LOG_PATH"),
    level=logging.INFO,
    format="[%(asctime)s, %(filename)s:%(lineno)s -> %(funcName)10s() ]%(levelname)s: %(message)s",
)
# Short aliases used throughout the module.
debug = log.debug
info = log.info
error = log.error

# Module-level DB connection; all queries below share it via sql()/sqlT().
db = mariadb.connect(
    host=os.environ.get("DB_HOST", "db"),
    user=os.environ.get("DB_USER", "user"),
    password=os.environ.get("DB_PASSWORD", "DBPASSWORD"),
    database=os.environ.get("DB_DATABASE", "unizeug"),
)

info("App Started")
# remove_old_FIP_entrys()
# startup()

app = FastAPI()

# Category index (position in this list) is what the frontend submits as `stype`.
CATEGORIES = [
    "Prüfungen",
    "Klausuren",
    "Übungen",
    "Labore",
    "Unterlagen",
    "Zusammenfassungen",
    "Multimedia",
]
APP_ROOT_PATH = Path(os.environ.get("APP_ROOT_PATH", "./app"))
# Categories that may carry a subcategory (by name and by CATEGORIES index).
SUBCAT_CATEGORIES = ["Klausuren", "Übungen", "Labore"]
SUBCAT_CATEGORIES_I = [1, 2, 3]
# Categories that require an exam date (by name and by CATEGORIES index).
EX_DATE_CATEGORIES = ["Prüfungen", "Klausuren"]
EX_DATE_CATEGORIES_I = [0, 1]
UNIZEUG_PATH = Path(os.environ.get("UNIZEUG_PATH", "./app/dest"))
FILES_IN_PROGRESS = APP_ROOT_PATH / "files/"
EMPTYFILE = APP_ROOT_PATH / "graphics/empty.pdf"
UNSUPPORTEDFILE = APP_ROOT_PATH / "graphics/unsupported.pdf"
GREETINGFILE = APP_ROOT_PATH / "graphics/greeting.pdf"
FAVICON = APP_ROOT_PATH / "favicon"
STATIC_FILES = APP_ROOT_PATH / "static"

# Per-file-id state used to stream censoring progress to the frontend.
censor_status_update_events: Dict[str, asyncio.Event] = {}
censor_status_datas: Dict[str, Dict[str, int | None | str | bool]] = {}


def _sql_quarry(
    cursor: mariadb.Cursor,
    querry: str,
    data: Tuple[str | int, ...] | int | str,
    return_result: bool,
    commit: bool,
) -> List:
    """Execute a parameterized query on *cursor*.

    Args:
        cursor: an open mariadb cursor (dict or tuple flavoured).
        querry: the SQL statement with ``?`` placeholders.
        data: either a single scalar (wrapped into a 1-tuple) or a tuple
            of parameters.
        return_result: when True, fetch and return all rows.
        commit: when True, commit on the shared connection after executing.

    Returns:
        The fetched rows, or [] when return_result is False.

    Raises:
        HTTPException(500) on any mariadb error (logged with query + params).
    """
    datas: Tuple[str | int, ...]
    if isinstance(data, (str, int)):
        datas = (data,)
    elif isinstance(data, tuple):
        datas = data
    else:
        # BUGFIX: previously `datas` stayed unbound for any other sequence
        # type (e.g. a list), causing an UnboundLocalError inside the try.
        datas = tuple(data)
    try:
        cursor.execute(querry, datas)
        if commit:
            db.commit()
        if return_result:
            return cursor.fetchall()
        return []
    except mariadb.Error as e:
        error(f"Mariadb Error: '{e}' from Querry: '{querry}' with variables: {data}")
        raise HTTPException(
            status_code=500, detail="Somethings wrong with the database"
        )


def sql(
    querry: str,
    data: Tuple[str | int, ...] | str | int = (),
    return_result: bool = True,
    commit: bool = False,
) -> List[Tuple]:
    """Run a query and return rows as plain tuples."""
    cur = db.cursor(dictionary=False)
    return _sql_quarry(cur, querry, data, return_result, commit)


def sqlT(
    querry: str,
    data: tuple[str | int, ...] | str | int = (),
    return_result: bool = True,
    commit: bool = False,
) -> List[Dict]:
    """Run a query and return rows as column-name -> value dicts."""
    cur = db.cursor(dictionary=True)
    return _sql_quarry(cur, querry, data, return_result, commit)
app.mount(
    "/favicon",
    StaticFiles(directory=os.environ.get("FAVICON_PATH", FAVICON)),
    name="favicon",
)
app.mount(
    "/static",
    StaticFiles(directory=os.environ.get("STATIC_PATH", STATIC_FILES)),
    name="static",
)


@app.get("/")
async def get_index():
    """Serve the frontend's index.html."""
    return FileResponse(APP_ROOT_PATH / "index.html")


@app.get("/files/{file_id}")
async def get_file(file_id: str):
    """Return the file that corresponds to the given ID.

    The ids "unsupported", "empty" and "greeting" are reserved for static
    placeholder PDFs; every other id is resolved via the FIP table.
    """
    if file_id == "unsupported":
        error("User uploadad unsupported file")
        return FileResponse(UNSUPPORTEDFILE)
    if file_id == "empty":
        error("User uploaded empty file")
        return FileResponse(EMPTYFILE)
    if file_id == "greeting":
        return FileResponse(GREETINGFILE)
    res = sql("Select filename from FIP where id=?", (file_id,))
    if len(res) < 1:
        error("File ID a user is trying to reach dose not exist")
        # BUGFIX: user-facing detail was garbled ("File dose ot exist").
        raise HTTPException(status_code=404, detail="File does not exist")
    filename = res[0][0]
    return FileResponse(FILES_IN_PROGRESS / filename)


@app.get("/search/lva")
async def search_lva(
    searchterm: str = "", pid: str | None = None, searchlim: int = 10
) -> List[Dict[str, int | str]]:
    """Search LVAs by LVA-id prefix or by name (optionally scoped to a prof).

    Results are ordered best-match-first: prefix matches, then substring
    matches, then (when pid is given) the remaining LVAs of that prof.
    """
    res: List[Dict[str, str | int]] = []
    zw: List[Dict[str, str | int]] = []
    if await is_LVID(searchterm):
        res += sqlT(
            "SELECT id,lvid,lvname FROM LVAs WHERE lvid LIKE ?",
            (searchterm + "%",),
        )
    else:
        if pid is not None:
            res += sqlT(
                "SELECT LVAs.id,LVAs.lvid,LVAs.lvname FROM LVAs LEFT JOIN LPLink ON LVAs.id=LPLink.lid WHERE lvname like ? AND pid=?",
                (searchterm + "%", pid),
            )
            res += sqlT(
                "SELECT LVAs.id,LVAs.lvid,LVAs.lvname FROM LVAs LEFT JOIN LPLink ON LVAs.id=LPLink.lid WHERE lvname like ? AND pid=?",
                ("%" + searchterm + "%", pid),
            )
            zw += sqlT(
                "SELECT LVAs.id,LVAs.lvid,LVAs.lvname FROM LVAs LEFT JOIN LPLink ON LVAs.id=LPLink.lid WHERE pid=?",
                (pid,),
            )
        if searchterm != "":
            res += sqlT(
                "SELECT id,lvid,lvname FROM LVAs WHERE lvname LIKE ?",
                (searchterm + "%",),
            )
            res += sqlT(
                "SELECT id,lvid,lvname FROM LVAs WHERE lvname LIKE ?",
                ("%" + searchterm + "%",),
            )
    res = remove_duplicates(res + zw)
    info(
        f"LVA Search: {searchterm}; Result: {res[: (searchlim if searchlim != 0 else -1)]}"
    )
    if searchlim == 0:
        return res
    else:
        return res[:searchlim]


@app.get("/search/prof")
async def search_profs(
    searchterm: str = "", lid: int | None = None, searchlim: int = 10
) -> List[Dict[str, str | int]]:
    """Search profs by name, preferring those linked to the given LVA id."""
    res: List[Dict[str, str | int]] = []
    zw: List[Dict[str, str | int]] = []
    if lid is not None:
        res += sqlT(
            "SELECT Profs.id,Profs.name FROM Profs LEFT JOIN LPLink ON Profs.id=LPLink.pid WHERE name like ? AND lid=?",
            ("%" + searchterm + "%", lid),
        )
        # Remaining profs of this LVA, appended after the name matches.
        zw += sqlT(
            "SELECT Profs.id,Profs.name FROM Profs LEFT JOIN LPLink ON Profs.id=LPLink.pid WHERE name NOT like ? AND lid=?",
            ("%" + searchterm + "%", lid),
        )
    if searchterm != "":
        res += sqlT(
            "SELECT id,name FROM Profs WHERE name LIKE ?",
            ("%" + searchterm + "%",),
        )
    res = remove_duplicates(res + zw)
    info(
        f"Prof Search: {searchterm}; Result: {res[: (searchlim if searchlim != 0 else -1)]}"
    )
    if searchlim == 0:
        return res
    else:
        return res[:searchlim]


@app.get(
    "/search/subcat"
)  # NOT FULLY TESTED DUE TO INCOMPLETE DATABASE DUE TO INACCEPTABLE FOLDERSTRUCTURE
async def search_subcats(
    searchterm: str = "",
    lid: int | None = None,
    pid: int | None = None,
    cat: int | None = None,
    searchlim: int = 10,
) -> List[Dict[str, str | int]]:
    """Search available subcategories, optionally scoped to (lid, pid, cat)."""
    res = []
    rest = []
    if not (lid is None or pid is None or cat is None):
        # Full scope available: collect every subcat of this (lva, prof, cat).
        rest = sqlT(
            "SELECT id,name FROM SubCats WHERE lid=? AND pid=? AND cat=?",
            (lid, pid, cat),
        )
    if searchterm != "":
        if not (lid is None or pid is None or cat is None):
            res = sqlT(
                "SELECT id,name FROM SubCats WHERE lid=? AND pid=? AND cat=? AND name LIKE ?",
                (lid, pid, cat, "%" + searchterm + "%"),
            )
        res += sqlT(
            "SELECT id,name FROM SubCats WHERE name LIKE ?", ("%" + searchterm + "%",)
        )
    res = remove_duplicates(res + rest)
    info(
        f"Subcatrgory Search: {searchterm}; Result: {res[: (searchlim if searchlim != 0 else -1)]}"
    )
    if searchlim == 0:
        return res
    else:
        return res[:searchlim]


@app.post("/uploadfile/")
async def create_upload_file(files: List[UploadFile], c2pdf: bool = True):
    """Handle uploaded files.

    Single file: optionally converted to PDF. Multiple files: merged into
    one PDF when possible, otherwise saved as a folder ("dir"). Generates
    a unique filename, stores the payload under FILES_IN_PROGRESS and
    records it in the FIP table.

    Args:
        files: the uploaded files (at least one required).
        c2pdf: when True, try converting non-PDF uploads to PDF.

    Returns:
        dict with the display filename, detected filetype, fetch path and
        the FIP id.
    """
    if len(files) == 0:
        raise HTTPException(status_code=400, detail="No files found in file submission")
    filename = files[0].filename if files[0].filename is not None else "None"
    if len(files) == 1:
        content = await files[0].read()
        ft: str = guess_filetype(content, filename)
        if c2pdf and ft != "pdf":
            ret = convert_to_pdf(content)
            if ret is not None:
                content = ret
                filename = filename_to_pdf(filename)
                ft = "pdf"
    else:
        filecontents = []
        for file in files:
            content = await file.read()
            ft = guess_filetype(content, filename)
            if ft == "pdf":
                filecontents.append(content)
                continue
            if c2pdf:
                res = convert_to_pdf(content)
                if res is None:
                    # Unconvertible file: fall back to saving all as a folder.
                    filename = await save_files_to_folder(files)
                    content = None
                    ft = "dir"
                    break
                filecontents.append(res)
            else:
                filename = await save_files_to_folder(files)
                content = None
                ft = "dir"
                break
        else:  # executed when the loop was not broken out of
            filename = filename_to_pdf(filename)
            ft = "pdf"
            doc = pymupdf.open()
            for content in filecontents:
                doc.insert_pdf(pymupdf.open("pdf", content))
            content = doc.tobytes()
    if ft != "dir":
        filename = make_filename_unique(filename)
    locpath = FILES_IN_PROGRESS / filename
    sql(
        "INSERT INTO FIP (filename,filetype,initTimeStamp) Values(?,?,NOW())",
        (filename, ft),
        return_result=False,
    )
    db.commit()
    sqlres = sql("SELECT id FROM FIP WHERE filename=?", (filename,))
    if len(sqlres) < 1:
        # BUGFIX: the f-string never interpolated the filename.
        error(f"FIP Entry with filename {filename} I just created dose not exist")
        raise HTTPException(status_code=500, detail="Error with the Database")
    fid = sqlres[0][0]  # renamed from `id` to stop shadowing the builtin
    if content is not None:
        with open(locpath, "wb") as f:
            f.write(content)
    # Display name = filename without its extension.
    fname = "".join(filename.split(".")[0:-1])
    return {
        "filename": fname,
        "filetype": ft,
        "path": "/files/" + fid,
        "fid": fid,
    }


@app.post("/submit/")
async def get_submission(
    lva: Annotated[str, Form()],  # LVA Name and Number
    prof: Annotated[str, Form()],  # lecturer name
    fname: Annotated[str, Form()],  # target file name
    fileId: Annotated[str, Form()],  # UUID of file in FIP table
    sem: Annotated[str, Form()],  # Semester eg. 2024W
    stype: Annotated[str, Form()],  # category index, eg. Prüfung => 0
    subcat: Annotated[str, Form()],  # subcategory (only for some categories)
    ex_date: Annotated[
        str, Form()
    ],  # date of exam, only when category is an exam (Klausur/Prüfung)
    ftype: Annotated[str, Form()],  # file extension
    rects: Annotated[
        str, Form()
    ],  # JSON: per-page censor rectangles [[x, y, w, h], ...]
    pagescales: Annotated[
        str, Form()
    ],  # JSON: per-page scales [{"width": w, "height": h}, ...]
    ocr: Annotated[str, Form()],  # "True" -> rasterize + OCR while censoring
):
    """Handle the final submission: censor the uploaded file and move it
    to its destination inside UNIZEUG_PATH, then remove the FIP entry."""
    print(
        f"lva: {lva}, prof: {prof}, fname {fname}, stype: {stype}, subcat: {subcat}, sem: {sem}, ex_date: {ex_date}, rects: {rects}, pagescales: {pagescales}, ocr: {ocr}"
    )
    info(
        f"lva: {lva}, prof: {prof}, fname {fname}, stype: {stype}, subcat: {subcat}, sem: {sem}, ex_date: {ex_date}, rects: {rects}, pagescales: {pagescales}, ocr: {ocr}"
    )
    rects_p = json.loads(rects)
    scales_p = json.loads(pagescales)
    res = sql("Select filename from FIP where id=?", (fileId,))
    if len(res) < 1:
        error(f"Submited file ID {fileId} dose not exist in database")
        if fileId == "greeting":
            raise HTTPException(400, "You need to upload a file before submitting")
        raise HTTPException(status_code=400, detail="Submited file dose not exist.")
    for th in [(lva, "LVA"), (prof, "Prof"), (fname, "Filename"), (sem, "Semmester")]:
        if th[0] == "":
            error(f"User tried to upload a file without specifying the {th[1]}")
            raise HTTPException(400, f"You need to specify a {th[1]}")
    filepath = FILES_IN_PROGRESS / res[0][0]
    try:
        dest = make_savepath(lva, prof, stype, subcat, sem, ex_date, fname, ftype)
    except ValueError as e:
        error(f"Error creating savepath: {e}")
        raise HTTPException(status_code=400, detail=f"Error creation savepath: {e}")
    censor_status_datas[fileId] = {}
    if fileId not in censor_status_update_events:
        censor_status_update_events[fileId] = asyncio.Event()
    # Censoring is CPU-bound; run it off the event loop.
    if ocr == "True":
        await asyncio.to_thread(
            censor_pdf_ocr,
            filepath,
            dest,
            rects_p,
            scales_p,
            fileId,
        )
    else:
        await asyncio.to_thread(
            censor_pdf,
            filepath,
            dest,
            rects_p,
            scales_p,
            fileId,
        )
    info(f"Saved file {fileId} as {dest}")
    delete_from_FIP(fileId)
    return FileResponse(dest, content_disposition_type="inline")


@app.get("/get_censor_status/{file_id}")
async def get_censor_status(file_id: str):
    """Stream censoring progress (current page / total pages) as SSE."""
    if len(sql("Select filename from FIP where id=?", (file_id,))) < 1:
        raise HTTPException(
            400,
            detail="You are trying to get a status updater for a file that dosent exist.",
        )
    if file_id not in censor_status_update_events:
        censor_status_update_events[file_id] = asyncio.Event()
    return StreamingResponse(
        yield_censor_status(file_id), media_type="text/event-stream"
    )


async def yield_censor_status(file_id: str):
    """Internal generator: emits one SSE message per status update and
    cleans up the per-file state once censoring reports done."""
    while True:
        await censor_status_update_events[file_id].wait()
        censor_status_update_events[file_id].clear()
        yield f"event: censorUpdate\ndata: {json.dumps(censor_status_datas[file_id])}\n\n"
        if censor_status_datas[file_id]["done"]:
            del censor_status_update_events[file_id]
            del censor_status_datas[file_id]
            return


def censor_pdf(
    path: os.PathLike,
    destpath: os.PathLike,
    rects: List[List[List[float]]],
    scales: List[Dict[str, float]],
    file_id: str,
):
    """Censor a pdf with redaction annotations and save it to destpath.

    Args:
        path: path to the pdf document.
        destpath: path where the result is saved to.
        rects: per-page rectangles [x, y, w, h] in frontend coordinates.
        scales: per-page frontend page sizes used to rescale the rects.
        file_id: key into the censor status dicts for progress reporting.

    Returns:
        None
    """
    doc = pymupdf.open(path)
    npage = doc.page_count
    for i in range(npage):
        page = doc[i]
        if i < len(rects) and rects[i] != []:
            print(i)
            # Map frontend coordinates onto the real page size.
            wfac = page.rect.width / scales[i]["width"]
            hfac = page.rect.height / scales[i]["height"]
            for rect in rects[i]:
                prect = pymupdf.Rect(
                    rect[0] * wfac,
                    rect[1] * hfac,
                    (rect[0] + rect[2]) * wfac,
                    (rect[1] + rect[3]) * hfac,
                )
                page.add_redact_annot(
                    prect,
                    fill=(0, 0, 0),
                )
            # Redactions actually remove the underlying content.
            page.apply_redactions()
        censor_status_datas[file_id]["page"] = i + 1
        censor_status_datas[file_id]["pages"] = npage
        censor_status_datas[file_id]["done"] = False
        censor_status_update_events[file_id].set()
    # Strip metadata so no author/tool info leaks into the archive.
    doc.set_metadata({})
    doc.save(destpath, garbage=4, deflate=True, clean=True)
    censor_status_datas[file_id]["done"] = True
    censor_status_update_events[file_id].set()


def censor_pdf_ocr(
    path: os.PathLike,
    destpath: os.PathLike,
    rects: List[List[List[float]]],
    scales: List[Dict[str, float]],
    file_id: str,
):
    """Censor a pdf irreversibly by rasterizing each page, then OCR it.

    Every page is drawn over with black rectangles, converted to pixels
    (so the censoring cannot be undone) and recreated via tesseract OCR.
    Saves the result to destpath.

    Args:
        path: path to the pdf document.
        destpath: path where the result is saved to.
        rects: per-page rectangles [x, y, w, h] in frontend coordinates.
        scales: per-page frontend page sizes used to rescale the rects.
        file_id: key into the censor status dicts for progress reporting.

    Returns:
        None
    """
    doc = pymupdf.open(path)
    output = pymupdf.open()
    npage = doc.page_count
    for i in range(npage):
        page = doc[i]
        if i < len(rects) and rects[i] != []:
            print(i)
            wfac = page.rect.width / scales[i]["width"]
            hfac = page.rect.height / scales[i]["height"]
            for rect in rects[i]:
                prect = pymupdf.Rect(
                    rect[0] * wfac,
                    rect[1] * hfac,
                    (rect[0] + rect[2]) * wfac,
                    (rect[1] + rect[3]) * hfac,
                )
                page.draw_rect(
                    prect,
                    color=(0, 0, 0),
                    fill=(0, 0, 0),
                )
        censor_status_datas[file_id]["page"] = i + 1
        censor_status_datas[file_id]["pages"] = npage
        censor_status_datas[file_id]["done"] = False
        censor_status_update_events[file_id].set()
        # This costs us dearly: 400 dpi rasterization + OCR per page.
        bitmap = page.get_pixmap(dpi=400)
        pdf_bytes = bitmap.pdfocr_tobytes(
            language="deu",
            tessdata="/usr/share/tessdata/",  # tesseract needs to be installed; this is the path to the tesseract files
        )
        output.insert_pdf(pymupdf.Document(stream=pdf_bytes))
        # End of the costly part
        print(f"Page {i + 1}/{npage}: CENSORING DONE")
    output.save(destpath)
    censor_status_datas[file_id]["done"] = True
    censor_status_update_events[file_id].set()


def test_function(i: int) -> bytes:
    """Dummy helper returning a fixed byte pattern (dev/testing aid)."""
    return b"\x00\x66\x99"


async def censor_page(page: pymupdf.Page) -> bytes:
    """Rasterize a single page at 400 dpi and return its OCR'd pdf bytes."""
    bitmap = page.get_pixmap(dpi=400)
    pdf_bytes = bitmap.pdfocr_tobytes(
        language="deu",
        tessdata="/usr/share/tessdata/",  # tesseract needs to be installed; this is the path to the tesseract files
    )
    return pdf_bytes


async def is_LVID(term: str) -> bool:
    """Return whether a string looks like a LVA ID (xxx.yyy or digits only)."""
    if re.match(r"[a-zA-Z0-9]{3}\.[a-zA-Z0-9]*", term):
        return True
    if term.isdigit():
        return True
    return False


def remove_duplicates(
    results: List[Dict[str, str | int]],
) -> List[Dict[str, str | int]]:
    """Remove entries with duplicate "id" keys, keeping first occurrence."""
    ids = []
    res = []
    for result in results:
        if result["id"] in ids:
            continue
        ids.append(result["id"])
        res.append(result)
    return res


def make_savepath(
    lva: str,
    prof: str,
    cat: str,
    subcat: str,
    sem: str,
    ex_date: str,
    fname: str,
    ftype: str,
) -> os.PathLike:
    """Generate the final path a submitted file is saved to.

    Creates all necessary directories and touches the file (reserving a
    numbered variant when the name is already taken).

    Raises:
        HTTPException(400): when an exam category is missing its date.
        ValueError: propagated from makenew() for LVAs without an ID.
    """
    lv = get_lvpath(lva)
    lvpath = Path(lv[1])
    pf = get_profpath(prof, lv[0])
    pfpath = Path(pf[1])
    catpath = Path(CATEGORIES[int(cat)])
    scpath = ""
    if int(cat) in SUBCAT_CATEGORIES_I and subcat != "":
        sc = get_subcatpath(subcat, int(cat), pf[0], lv[0])
        scpath = Path(sc[1])
    if int(cat) == 6:
        # Multimedia gets its own parallel tree next to the LVA folder.
        savepath = UNIZEUG_PATH / (lv[1] + "_Multimedia_only/") / pfpath
    else:
        savepath = UNIZEUG_PATH / lvpath / pfpath / catpath / scpath
    os.makedirs(savepath, exist_ok=True)
    filename = sem + "_"
    if int(cat) in EX_DATE_CATEGORIES_I:
        try:
            yyyy, mm, dd = ex_date.split("-")
        except ValueError as e:
            error(
                f"ValueError: f{e}. Probably caused by user not specifying a date where a date is required"
            )
            raise HTTPException(
                400,
                "You have not specified a date for an upload that requires a date like an exam.",
            )
        filename = yyyy + "_" + mm + "_" + dd + "_"
    filename += fname
    file = filename + "." + ftype
    destpath = savepath / file
    i = 0
    while destpath.is_file():
        file = filename + f"_{i}." + ftype
        i += 1
        destpath = savepath / file
    # Reserve the name immediately to narrow the race window.
    destpath.touch()
    return savepath / file


def get_lvpath(lva: str) -> Tuple[int, str]:
    """Resolve the UNIZEUG folder for a LVA from its LVID (or name).

    Uses the path stored in the database; when no matching entry exists a
    new LVA row is created via makenew().

    Returns:
        (id, path)
    """
    lvid = re.search(r"[a-zA-Z0-9]{3}\.[a-zA-Z0-9]{3}", lva)
    if lvid is not None:
        # The DB stores the LVID without the dot separator.
        res = sql(
            "SELECT id,lvpath FROM LVAs WHERE lvid=?",
            (lvid.group()[:3] + lvid.group()[4:],),
        )
        if len(res) > 0:
            return res[0]
        else:
            return makenew(lva, "LVAs")
    else:
        res = sql("SELECT id,lvpath FROM LVAs WHERE lvname=?", (lva,))
        if len(res) > 0:
            return res[0]
        else:
            return makenew(lva, "LVAs")


def get_profpath(prof: str, lid: int) -> Tuple[int, str]:
    """Resolve (or create) the folder name for a prof from their name.

    Tries an exact match first, then the name with first/last swapped;
    ensures an LPLink row ties the prof to the given LVA id.

    Returns:
        (id, name) with spaces replaced by underscores in the name.
    """
    prof = prof.replace("_", " ")
    res = sql("SELECT id,name FROM Profs WHERE name=?", (prof,))
    print(res != [])
    if res is not None and res != []:
        ret = (res[0][0], res[0][1].replace(" ", "_"))
        # BUGFIX: sql() never returns None, it returns [] -- the old
        # `is None` check meant linkLP() was never called here.
        if not sql("SELECT * FROM LPLink WHERE LId=? AND PId=?", (lid, ret[0])):
            linkLP(lid, ret[0])
        return ret
    # Retry with first and last name swapped (only well-defined for
    # exactly two name parts; anything else would raise ValueError).
    parts = prof.split(" ")
    if len(parts) == 2:
        fname, lname = parts
        res = sql(
            "SELECT id,name FROM Profs WHERE name like ?", (lname + " " + fname,)
        )
        if res is not None and res != []:
            ret = (res[0][0], res[0][1].replace(" ", "_"))
            if not sql("SELECT * FROM LPLink WHERE LId=? AND PId=?", (lid, ret[0])):
                linkLP(lid, ret[0])
            return ret
    ret = makenew(prof, "Profs")
    linkLP(lid, ret[0])
    return ret


def get_subcatpath(subcat: str, cat: int, pid: int, lid: int) -> Tuple[int, str]:
    """Resolve (or create) a subcategory. Returns (id, name)."""
    res = sql(
        "SELECT id,name FROM SubCats WHERE LId=? AND PId=? AND cat=? AND name=?",
        (lid, pid, cat, subcat),
    )
    if res == []:
        return makenew(subcat, "SubCats", LId=lid, PId=pid, cat=cat)
    return res[0]


def makenew(input: str, table: str, **kwargs) -> Tuple[int, str]:
    """Create a new row in LVAs, Profs or SubCats.

    For LVAs the name must contain a LVA ID (xxx.yyy); extra columns for
    other tables are passed via kwargs.

    Returns:
        (id, name/path)

    Raises:
        ValueError: LVA input without a parseable LVA ID.
        HTTPException(500): freshly inserted row could not be read back.
    """
    if table == "LVAs":
        lvaid = re.search(r"[a-zA-Z0-9]{3}\.[a-zA-Z0-9]{3}", input)
        if lvaid is None:
            raise ValueError("LVA needs to have a LVA ID to be inserted into the table")
        lvid = lvaid.group()[:3] + lvaid.group()[4:]
        # Strip the ID (and surrounding separators) out of the display name.
        lvname = re.sub(r"[_ -]*[a-zA-Z0-9]{3}\.[a-zA-Z0-9]{3}[_ -]*", "", input)
        lvpath = lvname + "_" + lvaid.group()
        sql(
            "INSERT INTO LVAs(lvid,lvname,lvpath) VALUES(?,?,?)",
            (lvid, lvname, lvpath),
            return_result=False,
        )
        db.commit()
        return sql("SELECT id,lvpath FROM LVAs WHERE lvid=?", (lvid,))[0]
    # Generic insert: column names come from kwargs keys (internal callers
    # only -- never built from user input).
    querry = "INSERT INTO " + table + "(name"
    values = [input]
    nvals = 0
    for k, v in kwargs.items():
        values.append(v)
        querry += "," + k
        nvals += 1
    querry += ") VALUES(?" + nvals * ",?" + ")"
    sql(querry, tuple(values), return_result=False)
    sqlres = sql("SELECT id,name FROM " + table + " WHERE name=?", (input,))
    db.commit()
    if len(sqlres) < 1:
        error(f"Entry into {table} with name {input}, I just created dose not exist")
        raise HTTPException(status_code=500, detail="Error with Database")
    res = sqlres[0]
    if table == "Profs":
        return (res[0], res[1].replace(" ", "_"))
    return res


def linkLP(lid: int, pid: int):
    """Record that a prof (pid) offers a LVA (lid)."""
    sql("INSERT INTO LPLink(LId,PId) VALUES(?,?)", (lid, pid), return_result=False)
    db.commit()


def convert_to_pdf(file: bytes) -> bytes | None:
    """Convert an image (that's all that's implemented right now) to a pdf.

    Returns the pdf bytes, or None when the format is unsupported.
    TODO: docx/pptx conversion (previously sketched here) is not implemented.
    """
    try:
        doc = pymupdf.Document(stream=file)
        return doc.convert_to_pdf()
    except (pymupdf.mupdf.FzErrorUnsupported, pymupdf.FileDataError) as e:
        error(f"Error converting Image to pdf file: {e}")
        print(e)
        return None


def filename_to_pdf(filename: str) -> str:
    """Convert any filename.any to filename.pdf (appends when no extension)."""
    farr = filename.split(".")
    if len(farr) > 1:
        farr[-1] = "pdf"
        filename = ".".join(farr)
    else:
        filename = filename + ".pdf"
    return filename


def make_filename_unique(filename: str, idx: int | None = None) -> str:
    """Ensure there are no duplicate filenames in the temporary folder.

    Appends "_0" on first collision, then recursively bumps the digit
    (wrapping at 10) until the name is free in the FIP table.
    """
    res = sql("SELECT id FROM FIP WHERE filename=?", (filename,))
    if res is not None and len(res) > 0:
        farr = filename.split(".")
        if len(farr) > 1:
            farr[-2] = (
                farr[-2][:-1] + str(idx + 1) if idx is not None else farr[-2] + "_0"
            )
            filename = ".".join(farr)
        else:
            filename = (
                filename[:-1] + str(idx + 1) if idx is not None else filename + "_0"
            )
        idx = 0 if idx is None else idx + 1
        idx = idx if idx < 10 else idx - 10
        filename = make_filename_unique(filename, idx)
    return filename


async def save_files_to_folder(files: List[UploadFile]) -> str:
    """Save multiple files into a fresh folder inside FILES_IN_PROGRESS.

    The folder is named after the first file (without extension) and made
    unique against the FIP table. Returns the folder name.
    """
    filename = files[0].filename if files[0].filename is not None else "None"
    filename = filename.split(".")[0]
    if filename == "":
        filename = "None"
    filename = make_filename_unique(filename)
    os.mkdir(FILES_IN_PROGRESS / filename)
    for idx, file in enumerate(files):
        fn = file.filename if file.filename is not None else "None" + str(idx)
        with open(FILES_IN_PROGRESS / filename / fn, "wb") as f:
            f.write(await file.read())
    return filename


def guess_filetype(content: bytes, filename: str) -> str:
    """Guess the filetype: first from the content's magic bytes, then from
    the filename extension. Returns "" when no conclusion can be reached."""
    ftyp = filetype.guess(content)
    if ftyp is not None:
        return ftyp.extension
    farr = filename.split(".")
    if len(farr) > 1:
        return farr[-1]
    return ""


@app.get("/remove_old")
async def remove_old_FIP_entrys():
    """Delete FIP entries (and their on-disk files) older than 24 hours."""
    files = sqlT(
        "SELECT id,filename FROM FIP WHERE HOUR(TIMEDIFF(NOW(),initTimeStamp)) > 24 "
    )
    info(f"Remove Files: {files}")
    for file in files:
        # BUGFIX: (file["id"]) is not a tuple; pass a real 1-tuple so the
        # parameter binding does not rely on the scalar fallback.
        sql("DELETE FROM FIP WHERE id=?", (file["id"],), return_result=False)
        os.remove(FILES_IN_PROGRESS / file["filename"])
    db.commit()
    return FileResponse(APP_ROOT_PATH / "index.html")


def delete_from_FIP(uuid: str):
    """Remove a FIP row and its stored file; 500 when the row is missing."""
    res = sqlT("SELECT filename FROM FIP WHERE id=?", (uuid,))
    if len(res) < 1:
        raise HTTPException(500, "I am trying to delete a file that dose not exist")
    sql("DELETE FROM FIP WHERE id=?", (uuid,), return_result=False, commit=True)
    os.remove(FILES_IN_PROGRESS / res[0]["filename"])