Files
db-middleware/app/cursors.py

94 lines
2.5 KiB
Python

from fastapi.routing import APIRouter
from data.schemas import (
CachedCursorOut,
)
from fastapi import Depends, status
from data.crud import (
read_connection,
read_select_query,
)
from core.dependencies import get_db, get_current_user, get_admin_user
from core.exceptions import (
QueryNotFound,
ConnectionNotFound,
CursorNotFound,
)
from dbs import mysql
# Router on which every cursor endpoint in this module is registered.
router = APIRouter()
@router.post("/", dependencies=[Depends(get_current_user)])
async def create_cursor_endpoint(
    query_id: int,
    connection_id: int,
    db=Depends(get_db),
) -> CachedCursorOut:
    """Create a cursor for a stored select query and register it.

    Both the query and the connection are looked up first so that an
    unknown id is reported before any cursor is created.

    Args:
        query_id: Id passed to ``read_select_query``.
        connection_id: Id passed to ``read_connection`` and forwarded to
            ``mysql.create_cursor``.
        db: Database handle injected through ``get_db``.

    Returns:
        The newly created cursor, also stored in ``mysql.cached_cursors``
        under its ``id``.

    Raises:
        QueryNotFound: ``read_select_query`` returned ``None``.
        ConnectionNotFound: ``read_connection`` returned ``None``.
    """
    query = await read_select_query(db=db, query_id=query_id)
    if query is None:
        raise QueryNotFound

    # Only the existence of the connection is checked here; the cursor is
    # created from the connection id, not from the fetched record.
    connection = await read_connection(db=db, connection_id=connection_id)
    if connection is None:
        raise ConnectionNotFound

    cached_cursor = await mysql.create_cursor(
        query=query,
        connection_id=connection_id,
    )
    mysql.cached_cursors[cached_cursor.id] = cached_cursor
    return cached_cursor
@router.get("/", dependencies=[Depends(get_current_user)])
async def get_all_cursors() -> list[CachedCursorOut]:
    """Return all cursors currently held in ``mysql.cached_cursors``."""
    # Materialise the view: the annotation promises a list, and a snapshot
    # stays stable if the registry changes while the response is serialised.
    return list(mysql.cached_cursors.values())
@router.get("/{cursor_id}", dependencies=[Depends(get_current_user)])
async def get_cursors(cursor_id: str) -> CachedCursorOut:
    """Return the cached cursor stored under ``cursor_id``.

    Raises:
        CursorNotFound: ``cursor_id`` is not a key of
            ``mysql.cached_cursors``.
    """
    try:
        found = mysql.cached_cursors[cursor_id]
    except KeyError:
        raise CursorNotFound
    return found
@router.delete(
    "/",
    dependencies=[Depends(get_admin_user)],
    status_code=status.HTTP_204_NO_CONTENT,
)
async def close_all_cursor() -> None:
    """Close every cached cursor and empty ``mysql.cached_cursors``.

    The route is guarded by the ``get_admin_user`` dependency.
    """
    # Drain the registry one entry at a time instead of iterating its view:
    # every ``await`` yields to the event loop, and a concurrent request that
    # adds or removes a cursor mid-iteration would raise
    # "RuntimeError: dictionary changed size during iteration". Draining also
    # ensures cursors registered while we are closing get closed too, rather
    # than being dropped by a trailing ``clear()``.
    while mysql.cached_cursors:
        _, cached_cursor = mysql.cached_cursors.popitem()
        await cached_cursor.close()
@router.delete(
    "/{cursor_id}",
    dependencies=[Depends(get_current_user)],
    status_code=status.HTTP_204_NO_CONTENT,
)
async def close_cursor(cursor_id: str) -> None:
    """Close the cached cursor stored under ``cursor_id`` and unregister it.

    Raises:
        CursorNotFound: ``cursor_id`` is not a key of
            ``mysql.cached_cursors``.
    """
    cached_cursor = mysql.cached_cursors.get(cursor_id)
    if cached_cursor is None:
        raise CursorNotFound

    await cached_cursor.close()
    # The entry may have been removed while ``close()`` was awaited (for
    # example by a concurrent delete); ``pop`` with a default tolerates that
    # where ``del`` would raise KeyError and surface as a 500.
    mysql.cached_cursors.pop(cursor_id, None)
# The path must start with "/": without it the segment is concatenated
# directly onto whatever prefix the router is included under.
@router.post(
    "/push-ttl/{cursor_id}",
    dependencies=[Depends(get_current_user)],
    status_code=status.HTTP_200_OK,
)
async def cursor_push_ttl(
    cursor_id: str,
    new_ttl: int | None = None,
) -> CachedCursorOut:
    """Refresh a cached cursor's closing time, optionally changing its TTL.

    Args:
        cursor_id: Key of the cursor in ``mysql.cached_cursors``.
        new_ttl: Replacement TTL. ``None`` or ``0`` keeps the current value.

    Returns:
        The updated cursor.

    Raises:
        CursorNotFound: ``cursor_id`` is not a key of
            ``mysql.cached_cursors``.
    """
    cached_cursor = mysql.cached_cursors.get(cursor_id)
    if cached_cursor is None:
        raise CursorNotFound

    # A falsy ``new_ttl`` (None or 0) leaves the existing TTL in place.
    cached_cursor.ttl = new_ttl or cached_cursor.ttl
    # Recompute the closing time after the TTL has been settled.
    cached_cursor.close_at = cached_cursor.upgrade_close_at()
    return cached_cursor