"""Upload / censor / archive service for course material ("Unizeug").

Files are uploaded into a files-in-progress (FIP) table + folder, the user
marks rectangles to censor, and on submission the PDF is redacted (optionally
OCR'd) and filed into a folder tree derived from LVA / Prof / category.
"""

from typing import Annotated
from typing import List, Dict, Tuple, Sequence
from starlette.responses import StreamingResponse
from annotated_types import IsDigit
from fastapi import FastAPI, File, HTTPException, UploadFile, Request, Form
from fastapi.responses import FileResponse
import asyncio
from fastapi.staticfiles import StaticFiles
import pymupdf
import json
import re
import os
import signal
import mariadb
import sys
import filetype
import logging
import pathlib
from pathlib import Path
from starlette.types import HTTPExceptionHandler

log = logging.getLogger(__name__)
logging.basicConfig(
    filename=os.environ.get("APP_LOG_PATH"),
    level=logging.INFO,
    format="[%(asctime)s, %(filename)s:%(lineno)s -> %(funcName)10s()] %(levelname)s: %(message)s",
)
# Short aliases used throughout the module.
debug = log.debug
info = log.info
error = log.error
critical = log.critical


def exception_handler(etype, value, tb):
    """sys.excepthook replacement: route uncaught exceptions into the log."""
    log.exception(f"Uncought Exception: {value}")


sys.excepthook = exception_handler

# Module-level DB connection; re-established on demand by sql()/sqlT().
db = mariadb.connect(
    host=os.environ.get("DB_HOST", "db"),
    user=os.environ.get("DB_USER", "user"),
    password=os.environ.get("DB_PASSWORD", "DBPASSWORD"),
    database=os.environ.get("DB_DATABASE", "unizeug"),
)
info("App Started")

app = FastAPI()

# Category index (list position) is the numeric `cat`/`stype` value used in
# the DB and the submission form.
CATEGORIES = [
    "Prüfungen",
    "Klausuren",
    "Übungen",
    "Labore",
    "Unterlagen",
    "Zusammenfassungen",
    "Multimedia",
]
APP_ROOT_PATH = Path(os.environ.get("APP_ROOT_PATH", "./app"))
SUBCAT_CATEGORIES = ["Klausuren", "Übungen", "Labore"]
SUBCAT_CATEGORIES_I = [1, 2, 3]  # indices into CATEGORIES that allow subcats
EX_DATE_CATEGORIES = ["Prüfungen", "Klausuren"]
EX_DATE_CATEGORIES_I = [0, 1]  # indices into CATEGORIES that require a date
UNIZEUG_PATH = Path(os.environ.get("UNIZEUG_PATH", "./app/dest"))
FILES_IN_PROGRESS = APP_ROOT_PATH / "files/"
EMPTYFILE = APP_ROOT_PATH / "graphics/empty.pdf"
UNSUPPORTEDFILE = APP_ROOT_PATH / "graphics/unsupported.pdf"
GREETINGFILE = APP_ROOT_PATH / "graphics/greeting.pdf"
FAVICON = APP_ROOT_PATH / "favicon"
STATIC_FILES = APP_ROOT_PATH / "static"

# Per-file-id censoring progress, shared between the worker thread and the
# SSE status stream.
# NOTE(review): asyncio.Event is not documented as thread-safe, but .set() is
# called from asyncio.to_thread workers — confirm this is acceptable here.
censor_status_update_events: Dict[str, asyncio.Event] = {}
censor_status_datas: Dict[str, Dict[str, int | None | str | bool]] = {}


def _sql_quarry(
    cursor: mariadb.Cursor,
    querry: str,
    data: Tuple[str | int, ...] | int | str,
    return_result: bool,
    commit: bool,
) -> List:
    """Execute a parameterized query on *cursor*.

    Args:
        cursor: cursor to execute on (dict or tuple flavored).
        querry: SQL with ``?`` placeholders.
        data: a single value or a tuple of values for the placeholders.
        return_result: when True, return ``fetchall()``; otherwise ``[]``.
        commit: when True, commit on the module-level connection.

    Raises:
        HTTPException(500) on any mariadb error (logged with query + params).
        TypeError if *data* is not a str, int or tuple.
    """
    datas: Tuple[str | int, ...]
    if type(data) is str or type(data) is int:
        datas = (data,)
    elif type(data) is tuple:
        datas = data
    else:
        # Previously `datas` stayed unbound here and execute() raised an
        # UnboundLocalError; fail with a clear message instead.
        raise TypeError(f"Unsupported SQL parameter type: {type(data)}")
    try:
        cursor.execute(querry, datas)
        if commit:
            db.commit()
        if return_result:
            return cursor.fetchall()
        else:
            return []
    except mariadb.Error as e:
        error(f"Mariadb Error: '{e}' from Querry: '{querry}' with variables: {data}")
        raise HTTPException(
            status_code=500, detail="Somethings wrong with the database"
        )


