from data.schemas import ( Connection, ConnectionChangeUpdate, ConnectionChangeDelete, ConnectionChangeInsert, ) from pymysqlreplication import BinLogStreamReader from pymysqlreplication.row_event import ( WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent, ) import threading, asyncio, time import queue events_threads: list[threading.Thread] = [] streams: list[BinLogStreamReader] = [] stop_flag = threading.Event() changes_queue: queue.Queue[ ConnectionChangeUpdate, ConnectionChangeDelete, ConnectionChangeInsert, ] = queue.Queue() def initiate_stream(connection: Connection) -> BinLogStreamReader: return BinLogStreamReader( connection_settings={ "host": connection.host, "port": connection.port, "user": connection.username, "passwd": connection.password, }, server_id=100, # Unique ID for this client blocking=False, # Wait for new events resume_stream=True, # Resume from the last position only_events=[ WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent, ], # Only capture these events ) def events_generator(stream: BinLogStreamReader, connection_id: int): while not stop_flag.is_set(): for binlog_event in stream: for row in binlog_event.rows: if isinstance(binlog_event, WriteRowsEvent): changes_queue.put( ConnectionChangeInsert( connection_id=connection_id, table=binlog_event.table, values=list(row["values"].values()), ) ) elif isinstance(binlog_event, UpdateRowsEvent): changes_queue.put( ConnectionChangeUpdate( connection_id=connection_id, table=binlog_event.table, before_values=list(row["before_values"].values()), after_values=list(row["before_values"].values()), ) ) elif isinstance(binlog_event, DeleteRowsEvent): changes_queue.put( ConnectionChangeDelete( connection_id=connection_id, table=binlog_event.table, values=list(row["values"].values()), ) ) time.sleep(1) async def process_updates(): while True: try: change = changes_queue.get(block=False) print(f"Processing update: {change.model_dump()}") except queue.Empty: await asyncio.sleep(0.5) def start_listeners(connections: list[Connection]): global events_threads, streams for connection in connections: stream = initiate_stream(connection=connection) streams.append(stream) for stream, connection in zip(streams, connections): binlog_thread = threading.Thread( target=events_generator, daemon=True, kwargs={"stream": stream, "connection_id": connection.id}, ) binlog_thread.start() events_threads.append(binlog_thread) print("Created listeners") def destroy(): stop_flag.set() for thread in events_threads: if thread.is_alive(): thread.join(timeout=5) if thread.is_alive(): print(f"Thread {thread.name} did not stop gracefully") events_threads.clear() for stream in streams: stream.close() stop_flag.clear() print("Closed listeners")