Raised the limit for one query execution

This commit is contained in:
2025-12-25 06:54:45 +03:00
parent 3ca8b328dd
commit 1dc1b8154b

View File

@@ -29,7 +29,7 @@ database_changes_active_websocket: WebSocket | None = None
async def execute_select( async def execute_select(
query_id: int, query_id: int,
connection_id: int, connection_id: int,
page_size: Annotated[int, Field(ge=1, le=100)] = 50, page_size: Annotated[int, Field(ge=1, le=10000)] = 50,
db=Depends(get_db), db=Depends(get_db),
) -> SelectResult: ) -> SelectResult:
query = await read_select_query(db=db, query_id=query_id) query = await read_select_query(db=db, query_id=query_id)
@@ -88,13 +88,14 @@ async def fetch_cursor(
@router.get( @router.get(
"/get_database_tables", dependencies=[Depends(get_current_user)], status_code=200 "/get_database_tables", dependencies=[Depends(get_current_user)], status_code=200
) )
async def get_database_tables(connection_id:int): async def get_database_tables(connection_id: int):
pool = mysql.pools.get(connection_id, None) pool = mysql.pools.get(connection_id, None)
r = await mysql.get_tables_and_datatypes(pool=pool) r = await mysql.get_tables_and_datatypes(pool=pool)
print(r) print(r)
return r return r
@router.get( @router.get(
"/sse-stream-cursor", dependencies=[Depends(get_current_user)], status_code=200 "/sse-stream-cursor", dependencies=[Depends(get_current_user)], status_code=200
) )
@@ -152,8 +153,6 @@ async def websocket_stream_cursor(
await websocket.close(reason="Done") await websocket.close(reason="Done")
@router.websocket("/databases_changes") @router.websocket("/databases_changes")
async def websocket_endpoint( async def websocket_endpoint(
websocket: WebSocket, websocket: WebSocket,
@@ -178,14 +177,15 @@ async def websocket_endpoint(
print(e) print(e)
database_changes_active_websocket = websocket database_changes_active_websocket = websocket
await websocket.send_json({"message":"status", "status":"Accepted."}) await websocket.send_json({"message": "status", "status": "Accepted."})
try: try:
await feed_databases_changes_ws(websocket=websocket) await feed_databases_changes_ws(websocket=websocket)
except WebSocketDisconnect: except WebSocketDisconnect:
print('Closed websocket.') print("Closed websocket.")
def serialize_list(l:list):
def serialize_list(l: list):
serialized = [] serialized = []
for value in l: for value in l:
if isinstance(value, str | int | None | float): if isinstance(value, str | int | None | float):
@@ -199,22 +199,22 @@ def serialize_list(l:list):
return serialized return serialized
async def feed_databases_changes_ws(websocket: WebSocket):
async def feed_databases_changes_ws(websocket:WebSocket):
last_update = 0 last_update = 0
while True: while True:
try: try:
change = changes_queue.get_nowait() change = changes_queue.get_nowait()
if change.action == 'UPDATE': if change.action == "UPDATE":
change.after_values = serialize_list(change.after_values) change.after_values = serialize_list(change.after_values)
change.before_values = serialize_list(change.before_values) change.before_values = serialize_list(change.before_values)
else: else:
change.values = serialize_list(change.values) change.values = serialize_list(change.values)
await websocket.send_json({"message": "change", 'change': change.model_dump()}) await websocket.send_json(
{"message": "change", "change": change.model_dump()}
)
except queue.Empty: except queue.Empty:
if last_update + 10 < time.time(): if last_update + 10 < time.time():
await websocket.send_json({"message":"status", "status":"Alive."}) await websocket.send_json({"message": "status", "status": "Alive."})
last_update = time.time() last_update = time.time()
await asyncio.sleep(1) await asyncio.sleep(1)
continue continue