def sql_connector_is_active(connector: mariadb.Connection) -> bool:
    """Return True when *connector* answers a ping, False otherwise."""
    try:
        connector.ping()
    except mariadb.Error:
        return False
    return True


def sql_connect(connector: mariadb.Connection) -> mariadb.Connection:
    """(Re)connect to the database; on failure log, SIGTERM the process
    and raise HTTPException(500)."""
    try:
        connector = mariadb.connect(
            host=os.environ.get("DB_HOST", "db"),
            user=os.environ.get("DB_USER", "user"),
            password=os.environ.get("DB_PASSWORD", "DBPASSWORD"),
            # Default unified with the module-level connect (was "Unizeug",
            # which would have reconnected to a different database).
            database=os.environ.get("DB_DATABASE", "unizeug"),
        )
    except mariadb.Error as e:
        critical(
            f"Cannot reconnect to Database {os.environ.get('DB_DATABASE', 'unizeug')} on {os.environ.get('DB_HOST', 'db')}. Got Mariadb Error: {e}"
        )
        os.kill(os.getpid(), signal.SIGTERM)
        raise HTTPException(500, detail="Database failed")
    return connector


def sql(
    querry: str,
    data: Tuple[str | int, ...] | str | int = (),
    return_result: bool = True,
    commit: bool = False,
) -> List[Tuple]:
    """Run a query returning rows as tuples; reconnects if the connection died."""
    global db
    if not sql_connector_is_active(db):
        db = sql_connect(db)
    cur = db.cursor(dictionary=False)
    return _sql_quarry(cur, querry, data, return_result, commit)


def sqlT(
    querry: str,
    data: tuple[str | int, ...] | str | int = (),
    return_result: bool = True,
    commit: bool = False,
) -> List[Dict]:
    """Run a query returning rows as dicts; reconnects if the connection died."""
    global db
    if not sql_connector_is_active(db):
        db = sql_connect(db)
    cur = db.cursor(dictionary=True)
    return _sql_quarry(cur, querry, data, return_result, commit)


app.mount(
    "/favicon",
    StaticFiles(directory=os.environ.get("FAVICON_PATH", FAVICON)),
    name="favicon",
)
app.mount(
    "/static",
    StaticFiles(directory=os.environ.get("STATIC_PATH", STATIC_FILES)),
    name="static",
)


@app.get("/")
async def get_index():
    """gives the Index.html file"""
    return FileResponse(APP_ROOT_PATH / "index.html")


@app.get("/files/{file_id}")
async def get_file(file_id: str):
    """returns the file that cooorosponds with the given ID

    The special ids "unsupported", "empty" and "greeting" serve static
    placeholder PDFs; everything else is resolved via the FIP table.
    """
    if file_id == "unsupported":
        error("User uploadad unsupported file")
        return FileResponse(UNSUPPORTEDFILE)
    if file_id == "empty":
        error("User uploaded empty file")
        return FileResponse(EMPTYFILE)
    if file_id == "greeting":
        return FileResponse(GREETINGFILE)
    res = sql("Select filename from FIP where id=?", (file_id,))
    if len(res) < 1:
        error("File ID a user is trying to reach dose not exist")
        raise HTTPException(status_code=404, detail="File dose ot exist")
    filename = res[0][0]
    return FileResponse(FILES_IN_PROGRESS / filename)


@app.get("/search/lva")
async def search_lva(
    searchterm: str = "", pid: str | None = None, searchlim: int = 10
) -> List[Dict[str, int | str]]:
    """returns the LVA for a search in the database

    Prefix matches rank before substring matches; when *pid* is given, LVAs
    held by that prof are appended as fallback results. searchlim == 0 means
    no limit.
    """
    res: List[Dict[str, str | int]] = []
    zw: List[Dict[str, str | int]] = []
    if await is_LVID(searchterm):
        res += sqlT(
            "SELECT id,lvid,lvname FROM LVAs WHERE lvid LIKE ?",
            (searchterm + "%",),
        )
    else:
        if pid is not None:
            res += sqlT(
                "SELECT LVAs.id,LVAs.lvid,LVAs.lvname FROM LVAs LEFT JOIN LPLink ON LVAs.id=LPLink.lid WHERE lvname like ? AND pid=?",
                (searchterm + "%", pid),
            )
            res += sqlT(
                "SELECT LVAs.id,LVAs.lvid,LVAs.lvname FROM LVAs LEFT JOIN LPLink ON LVAs.id=LPLink.lid WHERE lvname like ? AND pid=?",
                ("%" + searchterm + "%", pid),
            )
            zw += sqlT(
                "SELECT LVAs.id,LVAs.lvid,LVAs.lvname FROM LVAs LEFT JOIN LPLink ON LVAs.id=LPLink.lid WHERE pid=?",
                (pid,),
            )
        if searchterm != "":
            res += sqlT(
                "SELECT id,lvid,lvname FROM LVAs WHERE lvname LIKE ?",
                (searchterm + "%",),
            )
            res += sqlT(
                "SELECT id,lvid,lvname FROM LVAs WHERE lvname LIKE ?",
                ("%" + searchterm + "%",),
            )
    res = remove_duplicates(res + zw)
    info(
        f"LVA Search: {searchterm}; Result: {res[: (searchlim if searchlim != 0 else -1)]}"
    )
    if searchlim == 0:
        return res
    else:
        return res[:searchlim]


