from typing import Annotated
from typing import List, Dict, Tuple, Sequence
from starlette.responses import StreamingResponse
from annotated_types import IsDigit
from fastapi import FastAPI, File, HTTPException, UploadFile, Request, Form
from fastapi.responses import FileResponse
# import multiprocessing
# import threading
# import concurrent.futures
import asyncio
# import fastapi
from fastapi.staticfiles import StaticFiles
import pymupdf
# import fitz as pymupdf
import json
import re
import os
import signal
import mariadb
import sys
import filetype
import logging
import pathlib
from pathlib import Path
from starlette.types import HTTPExceptionHandler
log = logging.getLogger(__name__)
logging.basicConfig(
filename=os.environ.get("APP_LOG_PATH"),
level=logging.INFO,
format="[%(asctime)s, %(filename)s:%(lineno)s -> %(funcName)10s()] %(levelname)s: %(message)s",
)
debug = log.debug
info = log.info
error = log.error
critical = log.critical
def exception_handler(etype, value, tb):
log.error(f"Uncaught Exception: {value}", exc_info=(etype, value, tb))
sys.excepthook = exception_handler
db = mariadb.connect(
host=os.environ.get("DB_HOST", "db"),
user=os.environ.get("DB_USER", "user"),
password=os.environ.get("DB_PASSWORD", "DBPASSWORD"),
database=os.environ.get("DB_DATABASE", "unizeug"),
)
info("App Started")
# remove_old_FIP_entrys()
# startup()
app = FastAPI()
CATEGORIES = [
"Prüfungen",
"Klausuren",
"Übungen",
"Labore",
"Unterlagen",
"Zusammenfassungen",
"Multimedia",
]
APP_ROOT_PATH = Path(os.environ.get("APP_ROOT_PATH", "./app"))
SUBCAT_CATEGORIES = ["Klausuren", "Übungen", "Labore"]
SUBCAT_CATEGORIES_I = [1, 2, 3]
EX_DATE_CATEGORIES = ["Prüfungen", "Klausuren"]
EX_DATE_CATEGORIES_I = [0, 1]
UNIZEUG_PATH = Path(os.environ.get("UNIZEUG_PATH", "./app/dest"))
FILES_IN_PROGRESS = APP_ROOT_PATH / "files/"
EMPTYFILE = APP_ROOT_PATH / "graphics/empty.pdf"
UNSUPPORTEDFILE = APP_ROOT_PATH / "graphics/unsupported.pdf"
GREETINGFILE = APP_ROOT_PATH / "graphics/greeting.pdf"
FAVICON = APP_ROOT_PATH / "favicon"
STATIC_FILES = APP_ROOT_PATH / "static"
# cur = db.cursor()
# cur.execute("select * from FIP;")
# for l in cur:
# print(l)
# locpaths = ["./VO_Mathematik_3.pdf"] # replace this with a database
censor_status_update_events: Dict[str, asyncio.Event] = {}
censor_status_datas: Dict[str, Dict[str, int | None | str | bool]] = {}
# censor_finished_flags: Dict[str, asyncio.Event] = {}
def _sql_quarry(
cursor: mariadb.Cursor,
querry: str,
data: Tuple[str | int, ...] | int | str,
return_result: bool,
commit: bool,
) -> List:
datas: Tuple[str | int, ...]
if type(data) is str or type(data) is int:
datas = (data,)
elif type(data) is tuple:
datas = data
try:
cursor.execute(querry, datas)
if commit:
db.commit()
if return_result:
return cursor.fetchall()
else:
return []
except mariadb.Error as e:
error(f"Mariadb Error: '{e}' from Querry: '{querry}' with variables: {data}")
raise HTTPException(
status_code=500, detail="Somethings wrong with the database"
)
def sql_connector_is_active(connector: mariadb.Connection) -> bool:
try:
connector.ping()
except mariadb.Error as e:
return False
return True
def sql_connect(connector: mariadb.Connection) -> mariadb.Connection:
try:
connector = mariadb.connect(
host=os.environ.get("DB_HOST", "db"),
user=os.environ.get("DB_USER", "user"),
password=os.environ.get("DB_PASSWORD", "DBPASSWORD"),
database=os.environ.get("DB_DATABASE", "unizeug"),
)
except mariadb.Error as e:
critical(
f"Cannot reconnect to Database {os.environ.get('DB_DATABASE', 'Unizeug')} on {os.environ.get('DB_HOST', 'db')}. Got Mariadb Error: {e}"
)
os.kill(os.getpid(), signal.SIGTERM)
raise HTTPException(500, detail="Database failed")
return connector
def sql(
querry: str,
data: Tuple[str | int, ...] | str | int = (),
return_result: bool = True,
commit: bool = False,
) -> List[Tuple]:
global db
if not sql_connector_is_active(db):
db = sql_connect(db)
cur = db.cursor(dictionary=False)
return _sql_quarry(cur, querry, data, return_result, commit)
def sqlT(
querry: str,
data: tuple[str | int, ...] | str | int = (),
return_result: bool = True,
commit: bool = False,
) -> List[Dict]:
global db
if not sql_connector_is_active(db):
db = sql_connect(db)
cur = db.cursor(dictionary=True)
return _sql_quarry(cur, querry, data, return_result, commit)
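# Usage sketch for the two helpers above (illustrative values, not real database
# contents): `sql` returns plain tuples, `sqlT` returns dicts, and a scalar `data`
# argument is wrapped into a 1-tuple automatically.
#   sql("SELECT filename FROM FIP WHERE id=?", "some-id")             # e.g. [("scan.pdf",)]
#   sqlT("SELECT id,name FROM Profs WHERE name LIKE ?", ("%Max%",))   # e.g. [{"id": 1, "name": "Max Mustermann"}]
#   sql("DELETE FROM FIP WHERE id=?", "some-id", return_result=False, commit=True)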
# datas:Tuple[str|int,...]
# if type(data) is str or type(data) is int:
# datas = (data,)
# else:
# datas = data
# try:
# cur.execute(querry, datas)
# return cur.fetchall()
# except mariadb.Error as e:
# error(f"Mariadb Error: {e}")
# raise HTTPException(
# status_code=500, detail="Somethings wrong with the database"
# )
app.mount(
"/favicon",
StaticFiles(directory=os.environ.get("FAVICON_PATH", FAVICON)),
name="favicon",
)
app.mount(
"/static",
StaticFiles(directory=os.environ.get("STATIC_PATH", STATIC_FILES)),
name="static",
)
@app.get("/")
async def get_index():
"""gives the Index.html file"""
return FileResponse(APP_ROOT_PATH / "index.html")
@app.get("/files/{file_id}")
async def get_file(file_id: str):
"""returns the file that cooorosponds with the given ID"""
if file_id == "unsupported":
error("User uploadad unsupported file")
return FileResponse(UNSUPPORTEDFILE)
if file_id == "empty":
error("User uploaded empty file")
return FileResponse(EMPTYFILE)
if file_id == "greeting":
return FileResponse(GREETINGFILE)
# cur = db.cursor()
# try:
res = sql("Select filename from FIP where id=?", (file_id,))
if len(res) < 1:
error("File ID a user is trying to reach dose not exist")
raise HTTPException(status_code=404, detail="File dose ot exist")
filename = res[0][0]
# except mariadb.Error as e:
# error(f"Mariadb Error: {e}")
# raise HTTPException(
# status_code=500, detail="Somethings wrong with the database"
# )
# filename = cur.fetchone()[0]
return FileResponse(FILES_IN_PROGRESS / filename)
@app.get("/search/lva")
async def search_lva(
searchterm: str = "", pid: str | None = None, searchlim: int = 10
) -> List[Dict[str, int | str]]:
"""returns the LVA for a search in the database"""
res: List[Dict[str, str | int]] = []
zw: List[Dict[str, str | int]] = []
# cur = db.cursor(dictionary=True)
if await is_LVID(searchterm):
res += sqlT(
"SELECT id,lvid,lvname FROM LVAs WHERE lvid LIKE ?",
(searchterm + "%",),
)
# res = cur.fetchall()
else:
if pid is not None:
res += sqlT(
"SELECT LVAs.id,LVAs.lvid,LVAs.lvname FROM LVAs LEFT JOIN LPLink ON LVAs.id=LPLink.lid WHERE lvname like ? AND pid=?",
(searchterm + "%", pid),
)
# res += cur.fetchall()
res += sqlT(
"SELECT LVAs.id,LVAs.lvid,LVAs.lvname FROM LVAs LEFT JOIN LPLink ON LVAs.id=LPLink.lid WHERE lvname like ? AND pid=?",
("%" + searchterm + "%", pid),
)
# res += cur.fetchall()
zw += sqlT(
"SELECT LVAs.id,LVAs.lvid,LVAs.lvname FROM LVAs LEFT JOIN LPLink ON LVAs.id=LPLink.lid WHERE pid=?",
(pid,),
)
# zw = cur.fetchall()
if searchterm != "":
res += sqlT(
"SELECT id,lvid,lvname FROM LVAs WHERE lvname LIKE ?",
(searchterm + "%",),
)
# res += cur.fetchall()
res += sqlT(
"SELECT id,lvid,lvname FROM LVAs WHERE lvname LIKE ?",
("%" + searchterm + "%",),
)
# res += cur.fetchall()
res = remove_duplicates(res + zw)
info(
f"LVA Search: {searchterm}; Result: {res[: (searchlim if searchlim != 0 else -1)]}"
)
if searchlim == 0:
return res
else:
return res[:searchlim]
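# Example request for the endpoint above (hypothetical data): prefix matches are
# queried first, so they appear before plain substring matches in the result.
#   GET /search/lva?searchterm=Mathe&searchlim=5
#   -> [{"id": 7, "lvid": "123456", "lvname": "Mathematik 3"}, ...]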
@app.get("/search/prof")
async def search_profs(
searchterm: str = "", lid: int | None = None, searchlim: int = 10
) -> List[Dict[str, str | int]]:
"""returns the Prof for a searchterm and LVA id"""
res: List[Dict[str, str | int]] = []
zw: List[Dict[str, str | int]] = []
# cur = db.cursor(dictionary=True)
if lid is not None:
# cur.execute("SELECT id FROM LVAs WHERE LVId=?", (lvid,))
# lid = cur.fetchall()[0]["id"]
res += sqlT(
"SELECT Profs.id,Profs.name FROM Profs LEFT JOIN LPLink ON Profs.id=LPLink.pid WHERE name like ? AND lid=?",
("%" + searchterm + "%", lid),
)
# res = cur.fetchall()
zw += sqlT(
"SELECT Profs.id,Profs.name FROM Profs LEFT JOIN LPLink ON Profs.id=LPLink.pid WHERE name NOT like ? AND lid=?",
("%" + searchterm + "%", lid),
)
# zw = cur.fetchall()
if searchterm != "":
res += sqlT(
"SELECT id,name FROM Profs WHERE name LIKE ?",
("%" + searchterm + "%",),
)
# res += cur.fetchall()
res = remove_duplicates(res + zw)
info(
f"Prof Search: {searchterm}; Result: {res[: (searchlim if searchlim != 0 else -1)]}"
)
if searchlim == 0:
return res
else:
return res[:searchlim]
@app.get(
"/search/subcat"
) # NOT FULLY TESTED DUE TO INCOMPLETE DATABASE CAUSED BY AN UNACCEPTABLE FOLDER STRUCTURE
async def search_subcats(
searchterm: str = "",
lid: int | None = None,
pid: int | None = None,
cat: int | None = None,
searchlim: int = 10,
) -> List[Dict[str, str | int]]:
"""searches for avaliable subcatrgories in a specific LVA with a specific Prof(optional)"""
res = []
rest = []
# cur = db.cursor(dictionary=True)
if not (lid is None or pid is None or cat is None): # Rest is available
# cur.execute("SELECT id FROM LVAs WHERE LVId=?", (lvid,))
# lid = cur.fetchall()[0]["id"]
rest = sqlT(
"SELECT id,name FROM SubCats WHERE lid=? AND pid=? AND cat=?",
(lid, pid, cat),
)
# rest = cur.fetchall()
if searchterm != "": # searchterm is available
if not (lid is None or pid is None or cat is None):
res = sqlT(
"SELECT id,name FROM SubCats WHERE lid=? AND pid=? AND cat=? AND name LIKE ?",
(lid, pid, cat, "%" + searchterm + "%"),
)
# res = cur.fetchall()
res += sqlT(
"SELECT id,name FROM SubCats WHERE name LIKE ?", ("%" + searchterm + "%",)
)
# res += cur.fetchall()
res = remove_duplicates(res + rest)
info(
f"Subcatrgory Search: {searchterm}; Result: {res[: (searchlim if searchlim != 0 else -1)]}"
)
if searchlim == 0:
return res
else:
return res[:searchlim]
# @app.post("/files/")
# async def create_file(file: Annotated[bytes, File()]):
# return {"filesize": len(file)}
@app.post("/uploadfile/")
async def create_upload_file(files: List[UploadFile], c2pdf: bool = True):
"""Handles files uploaded. generates ID; saves file; saves path in database"""
if len(files) == 0:
raise HTTPException(status_code=400, detail="No files found in file submission")
filename = files[0].filename if files[0].filename is not None else "None"
if len(files) == 1:
content = await files[0].read()
ft: str = guess_filetype(content, filename)
if c2pdf and ft != "pdf":
ret = convert_to_pdf(content)
if ret is not None:
content = ret
filename = filename_to_pdf(filename)
ft = "pdf"
else:
filecontents = []
for file in files:
content = await file.read()
ft = guess_filetype(content, filename)
if ft == "pdf":
filecontents.append(content)
continue
if c2pdf:
res = convert_to_pdf(content)
if res is None:
filename = await save_files_to_folder(files)
content = None
ft = "dir"
break
filecontents.append(res)
else:
filename = await save_files_to_folder(files)
content = None
ft = "dir"
break
else: # is executed when the loop was not broken out of
filename = filename_to_pdf(filename)
ft = "pdf"
doc = pymupdf.open()
for content in filecontents:
doc.insert_pdf(pymupdf.open("pdf", content))
content = doc.tobytes()
if ft != "dir":
filename = make_filename_unique(filename)
locpath = FILES_IN_PROGRESS / filename
# locpaths.append(locpath)
# cur = db.cursor()
# try:
sql(
"INSERT INTO FIP (filename,filetype,initTimeStamp) Values(?,?,NOW())",
(filename, ft), # str(datetime.datetime.now())
return_result=False,
)
# except mariadb.Error as e:
# print(f"Error: {e}")
# raise HTTPException(
# status_code=500, detail="Somethings wrong with the database"
# )
# try:
db.commit()
sqlres = sql("SELECT id FROM FIP WHERE filename=?", (filename,))
if len(sqlres) < 1:
error(f"FIP Entry with filename {filename} I just created dose not exist")
raise HTTPException(status_code=500, detail="Error with the Database")
id = sqlres[0][0]
# except mariadb.Error as e:
# print(f"Error: {e}")
# raise HTTPException(
# status_code=500, detail="Somethings wrong with the database"
# )
# id = cur.fetchone()[0]
if content is not None:
with open(locpath, "wb") as f:
f.write(content)
# app.mount("/files", StaticFiles(directory="./app/files/"), name="files")
fname = "".join(filename.split(".")[0:-1])
# ftype = filename.split(".")[-1]
return {
"filename": fname,
"filetype": ft,
"path": "/files/" + id,
"fid": id,
}
@app.post("/submit/")
async def get_submission(
lva: Annotated[str, Form()], # LVA Name and Number
prof: Annotated[str, Form()], # Vortragender
fname: Annotated[str, Form()], # Path to pdf File
fileId: Annotated[str, Form()], # UUID of file in FIP table
sem: Annotated[str, Form()], # Semester eg. 2024W
stype: Annotated[str, Form()], # Type of File eg. Prüfung=>0
subcat: Annotated[str, Form()], # Subcategory of file if the category has subcats
ex_date: Annotated[
str, Form()
], # Date of Exam only when type is exam(Klausur/Prüfung)
ftype: Annotated[str, Form()], # type of File
rects: Annotated[
str, Form()
], # Rechtangles # List[List[Tuple[float, float, float, float]]],
pagescales: Annotated[
str, Form()
], # Scales of Pages # Annotated[List[Dict[str, float]], Form()],
ocr: Annotated[str, Form()],
):
"""handles submission"""
print(
f"lva: {lva}, prof: {prof}, fname {fname}, stype: {stype}, subcat: {subcat}, sem: {sem}, ex_date: {ex_date}, rects: {rects}, pagescales: {pagescales}, ocr: {ocr}"
)
info(
f"Got Submission: lva: {lva}, prof: {prof}, fname {fname}, stype: {stype}, subcat: {subcat}, sem: {sem}, ex_date: {ex_date}, rects: {rects}, pagescales: {pagescales}, ocr: {ocr}"
)
rects_p = json.loads(rects)
scales_p = json.loads(pagescales)
# cur = db.cursor()
# try:
res = sql("Select filename from FIP where id=?", (fileId,))
if len(res) < 1:
error(f"Submited file ID {fileId} dose not exist in database")
if fileId == "greeting":
raise HTTPException(400, "You need to upload a file before submitting")
raise HTTPException(status_code=400, detail="Submited file dose not exist.")
for th in [(lva, "LVA"), (prof, "Prof"), (fname, "Filename"), (sem, "Semester")]:
if th[0] == "":
error(f"User tried to upload a file without specifying the {th[1]}")
raise HTTPException(400, f"You need to specify a {th[1]}")
filepath = FILES_IN_PROGRESS / res[0][0]
# except mariadb.Error as e:
# print(f"Mariadb Error: {e}")
# raise HTTPException(
# status_code=500, detail="Somethings wrong with the database"
# )
# filepath = "./app/files/" + cur.fetchone()[0]
try:
dest = make_savepath(lva, prof, stype, subcat, sem, ex_date, fname, ftype)
except ValueError as e:
error(f"Error creating savepath: {e}")
raise HTTPException(status_code=400, detail=f"Error creating savepath: {e}")
# censor_finished_flags[fileId] = asyncio.Event()
censor_status_datas[fileId] = {}
if fileId not in censor_status_update_events:
censor_status_update_events[fileId] = asyncio.Event()
if ocr == "True":
await asyncio.to_thread(
censor_pdf_ocr,
filepath,
dest,
rects_p,
scales_p,
fileId,
)
else:
await asyncio.to_thread(
censor_pdf,
filepath,
dest,
rects_p,
scales_p,
fileId,
)
# return {"done": "ok"}
# print(dest)
# await censor_finished_flags[fileId].wait()
# censor_finished_flags[fileId].clear()
info(f"Saved file {fileId} as {dest}")
delete_from_FIP(fileId)
return FileResponse(dest, content_disposition_type="inline")
@app.get("/get_censor_status/{file_id}")
async def get_censor_status(file_id: str):
"""Yields the currrent page being censored and the total number of pages"""
if len(sql("Select filename from FIP where id=?", (file_id,))) < 1:
raise HTTPException(
400,
detail="You are trying to get a status updater for a file that dosent exist.",
)
if file_id not in censor_status_update_events:
censor_status_update_events[file_id] = asyncio.Event()
return StreamingResponse(
yield_censor_status(file_id), media_type="text/event-stream"
)
async def yield_censor_status(file_id: str):
"""Internal function to yield updates to the stream"""
while True:
await censor_status_update_events[file_id].wait()
censor_status_update_events[file_id].clear()
yield f"event: censorUpdate\ndata: {json.dumps(censor_status_datas[file_id])}\n\n"
if censor_status_datas[file_id]["done"]:
del censor_status_update_events[file_id]
del censor_status_datas[file_id]
return
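# A single message on this event stream looks roughly like the following
# (field values are illustrative; the event with "done": true is the last one
# before the generator returns and the stream closes):
#   event: censorUpdate
#   data: {"page": 3, "pages": 12, "done": false}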
def censor_pdf(
path: os.PathLike,
destpath: os.PathLike,
rects: List[List[List[float]]],
scales: List[Dict[str, float]],
file_id: str,
):
"""Censors pdf and saves the file to the given Destpath.
Args:
path: path to the pdf document
destpath: Path where the result is supposed to be saved to
rects: Coordinates of rectangles to be placed on the pdf document
scales: Scales of the rects coordinates for the pdf document
secure: weather or not the pdf document is supposed to be converted into an Image (and back) to make shure, the censoring is irreversible
Returns:
None
"""
info(f"started Censoring for file {path} to be saved to {destpath}")
doc = pymupdf.open(path)
page = doc[0]
npage = doc.page_count
for i in range(npage):
page = doc[i]
if i < len(rects) and rects[i] != []:
print(i)
wfac = page.rect.width / scales[i]["width"]
hfac = page.rect.height / scales[i]["height"]
for rect in rects[i]:
prect = pymupdf.Rect(
rect[0] * wfac,
rect[1] * hfac,
(rect[0] + rect[2]) * wfac,
(rect[1] + rect[3]) * hfac,
)
page.add_redact_annot(
prect,
fill=(0, 0, 0),
)
page.apply_redactions()
censor_status_datas[file_id]["page"] = i + 1
censor_status_datas[file_id]["pages"] = npage
censor_status_datas[file_id]["done"] = False
censor_status_update_events[file_id].set()
doc.set_metadata({})
doc.save(destpath, garbage=4, deflate=True, clean=True)
censor_status_datas[file_id]["done"] = True
censor_status_update_events[file_id].set()
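# Coordinate mapping sketch for censor_pdf (hypothetical numbers): rects arrive as
# [x, y, w, h] in the preview's pixel space, and scales carries that preview's page size.
# If the preview reports width=800 while the PDF page is 400 pt wide, wfac = 400/800 = 0.5,
# so a rect with x=100, w=200 becomes the redaction rectangle x0=50 .. x1=150 in PDF points.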
def censor_pdf_ocr(
path: os.PathLike,
destpath: os.PathLike,
rects: List[List[List[float]]],
scales: List[Dict[str, float]],
file_id: str,
):
"""Censors pdf and runs OCR
The file is converted to Pixels and then recreated.
Saves the file to the given Destpath.
Args:
path: path to the pdf document
destpath: Path where the result is supposed to be saved to
rects: Coordinates of rectangles to be placed on the pdf document
scales: Scales of the rects coordinates for the pdf document
secure: weather or not the pdf document is supposed to be converted into an Image (and back) to make shure, the censoring is irreversible
Returns:
None
"""
info(f"started Censoring in OCR Mode for file {path} to be saved to {destpath}")
doc = pymupdf.open(path)
output = pymupdf.open()
page = doc[0]
npage = doc.page_count
for i in range(npage):
page = doc[i]
if i < len(rects) and rects[i] != []:
print(i)
wfac = page.rect.width / scales[i]["width"]
hfac = page.rect.height / scales[i]["height"]
for rect in rects[i]:
prect = pymupdf.Rect(
rect[0] * wfac,
rect[1] * hfac,
(rect[0] + rect[2]) * wfac,
(rect[1] + rect[3]) * hfac,
)
page.draw_rect(
prect,
color=(0, 0, 0),
fill=(0, 0, 0),
)
censor_status_datas[file_id]["page"] = i + 1
censor_status_datas[file_id]["pages"] = npage
censor_status_datas[file_id]["done"] = False
censor_status_update_events[file_id].set()
# This costs us dearly
bitmap = page.get_pixmap(dpi=400)
pdf_bytes = bitmap.pdfocr_tobytes(
language="deu",
tessdata="/usr/share/tessdata/", # tesseract needs to be installed; this is the path to thetesseract files
)
output.insert_pdf(pymupdf.Document(stream=pdf_bytes))
# End of the costly part
print(f"Page {i + 1}/{npage}: CENSORING DONE")
output.save(destpath)
censor_status_datas[file_id]["done"] = True
censor_status_update_events[file_id].set()
def test_function(i: int) -> bytes:
return b"\x00\x66\x99"
async def censor_page(page: pymupdf.Page) -> bytes:
bitmap = page.get_pixmap(dpi=400)
pdf_bytes = bitmap.pdfocr_tobytes(
language="deu",
tessdata="/usr/share/tessdata/", # tesseract needs to be installed; this is the path to thetesseract files
)
# print(pdf_bytes)
return pdf_bytes
# def save_without_censoring(dest)
async def is_LVID(term: str) -> bool:
"""Returns weather a string has the format of a LVA ID"""
if re.match(r"[a-zA-Z0-9]{3}\.[a-zA-Z0-9]*", term):
return True
if term.isdigit():
return True
return False
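# Illustrative inputs for is_LVID (format assumed from the regex and isdigit check):
#   "123.456"    -> True   (three alphanumerics, a dot, then more alphanumerics)
#   "123456"     -> True   (purely numeric)
#   "Mathematik" -> False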
def remove_duplicates(
results: List[Dict[str, str | int]],
) -> List[Dict[str, str | int]]:
"""removes duplicate file Ids"""
ids = []
res = []
for result in results:
if result["id"] in ids:
continue
ids.append(result["id"])
res.append(result)
return res
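# Example (hypothetical rows): only the first occurrence of each "id" is kept.
#   remove_duplicates([{"id": 1, "name": "A"}, {"id": 1, "name": "A"}, {"id": 2, "name": "B"}])
#   -> [{"id": 1, "name": "A"}, {"id": 2, "name": "B"}]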
def make_savepath(
lva: str,
prof: str,
cat: str,
subcat: str,
sem: str,
ex_date: str,
fname: str,
ftype: str,
) -> os.PathLike:
"""Generates the path, the file is saved to after the upload process is finished. It creates all nessecery directories."""
info(f"Started to make Savepath for '{fname}' in '{lva}' with prof '{prof}'.")
lv = get_lvpath(lva)
lvpath = Path(lv[1])
pf = get_profpath(prof, lv[0])
pfpath = Path(pf[1])
catpath = Path(CATEGORIES[int(cat)])
scpath: str | os.PathLike = ""
if int(cat) in SUBCAT_CATEGORIES_I and subcat != "":
sc = get_subcatpath(subcat, int(cat), pf[0], lv[0])
scpath = Path(sc[1])
if int(cat) == 6:
savepath = UNIZEUG_PATH / (lv[1] + "_Multimedia_only/") / pfpath
else:
savepath = UNIZEUG_PATH / lvpath / pfpath / catpath / scpath
os.makedirs(savepath, exist_ok=True)
filename = sem + "_"
if int(cat) in EX_DATE_CATEGORIES_I:
try:
yyyy, mm, dd = ex_date.split("-")
except ValueError as e:
error(
f"ValueError: f{e}. Probably caused by user not specifying a date where a date is required"
)
raise HTTPException(
400,
"You have not specified a date for an upload that requires a date like an exam.",
)
filename = yyyy + "_" + mm + "_" + dd + "_"
filename += fname
file = filename + "." + ftype
destpath = savepath / file
i = 0
while destpath.is_file():
info(f"{destpath} already exists.")
file = filename + f"_{i}." + ftype
i += 1
destpath = savepath / file
destpath.touch()
info(f"Path for file to be saved generated as: {savepath / file}")
return savepath / file
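# Resulting layout sketch for make_savepath (hypothetical names): an "Unterlagen"
# upload (cat=4) for LVA path "Mathematik_3_123.456", prof folder "Mustermann_Max"
# and semester "2024W" ends up at roughly
#   UNIZEUG_PATH / "Mathematik_3_123.456" / "Mustermann_Max" / "Unterlagen" / "2024W_Skript.pdf"
# while exam categories replace the semester prefix with the exam date, e.g. "2025_01_31_Angabe.pdf".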
def get_lvpath(lva: str) -> Tuple[int, str]:
"""returns the path in UNIZEUG from a LVA based on its LVID (or name) that may be within a string. It uses the path within the database. If there is no Entry with a fitting LVID in the database it creates a new LVA. Returns: (id,path)"""
# cur = db.cursor()
lvid = re.search(r"[a-zA-Z0-9]{3}\.[a-zA-Z0-9]{3}", lva)
if lvid is not None:
res = sql(
"SELECT id,lvpath FROM LVAs WHERE lvid=?",
(lvid.group()[:3] + lvid.group()[4:],),
)
# res = cur.fetchone()
if len(res) > 0:
return res[0]
else:
return makenew(lva, "LVAs")
else:
res = sql("SELECT id,lvpath FROM LVAs WHERE lvname=?", (lva,))
# res = cur.fetchone()
if len(res) > 0:
return res[0]
else:
return makenew(lva, "LVAs")
def get_profpath(prof: str, lid: int) -> Tuple[int, str]:
"""Generates the foldername for a prof based on his name. It searches the database for matches. Returns: (id,name)"""
# cur = db.cursor()
prof = prof.replace("_", " ")
res = sql("SELECT id,name FROM Profs WHERE name=?", (prof,))
# res = cur.fetchall()
print(res != [])
if res is not None and res != []:
ret = (res[0][0], res[0][1].replace(" ", "_"))
# sql("SELECT * FROM LPLink WHERE LId=? AND PId=?", (lid, ret[0]))
if sql("SELECT * FROM LPLink WHERE LId=? AND PId=?", (lid, ret[0])) is None:
linkLP(lid, ret[0])
return ret
fname, lname = prof.split(" ")
res = sql("SELECT id,name FROM Profs WHERE name like ?", (lname + " " + fname,))
# res = cur.fetchall()
if res is not None and res != []:
ret = (res[0][0], res[0][1].replace(" ", "_"))
# sql("SELECT * FROM LPLink WHERE LId=? AND PId=?", (lid, ret[0]))
if sql("SELECT * FROM LPLink WHERE LId=? AND PId=?", (lid, ret[0])) is None:
linkLP(lid, ret[0])
return ret
ret = makenew(prof, "Profs")
linkLP(lid, ret[0])
return ret
def get_subcatpath(subcat: str, cat: int, pid: int, lid: int) -> Tuple[int, str]:
"""Generates the subcat path from a subcat name. Returns: (id,name)"""
# cur = db.cursor()
res = sql(
"SELECT id,name FROM SubCats WHERE LId=? AND PId=? AND cat=? AND name=?",
(lid, pid, cat, subcat),
)
# res = cur.fetchone()
if res == []:
return makenew(subcat, "SubCats", LId=lid, PId=pid, cat=cat)
return res[0]
def makenew(input: str, table: str, **kwargs) -> Tuple[int, str]:
"""Generates new Entrys in the database for LVAs, Profs, SUBCATS. Returns: (id,name/path)"""
# cur = db.cursor()
if table == "LVAs":
lvaid = re.search(r"[a-zA-Z0-9]{3}\.[a-zA-Z0-9]{3}", input)
if lvaid is None:
raise ValueError("LVA needs to have a LVA ID to be inserted into the table")
lvid = lvaid.group()[:3] + lvaid.group()[4:]
lvname = re.sub(r"[_ -]*[a-zA-Z0-9]{3}\.[a-zA-Z0-9]{3}[_ -]*", "", input)
lvpath = lvname + "_" + lvaid.group()
sql(
"INSERT INTO LVAs(lvid,lvname,lvpath) VALUES(?,?,?)",
(lvid, lvname, lvpath),
return_result=False,
)
# cur.execute("SELECT id,lvpath FROM LVAs WHERE lvid=?", (lvid,))
db.commit()
return sql("SELECT id,lvpath FROM LVAs WHERE lvid=?", (lvid,))[0]
querry = "INSERT INTO " + table + "(name"
values = [input]
nvals = 0
for k, v in kwargs.items():
values.append(v)
querry += "," + k
nvals += 1
querry += ") VALUES(?" + nvals * ",?" + ")"
sql(querry, tuple(values), return_result=False)
sqlres = sql("SELECT id,name FROM " + table + " WHERE name=?", (input,))
db.commit()
if len(sqlres) < 1:
error(f"Entry into {table} with name {input}, I just created dose not exist")
raise HTTPException(status_code=500, detail="Error with Database")
res = sqlres[0]
# res = cur.fetchone()
if table == "Profs":
return (res[0], res[1].replace(" ", "_"))
return res
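# Example for the LVAs branch above (hypothetical input):
#   makenew("Mathematik 3 123.456", "LVAs") stores
#     lvid   = "123456"               (dot stripped)
#     lvname = "Mathematik 3"         (LVA id and surrounding separators removed)
#     lvpath = "Mathematik 3_123.456"
#   and returns the new row's (id, lvpath).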
def linkLP(lid: int, pid: int):
"""declares that a Prof (id in database) offers a LVA (id in database)"""
# cur = db.cursor()
sql("INSERT INTO LPLink(LId,PId) VALUES(?,?)", (lid, pid), return_result=False)
db.commit()
def convert_to_pdf(file: bytes) -> bytes | None:
"""Converts an image(thats all thats implemented right now) into a pdf."""
# ft = filetype.guess(file)
# cid = hash(file)
# if (
# ft.mime
# == "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
# ):
# with open(f"./app/convert_temp/input{cid}.docx", "wb") as f:
# f.write(file)
# docx2pdf.convert(
# f"./app/convert_temp/input{cid}.docx", f"./app/convert_temp/output{cid}.pdf"
# )
# with open(f"./app/convert_temp/output{cid}.pdf", "rb") as f:
# cont = f.read()
# os.remove(f"./app/convert_temp/input{cid}.docx")
# os.remove(f"./app/convert_temp/output{cid}.pdf")
# return cont
# elif (
# ft.mime
# == "application/vnd.openxmlformats-officedocument.presentationml.presentation"
# ):
# with open("f./app/convert_temp/input{cid}.pptx", "wb") as f:
# f.write(file)
# pptxtopdf.convert(
# f"./app/convert_temp/input{cid}.pptx", f"./app/convert_temp/output{cid}.pdf"
# )
# with open(f"./app/convert_temp/output{cid}.pdf", "rb") as f:
# cont = f.read()
# os.remove(f"./app/convert_temp/input{cid}.pptx")
# os.remove(f"./app/convert_temp/output{cid}.pdf")
# return cont
try:
doc = pymupdf.Document(stream=file)
return doc.convert_to_pdf()
except (pymupdf.mupdf.FzErrorUnsupported, pymupdf.FileDataError) as e:
error(f"Error converting Image to pdf file: {e}")
print(e)
return None
def filename_to_pdf(filename: str) -> str:
"""converts any filename.any to filename.pdf"""
farr = filename.split(".")
if len(farr) > 1:
farr[-1] = "pdf"
filename = ".".join(farr)
else:
filename = filename + ".pdf"
return filename
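# Examples for filename_to_pdf:
#   "scan.jpg"       -> "scan.pdf"
#   "archive.tar.gz" -> "archive.tar.pdf"   (only the last extension is replaced)
#   "notes"          -> "notes.pdf"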
def make_filename_unique(filename: str, idx: int | None = None) -> str:
"""makes sure, there are no duplicate filenames in the temporary folder"""
# cur = db.cursor()
res = sql("SELECT id FROM FIP WHERE filename=?", (filename,))
# res = cur.fetchall()
if res is not None and len(res) > 0:
farr = filename.split(".")
if len(farr) > 1:
farr[-2] = (
farr[-2][:-1] + str(idx + 1) if idx is not None else farr[-2] + "_0"
)
filename = ".".join(farr)
else:
filename = (
filename[:-1] + str(idx + 1) if idx is not None else filename + "_0"
)
idx = 0 if idx is None else idx + 1
idx = idx if idx < 10 else idx - 10
filename = make_filename_unique(filename, idx)
return filename
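# Collision handling sketch (assuming the earlier names already exist in FIP):
#   "report.pdf" -> "report_0.pdf" -> "report_1.pdf" -> "report_2.pdf" -> ...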
async def save_files_to_folder(files: List[UploadFile]) -> str:
"""saves file to files in prograss folder"""
filename = files[0].filename if files[0].filename is not None else "None"
filename = filename.split(".")[0]
if filename == "":
filename = "None"
filename = make_filename_unique(filename)
os.mkdir(FILES_IN_PROGRESS / filename)
for idx, file in enumerate(files):
fn = file.filename if file.filename is not None else "None" + str(idx)
with open(FILES_IN_PROGRESS / filename / fn, "wb") as f:
f.write(await file.read())
return filename
# async def get_submittion(request: Request):
# reqJson = await request.form()
# print(reqJson)
# return {"done": "ok"}
def guess_filetype(content: bytes, filename: str) -> str:
"""Guesses the filetype of a file based on first the sontent, If that fails the extension in teh filename. If no conclusion can be reached it reutrns an empty string"""
ftyp = filetype.guess(content)
if ftyp is not None:
return ftyp.extension
farr = filename.split(".")
if len(farr) > 1:
return filename.split(".")[-1]
return ""
@app.get("/remove_old")
async def remove_old_FIP_entrys():
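"""Deletes FIP entries (and their files) that are older than 24 hours, then returns the index page."""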
files = sqlT(
"SELECT id,filename FROM FIP WHERE HOUR(TIMEDIFF(NOW(),initTimeStamp)) > 24 "
)
info(f"Remove Files: {files}")
for file in files:
sql("DELETE FROM FIP WHERE id=?", (file["id"]), return_result=False)
os.remove(FILES_IN_PROGRESS / file["filename"])
# sql(
# "DELETE FROM FIP WHERE HOUR(TIMEDIFF(NOW(),initTimeStamp)) > 24",
# return_result=False,
# )
db.commit()
return FileResponse(APP_ROOT_PATH / "index.html")
def delete_from_FIP(uuid: str):
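"""Deletes the FIP database entry with the given id and removes the corresponding file from the files-in-progress folder."""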
res = sqlT("SELECT filename FROM FIP WHERE id=?", (uuid,))
if len(res) < 1:
raise HTTPException(500, "I am trying to delete a file that does not exist")
sql("DELETE FROM FIP WHERE id=?", (uuid,), return_result=False, commit=True)
os.remove(FILES_IN_PROGRESS / res[0]["filename"])