Files
db-middleware/data/app_types.py

64 lines
1.8 KiB
Python

import datetime
from aiomysql import SSCursor, Connection, Pool, SSCursor
from data.schemas import SelectQuery
from core.exceptions import ClosedCursorUsage
class CachedCursor:
def __init__(
self,
id: str,
cursor: SSCursor,
connection: Connection,
pool: Pool,
connection_id: int,
query: SelectQuery,
ttl: int=60
):
self.id = id
self.cursor = cursor
self.connection = connection
self.connection_id = connection_id
self.pool = pool
self.query = query
self.row_count: int = -1 if cursor.rowcount > 10000000000000000000 else cursor.rowcount
# The rowcount for a SELECT is set to -1 when using a server-side cursor.
# The incorrect large number (> 10000000000000000000) is because -1 is
# interpreted as an unsigned integer in MySQL's internal C API.
self.fetched_rows: int = 0
self.is_closed: bool=False
self.ttl:int = ttl
self.close_at = self.upgrade_close_at()
self.done=False
@property
def has_more(self):
return not self.done
def upgrade_close_at(self) -> int:
return int(datetime.datetime.now(tz=datetime.UTC).timestamp()) + self.ttl
async def close(self):
await self.cursor.close()
await self.pool.release(self.connection)
self.is_closed=True
async def fetch_many(self, size: int = 100) -> tuple[list[tuple], bool]:
if self.is_closed:
raise ClosedCursorUsage
result = await self.cursor.fetchmany(size)
if len(result) < size:
# The cursor has reached the end of the set.
await self.close()
self.done=True
else:
self.upgrade_close_at()
self.fetched_rows += len(result)
return result