@app.get("/search/prof")
async def search_profs(
    searchterm: str = "", lid: int | None = None, searchlim: int = 10
) -> List[Dict[str, str | int]]:
    """returns the Prof for a searchterm and LVA id

    Profs linked to *lid* that match rank first, other profs of that LVA are
    appended as fallback. searchlim == 0 means no limit.
    """
    res: List[Dict[str, str | int]] = []
    zw: List[Dict[str, str | int]] = []
    if lid is not None:
        res += sqlT(
            "SELECT Profs.id,Profs.name FROM Profs LEFT JOIN LPLink ON Profs.id=LPLink.pid WHERE name like ? AND lid=?",
            ("%" + searchterm + "%", lid),
        )
        zw += sqlT(
            "SELECT Profs.id,Profs.name FROM Profs LEFT JOIN LPLink ON Profs.id=LPLink.pid WHERE name NOT like ? AND lid=?",
            ("%" + searchterm + "%", lid),
        )
    if searchterm != "":
        res += sqlT(
            "SELECT id,name FROM Profs WHERE name LIKE ?",
            ("%" + searchterm + "%",),
        )
    res = remove_duplicates(res + zw)
    info(
        f"Prof Search: {searchterm}; Result: {res[: (searchlim if searchlim != 0 else -1)]}"
    )
    if searchlim == 0:
        return res
    else:
        return res[:searchlim]


@app.get(
    "/search/subcat"
)  # NOT FULLY TESTED DUE TO INCOMPLETE DATABASE DUE TO INACCEPTABLE FOLDERSTRUCTURE
async def search_subcats(
    searchterm: str = "",
    lid: int | None = None,
    pid: int | None = None,
    cat: int | None = None,
    searchlim: int = 10,
) -> List[Dict[str, str | int]]:
    """searches for avaliable subcatrgories in a specific LVA with a specific Prof(optional)"""
    res = []
    rest = []
    if not (lid is None or pid is None or cat is None):
        # All scoping parameters present: collect the full set as fallback.
        rest = sqlT(
            "SELECT id,name FROM SubCats WHERE lid=? AND pid=? AND cat=?",
            (lid, pid, cat),
        )
    if searchterm != "":
        if not (lid is None or pid is None or cat is None):
            res = sqlT(
                "SELECT id,name FROM SubCats WHERE lid=? AND pid=? AND cat=? AND name LIKE ?",
                (lid, pid, cat, "%" + searchterm + "%"),
            )
        res += sqlT(
            "SELECT id,name FROM SubCats WHERE name LIKE ?", ("%" + searchterm + "%",)
        )
    res = remove_duplicates(res + rest)
    info(
        f"Subcatrgory Search: {searchterm}; Result: {res[: (searchlim if searchlim != 0 else -1)]}"
    )
    if searchlim == 0:
        return res
    else:
        return res[:searchlim]


