import datetime from aiomysql import SSCursor, Connection, Pool, SSCursor from data.schemas import SelectQuery from core.exceptions import ClosedCursorUsage class CachedCursor: def __init__( self, id: str, cursor: SSCursor, connection: Connection, pool: Pool, connection_id: int, query: SelectQuery, ttl: int=60 ): self.id = id self.cursor = cursor self.connection = connection self.connection_id = connection_id self.pool = pool self.query = query self.row_count: int = -1 if cursor.rowcount > 10000000000000000000 else cursor.rowcount # The rowcount for a SELECT is set to -1 when using a server-side cursor. # The incorrect large number (> 10000000000000000000) is because -1 is # interpreted as an unsigned integer in MySQL's internal C API. self.fetched_rows: int = 0 self.is_closed: bool=False self.ttl:int = ttl self.close_at = self.upgrade_close_at() self.done=False @property def has_more(self): return not self.done def upgrade_close_at(self) -> int: return int(datetime.datetime.now(tz=datetime.UTC).timestamp()) + self.ttl async def close(self): await self.cursor.close() await self.pool.release(self.connection) self.is_closed=True async def fetch_many(self, size: int = 100) -> tuple[list[tuple], bool]: if self.is_closed: raise ClosedCursorUsage result = await self.cursor.fetchmany(size) if len(result) < size: # The cursor has reached the end of the set. await self.close() self.done=True else: self.upgrade_close_at() self.fetched_rows += len(result) return result