diff --git a/app/operations.py b/app/operations.py index d273ca9..9257075 100644 --- a/app/operations.py +++ b/app/operations.py @@ -1,4 +1,5 @@ from fastapi.routing import APIRouter +from fastapi.responses import StreamingResponse from typing_extensions import Annotated from pydantic import Field from data.schemas import SelectResult, CachedCursorOut @@ -77,3 +78,28 @@ async def fetch_cursor( cursor=cached_cursor, results={"columns": cached_cursor.query.columns, "data": result}, ) + + + + +@router.get("/sse-stream-cursor", dependencies=[Depends(get_current_user)], status_code=200) +async def server_side_events_stream_cursor( + cursor_id: str, + page_size: Annotated[int, Field(ge=1, le=1000)] = 50, +): + cached_cursor = mysql.cached_cursors.get(cursor_id, None) + if cached_cursor is None: + raise CursorNotFound + + async def stream(): + while True: + result = await fetch_cursor(cursor_id=cursor_id, page_size=page_size) + + serialized_result = ( + result.model_dump_json() + ) + yield f"data: {serialized_result}\n\n" # Format as Server-Sent Event (SSE) + if result.cursor.is_closed: + break + + return StreamingResponse(stream(), media_type="text/event-stream")