@app.post("/uploadfile/")
async def create_upload_file(files: List[UploadFile], c2pdf: bool = True):
    """Handles files uploaded. generates ID; saves file; saves path in database

    Single uploads are (optionally) converted to PDF; multiple uploads are
    merged into one PDF, or — if any file cannot be converted — stored as a
    directory (filetype "dir").
    """
    if len(files) == 0:
        raise HTTPException(status_code=400, detail="No files found in file submission")
    filename = files[0].filename if files[0].filename is not None else "None"
    if len(files) == 1:
        content = await files[0].read()
        ft: str = guess_filetype(content, filename)
        if c2pdf and ft != "pdf":
            ret = convert_to_pdf(content)
            if ret is not None:
                content = ret
                filename = filename_to_pdf(filename)
                ft = "pdf"
    else:
        filecontents = []
        for file in files:
            content = await file.read()
            # Fix: guess each file's type from its OWN name (the original
            # reused the first file's name for every upload).
            ft = guess_filetype(
                content, file.filename if file.filename is not None else "None"
            )
            if ft == "pdf":
                filecontents.append(content)
                continue
            if c2pdf:
                res = convert_to_pdf(content)
                if res is None:
                    filename = await save_files_to_folder(files)
                    content = None
                    ft = "dir"
                    break
                filecontents.append(res)
            else:
                filename = await save_files_to_folder(files)
                content = None
                ft = "dir"
                break
        else:  # is executed when the loop was not broken out of
            filename = filename_to_pdf(filename)
            ft = "pdf"
            doc = pymupdf.open()
            for content in filecontents:
                doc.insert_pdf(pymupdf.open("pdf", content))
            content = doc.tobytes()
    if ft != "dir":
        # Directory names were already uniquified by save_files_to_folder.
        filename = make_filename_unique(filename)
    locpath = FILES_IN_PROGRESS / filename
    sql(
        "INSERT INTO FIP (filename,filetype,initTimeStamp) Values(?,?,NOW())",
        (filename, ft),
        return_result=False,
    )
    db.commit()
    sqlres = sql("SELECT id FROM FIP WHERE filename=?", (filename,))
    if len(sqlres) < 1:
        error(f"FIP Entry with filename (unknown) I just created dose not exist")
        raise HTTPException(status_code=500, detail="Error with the Database")
    id = sqlres[0][0]
    if content is not None:
        with open(locpath, "wb") as f:
            f.write(content)
    # Display name = filename without extension. Fix: keep inner dots and
    # fall back to the full name when there is no extension (dir uploads
    # previously produced an empty display name).
    fname = ".".join(filename.split(".")[0:-1]) if "." in filename else filename
    return {
        "filename": fname,
        "filetype": ft,
        "path": "/files/" + id,
        "fid": id,
    }


@app.post("/submit/")
async def get_submission(
    lva: Annotated[str, Form()],  # LVA Name and Number
    prof: Annotated[str, Form()],  # Vortragender (lecturer)
    fname: Annotated[str, Form()],  # target filename (without extension)
    fileId: Annotated[str, Form()],  # UUID of file in FIP table
    sem: Annotated[str, Form()],  # Semester eg. 2024W
    stype: Annotated[str, Form()],  # Category index, eg. Prüfung=>0
    subcat: Annotated[str, Form()],  # Subcategory of file if the category has subcats
    ex_date: Annotated[
        str, Form()
    ],  # Date of Exam only when type is exam(Klausur/Prüfung)
    ftype: Annotated[str, Form()],  # file extension
    rects: Annotated[str, Form()],  # JSON: per-page lists of [x,y,w,h] rectangles
    pagescales: Annotated[str, Form()],  # JSON: per-page {"width","height"} scales
    ocr: Annotated[str, Form()],  # "True" => censor via the OCR pipeline
):
    """handles submission: validates, censors the FIP file and archives it."""
    info(
        f"Got Submission: lva: {lva}, prof: {prof}, fname {fname}, stype: {stype}, subcat: {subcat}, sem: {sem}, ex_date: {ex_date}, rects: {rects}, pagescales: {pagescales}, ocr: {ocr}"
    )
    rects_p = json.loads(rects)
    scales_p = json.loads(pagescales)
    res = sql("Select filename from FIP where id=?", (fileId,))
    if len(res) < 1:
        error(f"Submited file ID {fileId} dose not exist in database")
        if fileId == "greeting":
            raise HTTPException(400, "You need to upload a file before submitting")
        raise HTTPException(status_code=400, detail="Submited file dose not exist.")
    for th in [(lva, "LVA"), (prof, "Prof"), (fname, "Filename"), (sem, "Semmester")]:
        if th[0] == "":
            error(f"User tried to upload a file without specifying the {th[1]}")
            raise HTTPException(400, f"You need to specify a {th[1]}")
    filepath = FILES_IN_PROGRESS / res[0][0]
    try:
        dest = make_savepath(lva, prof, stype, subcat, sem, ex_date, fname, ftype)
    except ValueError as e:
        error(f"Error creating savepath: {e}")
        raise HTTPException(status_code=400, detail=f"Error creation savepath: {e}")
    censor_status_datas[fileId] = {}
    if fileId not in censor_status_update_events:
        censor_status_update_events[fileId] = asyncio.Event()
    # Run the (CPU-heavy) censoring off the event loop.
    if ocr == "True":
        await asyncio.to_thread(
            censor_pdf_ocr,
            filepath,
            dest,
            rects_p,
            scales_p,
            fileId,
        )
    else:
        await asyncio.to_thread(
            censor_pdf,
            filepath,
            dest,
            rects_p,
            scales_p,
            fileId,
        )
    info(f"Saved file {fileId} as {dest}")
    delete_from_FIP(fileId)
    return FileResponse(dest, content_disposition_type="inline")


