"""HTTP endpoints for creating, listing, extending and closing cached cursors.

Cursors are kept in the in-process ``mysql.cached_cursors`` mapping, keyed by
cursor id.
"""
import logging

from fastapi import Depends, status
from fastapi.routing import APIRouter

from core.dependencies import get_admin_user, get_current_user, get_db
from core.exceptions import (
    ConnectionNotFound,
    CursorNotFound,
    QueryNotFound,
)
from data.crud import (
    read_connection,
    read_select_query,
)
from data.schemas import (
    CachedCursorOut,
)
from dbs import mysql

logger = logging.getLogger(__name__)

router = APIRouter()


@router.post("/", dependencies=[Depends(get_current_user)])
async def create_cursor_endpoint(
    query_id: int,
    connection_id: int,
    db=Depends(get_db),
) -> CachedCursorOut:
    """Create a cursor for a stored select query and register it in the cache.

    Raises:
        QueryNotFound: ``read_select_query`` returned ``None`` for ``query_id``.
        ConnectionNotFound: ``read_connection`` returned ``None`` for
            ``connection_id``.
    """
    query = await read_select_query(db=db, query_id=query_id)
    if query is None:
        raise QueryNotFound

    # The connection record is only looked up to validate that it exists;
    # ``mysql.create_cursor`` receives the id, not the record.
    connection = await read_connection(db=db, connection_id=connection_id)
    if connection is None:
        raise ConnectionNotFound

    cached_cursor = await mysql.create_cursor(query=query, connection_id=connection_id)
    mysql.cached_cursors[cached_cursor.id] = cached_cursor
    # Debug-level only: dumping the whole cache on every request is noise.
    logger.debug("Cached cursors after create: %s", mysql.cached_cursors)
    return cached_cursor


@router.get("/", dependencies=[Depends(get_current_user)])
async def get_all_cursors() -> list[CachedCursorOut]:
    """Return every cursor currently held in the cache."""
    # Materialise the view so the return value matches the declared list type
    # and is a stable snapshot rather than a live dict view.
    return list(mysql.cached_cursors.values())


@router.get("/{cursor_id}", dependencies=[Depends(get_current_user)])
async def get_cursors(cursor_id: str) -> CachedCursorOut:
    """Return the cached cursor with the given id.

    Raises:
        CursorNotFound: no cursor is cached under ``cursor_id``.
    """
    try:
        return mysql.cached_cursors[cursor_id]
    except KeyError:
        # The KeyError is an implementation detail; do not chain it.
        raise CursorNotFound from None


@router.delete(
    "/",
    dependencies=[Depends(get_admin_user)],
    status_code=status.HTTP_204_NO_CONTENT,
)
async def close_all_cursor() -> None:
    """Close every cached cursor and remove them from the cache (admin only).

    Errors raised while closing an individual cursor are logged and do not
    stop the remaining cursors from being closed.
    """
    # Snapshot and empty the cache in one step, with no ``await`` in between.
    # Iterating the live dict across awaits can fail with "dictionary changed
    # size during iteration" if another request adds or removes a cursor, and
    # clearing afterwards would drop cursors created meanwhile without ever
    # closing them.
    cursors_to_close = list(mysql.cached_cursors.values())
    mysql.cached_cursors.clear()

    for cached_cursor in cursors_to_close:
        try:
            await cached_cursor.close()
        except Exception:
            # Best effort: keep closing the rest.
            logger.exception("Error closing a cursor")


@router.delete(
    "/{cursor_id}",
    dependencies=[Depends(get_current_user)],
    status_code=status.HTTP_204_NO_CONTENT,
)
async def close_cursor(cursor_id: str) -> None:
    """Close one cached cursor and remove it from the cache.

    Raises:
        CursorNotFound: no cursor is cached under ``cursor_id``.
    """
    # Pop before awaiting so a concurrent request cannot close the same cursor
    # twice, and so removal cannot fail with KeyError after the await.
    cached_cursor = mysql.cached_cursors.pop(cursor_id, None)
    if cached_cursor is None:
        raise CursorNotFound

    try:
        await cached_cursor.close()
    except Exception:
        # Best effort: the entry is dropped from the cache even if close fails.
        logger.exception("Error closing the cursor %s", cursor_id)


# NOTE: the path previously lacked its leading slash ("push-ttl/{cursor_id}"),
# which either never matches or gets glued directly onto the router prefix.
@router.post(
    "/push-ttl/{cursor_id}",
    dependencies=[Depends(get_current_user)],
    status_code=status.HTTP_200_OK,
)
async def cursor_push_ttl(
    cursor_id: str, new_ttl: int | None = None
) -> CachedCursorOut:
    """Recompute a cached cursor's ``close_at``, optionally changing its TTL.

    Args:
        cursor_id: id of the cached cursor.
        new_ttl: replacement TTL; when omitted (or ``0``) the cursor keeps its
            current TTL.

    Raises:
        CursorNotFound: no cursor is cached under ``cursor_id``.
    """
    cached_cursor = mysql.cached_cursors.get(cursor_id)
    if cached_cursor is None:
        raise CursorNotFound

    # Falsy ``new_ttl`` (None or 0) leaves the existing TTL in place.
    cached_cursor.ttl = new_ttl or cached_cursor.ttl
    cached_cursor.close_at = cached_cursor.upgrade_close_at()
    return cached_cursor