@app.get("/get_censor_status/{file_id}")
async def get_censor_status(file_id: str):
    """Yields the currrent page being censored and the total number of pages"""
    if len(sql("Select filename from FIP where id=?", (file_id,))) < 1:
        raise HTTPException(
            400,
            detail="You are trying to get a status updater for a file that dosent exist.",
        )
    if file_id not in censor_status_update_events:
        censor_status_update_events[file_id] = asyncio.Event()
    return StreamingResponse(
        yield_censor_status(file_id), media_type="text/event-stream"
    )


async def yield_censor_status(file_id: str):
    """Internal function to yield updates to the stream

    Emits one SSE "censorUpdate" event per status change and cleans up the
    shared state once the censor worker flags "done".
    """
    while True:
        await censor_status_update_events[file_id].wait()
        censor_status_update_events[file_id].clear()
        yield f"event: censorUpdate\ndata: {json.dumps(censor_status_datas[file_id])}\n\n"
        if censor_status_datas[file_id]["done"]:
            del censor_status_update_events[file_id]
            del censor_status_datas[file_id]
            return


def censor_pdf(
    path: os.PathLike,
    destpath: os.PathLike,
    rects: List[List[List[float]]],
    scales: List[Dict[str, float]],
    file_id: str,
):
    """Censors pdf and saves the file to the given Destpath.

    Args:
        path: path to the pdf document
        destpath: Path where the result is supposed to be saved to
        rects: per-page lists of [x, y, width, height] rectangles (browser
            coordinates) to redact
        scales: per-page browser page sizes used to map rects onto PDF points
        file_id: key into the shared censor-status dicts for progress updates

    Returns:
        None
    """
    info(f"started Censoring for file {path} to be saved to {destpath}")
    doc = pymupdf.open(path)
    npage = doc.page_count
    for i in range(npage):
        page = doc[i]
        if i < len(rects) and rects[i] != []:
            # Map browser coordinates to PDF points for this page.
            wfac = page.rect.width / scales[i]["width"]
            hfac = page.rect.height / scales[i]["height"]
            for rect in rects[i]:
                prect = pymupdf.Rect(
                    rect[0] * wfac,
                    rect[1] * hfac,
                    (rect[0] + rect[2]) * wfac,
                    (rect[1] + rect[3]) * hfac,
                )
                # Redaction removes the underlying content, not just overlays.
                page.add_redact_annot(
                    prect,
                    fill=(0, 0, 0),
                )
            page.apply_redactions()
        censor_status_datas[file_id]["page"] = i + 1
        censor_status_datas[file_id]["pages"] = npage
        censor_status_datas[file_id]["done"] = False
        censor_status_update_events[file_id].set()
    doc.set_metadata({})  # strip any author/producer metadata
    doc.save(destpath, garbage=4, deflate=True, clean=True)
    censor_status_datas[file_id]["done"] = True
    censor_status_update_events[file_id].set()


def censor_pdf_ocr(
    path: os.PathLike,
    destpath: os.PathLike,
    rects: List[List[List[float]]],
    scales: List[Dict[str, float]],
    file_id: str,
):
    """Censors pdf and runs OCR

    Each page is drawn over, rasterized and re-created from its pixels (so
    the censoring is irreversible), then OCR'd back to text. Pages that fail
    OCR fall back to the plain redaction path. Saves the file to the given
    Destpath.

    Args:
        path: path to the pdf document
        destpath: Path where the result is supposed to be saved to
        rects: per-page lists of [x, y, width, height] rectangles
        scales: per-page browser page sizes used to map rects onto PDF points
        file_id: key into the shared censor-status dicts for progress updates

    Returns:
        None
    """
    info(f"started Censoring in OCR Mode for file {path} to be saved to {destpath}")
    doc = pymupdf.open(path)
    output = pymupdf.open()
    npage = doc.page_count
    for i in range(npage):
        page = doc[i]
        if i < len(rects) and rects[i] != []:
            wfac = page.rect.width / scales[i]["width"]
            hfac = page.rect.height / scales[i]["height"]
            for rect in rects[i]:
                prect = pymupdf.Rect(
                    rect[0] * wfac,
                    rect[1] * hfac,
                    (rect[0] + rect[2]) * wfac,
                    (rect[1] + rect[3]) * hfac,
                )
                # Drawing (not redacting) is enough here: the page is
                # rasterized below, which flattens the overlay.
                page.draw_rect(
                    prect,
                    color=(0, 0, 0),
                    fill=(0, 0, 0),
                )
        censor_status_datas[file_id]["page"] = i + 1
        censor_status_datas[file_id]["pages"] = npage
        censor_status_datas[file_id]["done"] = False
        censor_status_update_events[file_id].set()
        # THis Costs us dearly: 400 dpi rasterization + tesseract per page.
        try:
            bitmap = page.get_pixmap(dpi=400)
            pdf_bytes = bitmap.pdfocr_tobytes(
                language="deu",
                tessdata="/usr/share/tessdata/",  # tesseract needs to be installed; this is the path to the tesseract files
            )
            output.insert_pdf(pymupdf.Document(stream=pdf_bytes))
        except RuntimeError as e:
            error(
                f"Error in OCR for document: {destpath}. Error: {e}. Falling back to standard mode."
            )
            # Fallback: real redactions on the original page, no OCR.
            if i < len(rects) and rects[i] != []:
                for rect in rects[i]:
                    prect = pymupdf.Rect(
                        rect[0] * wfac,
                        rect[1] * hfac,
                        (rect[0] + rect[2]) * wfac,
                        (rect[1] + rect[3]) * hfac,
                    )
                    page.add_redact_annot(
                        prect,
                        fill=(0, 0, 0),
                    )
                page.apply_redactions()
            output.insert_pdf(page.parent, from_page=page.number, to_page=page.number)
        # End of the costly part
        print(f"Page {i + 1}/{npage}: CENSORING DONE")
    output.save(destpath)
    censor_status_datas[file_id]["done"] = True
    censor_status_update_events[file_id].set()


def test_function(i: int) -> bytes:
    """Dummy payload used during development; not referenced by routes."""
    return b"\x00\x66\x99"


async def censor_page(page: pymupdf.Page) -> bytes:
    """Rasterize a single page at 400 dpi and return it OCR'd as PDF bytes.

    NOTE(review): currently unused by the request handlers; kept for the
    per-page pipeline. The work is synchronous despite the async signature.
    """
    bitmap = page.get_pixmap(dpi=400)
    pdf_bytes = bitmap.pdfocr_tobytes(
        language="deu",
        tessdata="/usr/share/tessdata/",  # tesseract needs to be installed; this is the path to the tesseract files
    )
    return pdf_bytes


async def is_LVID(term: str) -> bool:
    """Returns weather a string has the format of a LVA ID (eg. "123.456"
    or all digits)."""
    if re.match(r"[a-zA-Z0-9]{3}\.[a-zA-Z0-9]*", term):
        return True
    if term.isdigit():
        return True
    return False


def remove_duplicates(
    results: List[Dict[str, str | int]],
) -> List[Dict[str, str | int]]:
    """removes duplicate entries by their "id" key, keeping first occurrence
    (and therefore the incoming ranking order)."""
    ids = []
    res = []
    for result in results:
        if result["id"] in ids:
            continue
        ids.append(result["id"])
        res.append(result)
    return res


def make_savepath(
    lva: str,
    prof: str,
    cat: str,
    subcat: str,
    sem: str,
    ex_date: str,
    fname: str,
    ftype: str,
) -> os.PathLike:
    """Generates the path, the file is saved to after the upload process is finished.
    It creates all nessecery directories.

    Raises:
        ValueError (via makenew) when a new LVA has no parseable LVA ID.
        HTTPException(400) when an exam category is missing its date.
    """
    info(f"Started to make Savepath for '{fname}' in '{lva}' with prof '{prof}'.")
    lv = get_lvpath(lva)
    lvpath = Path(lv[1])
    pf = get_profpath(prof, lv[0])
    pfpath = Path(pf[1])
    catpath = Path(CATEGORIES[int(cat)])
    scpath: str | os.PathLike = ""
    if int(cat) in SUBCAT_CATEGORIES_I and subcat != "":
        sc = get_subcatpath(subcat, int(cat), pf[0], lv[0])
        scpath = Path(sc[1])
    if int(cat) == 6:
        # Multimedia goes into its own parallel tree.
        savepath = UNIZEUG_PATH / (lv[1] + "_Multimedia_only/") / pfpath
    else:
        savepath = UNIZEUG_PATH / lvpath / pfpath / catpath / scpath
    os.makedirs(savepath, exist_ok=True)
    filename = sem + "_"
    if int(cat) in EX_DATE_CATEGORIES_I:
        try:
            yyyy, mm, dd = ex_date.split("-")
        except ValueError as e:
            error(
                f"ValueError: f{e}. Probably caused by user not specifying a date where a date is required"
            )
            raise HTTPException(
                400,
                "You have not specified a date for an upload that requires a date like an exam.",
            )
        # Exam uploads are prefixed with the date instead of the semester.
        filename = yyyy + "_" + mm + "_" + dd + "_"
    filename += fname
    file = filename + "." + ftype
    destpath = savepath / file
    i = 0
    while destpath.is_file():
        info(f"{destpath} already exists.")
        file = filename + f"_{i}." + ftype
        i += 1
        destpath = savepath / file
    destpath.touch()  # reserve the name immediately
    info(f"Path for file to be saved generated as: {savepath / file}")
    return savepath / file


def get_lvpath(lva: str) -> Tuple[int, str]:
    """returns the path in UNIZEUG from a LVA based on its LVID (or name)
    that may be within a string. It uses the path within the database. If
    there is no Entry with a fitting LVID in the database it creates a new LVA.

    Returns: (id,path)"""
    lvid = re.search(r"[a-zA-Z0-9]{3}\.[a-zA-Z0-9]{3}", lva)
    if lvid is not None:
        res = sql(
            "SELECT id,lvpath FROM LVAs WHERE lvid=?",
            # DB stores the LVID without the dot.
            (lvid.group()[:3] + lvid.group()[4:],),
        )
        if len(res) > 0:
            return res[0]
        else:
            return makenew(lva, "LVAs")
    else:
        res = sql("SELECT id,lvpath FROM LVAs WHERE lvname=?", (lva,))
        if len(res) > 0:
            return res[0]
        else:
            return makenew(lva, "LVAs")


def get_profpath(prof: str, lid: int) -> Tuple[int, str]:
    """Generates the foldername for a prof based on his name. It searches the
    database for matches (also with first/last name swapped) and creates a
    new entry plus LVA link when none is found.

    Returns: (id,name)"""
    prof = prof.replace("_", " ")
    res = sql("SELECT id,name FROM Profs WHERE name=?", (prof,))
    if res is not None and res != []:
        ret = (res[0][0], res[0][1].replace(" ", "_"))
        # Fix: sql() returns a list, never None — the original `is None`
        # check meant linkLP() was never called for existing profs.
        if not sql("SELECT * FROM LPLink WHERE LId=? AND PId=?", (lid, ret[0])):
            linkLP(lid, ret[0])
        return ret
    parts = prof.split(" ")
    if len(parts) == 2:
        # Retry with first and last name swapped. Guarded: the original
        # unpacking raised ValueError for names that are not two tokens.
        fname, lname = parts
        res = sql(
            "SELECT id,name FROM Profs WHERE name like ?", (lname + " " + fname,)
        )
        if res is not None and res != []:
            ret = (res[0][0], res[0][1].replace(" ", "_"))
            if not sql("SELECT * FROM LPLink WHERE LId=? AND PId=?", (lid, ret[0])):
                linkLP(lid, ret[0])
            return ret
    ret = makenew(prof, "Profs")
    linkLP(lid, ret[0])
    return ret


def get_subcatpath(subcat: str, cat: int, pid: int, lid: int) -> Tuple[int, str]:
    """Generates the subcat path from a subcat name, creating the SubCats
    entry when it does not exist yet.

    Returns: (id,name)"""
    res = sql(
        "SELECT id,name FROM SubCats WHERE LId=? AND PId=? AND cat=? AND name=?",
        (lid, pid, cat, subcat),
    )
    if res == []:
        return makenew(subcat, "SubCats", LId=lid, PId=pid, cat=cat)
    return res[0]


def makenew(input: str, table: str, **kwargs) -> Tuple[int, str]:
    """Generates new Entrys in the database for LVAs, Profs, SUBCATS.

    For "LVAs" the LVA ID is parsed out of *input*; for other tables *input*
    is the name column and **kwargs become additional columns.

    Returns: (id,name/path)

    Raises:
        ValueError when an LVA entry has no parseable LVA ID.
    """
    if table == "LVAs":
        lvaid = re.search(r"[a-zA-Z0-9]{3}\.[a-zA-Z0-9]{3}", input)
        if lvaid is None:
            raise ValueError("LVA needs to have a LVA ID to be inserted into the table")
        lvid = lvaid.group()[:3] + lvaid.group()[4:]  # strip the dot
        lvname = re.sub(r"[_ -]*[a-zA-Z0-9]{3}\.[a-zA-Z0-9]{3}[_ -]*", "", input)
        lvpath = lvname + "_" + lvaid.group()
        sql(
            "INSERT INTO LVAs(lvid,lvname,lvpath) VALUES(?,?,?)",
            (lvid, lvname, lvpath),
            return_result=False,
        )
        db.commit()
        return sql("SELECT id,lvpath FROM LVAs WHERE lvid=?", (lvid,))[0]
    # Build the column list from kwargs; values are bound as parameters, so
    # only the (internal, hard-coded) column names enter the query text.
    querry = "INSERT INTO " + table + "(name"
    values = [input]
    nvals = 0
    for k, v in kwargs.items():
        values.append(v)
        querry += "," + k
        nvals += 1
    querry += ") VALUES(?" + nvals * ",?" + ")"
    sql(querry, tuple(values), return_result=False)
    sqlres = sql("SELECT id,name FROM " + table + " WHERE name=?", (input,))
    db.commit()
    if len(sqlres) < 1:
        error(f"Entry into {table} with name {input}, I just created dose not exist")
        raise HTTPException(status_code=500, detail="Error with Database")
    res = sqlres[0]
    if table == "Profs":
        return (res[0], res[1].replace(" ", "_"))
    return res


def linkLP(lid: int, pid: int):
    """declares that a Prof (id in database) offers a LVA (id in database)"""
    sql("INSERT INTO LPLink(LId,PId) VALUES(?,?)", (lid, pid), return_result=False)
    db.commit()


def convert_to_pdf(file: bytes) -> bytes | None:
    """Converts an image(thats all thats implemented right now) into a pdf.

    Returns the PDF bytes, or None when pymupdf cannot handle the input.
    """
    try:
        doc = pymupdf.Document(stream=file)
        return doc.convert_to_pdf()
    except (pymupdf.mupdf.FzErrorUnsupported, pymupdf.FileDataError) as e:
        error(f"Error converting Image to pdf file: {e}")
        return None


def filename_to_pdf(filename: str) -> str:
    """converts any filename.any to filename.pdf"""
    farr = filename.split(".")
    if len(farr) > 1:
        farr[-1] = "pdf"
        filename = ".".join(farr)
    else:
        filename = filename + ".pdf"
    return filename


def make_filename_unique(filename: str, idx: int | None = None) -> str:
    """makes sure, there are no duplicate filenames in the temporary folder

    Recursively appends/increments a numeric suffix until the name is free
    in the FIP table.
    NOTE(review): the suffix wraps at 10 (`idx - 10`) and rewrites the last
    character of the stem — presumably intentional to bound the suffix
    length; verify before changing.
    """
    res = sql("SELECT id FROM FIP WHERE filename=?", (filename,))
    if res is not None and len(res) > 0:
        farr = filename.split(".")
        if len(farr) > 1:
            farr[-2] = (
                farr[-2][:-1] + str(idx + 1) if idx is not None else farr[-2] + "_0"
            )
            filename = ".".join(farr)
        else:
            filename = (
                filename[:-1] + str(idx + 1) if idx is not None else filename + "_0"
            )
        idx = 0 if idx is None else idx + 1
        idx = idx if idx < 10 else idx - 10
        filename = make_filename_unique(filename, idx)
    return filename


async def save_files_to_folder(files: List[UploadFile]) -> str:
    """saves file to files in prograss folder

    Creates a (uniquified) directory named after the first file and writes
    every upload into it. Returns the directory name.
    """
    filename = files[0].filename if files[0].filename is not None else "None"
    filename = filename.split(".")[0]
    if filename == "":
        filename = "None"
    filename = make_filename_unique(filename)
    os.mkdir(FILES_IN_PROGRESS / filename)
    for idx, file in enumerate(files):
        fn = file.filename if file.filename is not None else "None" + str(idx)
        with open(FILES_IN_PROGRESS / filename / fn, "wb") as f:
            f.write(await file.read())
    return filename


def guess_filetype(content: bytes, filename: str) -> str:
    """Guesses the filetype of a file based on first the content, if that
    fails the extension in the filename.
    If no conclusion can be reached it returns an empty string"""
    ftyp = filetype.guess(content)
    if ftyp is not None:
        return ftyp.extension
    farr = filename.split(".")
    if len(farr) > 1:
        return filename.split(".")[-1]
    return ""


@app.get("/remove_old")
async def remove_old_FIP_entrys():
    """Deletes FIP rows (and their files) older than 24 hours."""
    files = sqlT(
        "SELECT id,filename FROM FIP WHERE HOUR(TIMEDIFF(NOW(),initTimeStamp)) > 24 "
    )
    info(f"Remove Files: {files}")
    for file in files:
        # Fix: pass the parameter as a proper 1-tuple (was a bare value in
        # redundant parentheses).
        sql("DELETE FROM FIP WHERE id=?", (file["id"],), return_result=False)
        # NOTE(review): os.remove fails for "dir"-type entries (folders) —
        # confirm whether directory uploads can age out here.
        os.remove(FILES_IN_PROGRESS / file["filename"])
    db.commit()
    return FileResponse(APP_ROOT_PATH / "index.html")


def delete_from_FIP(uuid: str):
    """Removes a finished upload from the FIP table and deletes its file."""
    res = sqlT("SELECT filename FROM FIP WHERE id=?", (uuid,))
    if len(res) < 1:
        raise HTTPException(500, "I am trying to delete a file that dose not exist")
    sql("DELETE FROM FIP WHERE id=?", (uuid,), return_result=False, commit=True)
    os.remove(FILES_IN_PROGRESS / res[0]["filename"])