Files
db-middleware/utils/binlog.py

121 lines
3.7 KiB
Python

import asyncio
import queue
import threading
import time
from typing import Union

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)

from data.schemas import (
    Connection,
    ConnectionChangeDelete,
    ConnectionChangeInsert,
    ConnectionChangeUpdate,
)
# Listener threads driving events_generator, one per watched connection.
events_threads: list[threading.Thread] = []
# Open binlog streams, kept parallel to events_threads so destroy() can close them.
streams: list[BinLogStreamReader] = []
# Set by destroy() to ask every listener thread to exit its poll loop.
stop_flag = threading.Event()

# Any of the three change kinds a listener can produce.
# Fix: queue.Queue is generic over a SINGLE item type; the original
# subscripted it with three separate type arguments, which is not a valid
# parameterization. The item type is the union of the change kinds.
ConnectionChange = Union[
    ConnectionChangeUpdate,
    ConnectionChangeDelete,
    ConnectionChangeInsert,
]
changes_queue: queue.Queue[ConnectionChange] = queue.Queue()
def initiate_stream(
    connection: Connection, server_id: int = 100
) -> BinLogStreamReader:
    """Open a non-blocking binlog stream for *connection*.

    Args:
        connection: Connection settings (host, port, username, password).
        server_id: Replica server id this client reports to MySQL. Every
            concurrent stream against the same server must use a unique id,
            so callers watching several connections should pass distinct
            values. Defaults to 100 for backward compatibility.

    Returns:
        A BinLogStreamReader that resumes from the last binlog position and
        yields only row-change (insert/update/delete) events.
    """
    return BinLogStreamReader(
        connection_settings={
            "host": connection.host,
            "port": connection.port,
            "user": connection.username,
            "passwd": connection.password,
        },
        server_id=server_id,  # must be unique per stream on one server
        blocking=False,  # return control instead of waiting for new events
        resume_stream=True,  # resume from the last position
        only_events=[
            WriteRowsEvent,
            UpdateRowsEvent,
            DeleteRowsEvent,
        ],  # only capture row-change events
    )
def events_generator(stream: BinLogStreamReader, connection_id: int):
    """Poll *stream* for row events and push each row onto ``changes_queue``.

    Runs until ``stop_flag`` is set. Every row of every event is converted
    into the matching ``ConnectionChange*`` schema object tagged with
    *connection_id*. Intended to run on a dedicated thread (see
    ``start_listeners``).

    Args:
        stream: Non-blocking binlog reader to poll.
        connection_id: Id of the connection this stream belongs to.
    """
    while not stop_flag.is_set():
        for binlog_event in stream:
            for row in binlog_event.rows:
                if isinstance(binlog_event, WriteRowsEvent):
                    changes_queue.put(
                        ConnectionChangeInsert(
                            connection_id=connection_id,
                            table=binlog_event.table,
                            values=list(row["values"].values()),
                        )
                    )
                elif isinstance(binlog_event, UpdateRowsEvent):
                    changes_queue.put(
                        ConnectionChangeUpdate(
                            connection_id=connection_id,
                            table=binlog_event.table,
                            before_values=list(row["before_values"].values()),
                            # Bug fix: the new row state lives under
                            # "after_values"; the original read
                            # "before_values" twice, so every update
                            # reported identical before/after values.
                            after_values=list(row["after_values"].values()),
                        )
                    )
                elif isinstance(binlog_event, DeleteRowsEvent):
                    changes_queue.put(
                        ConnectionChangeDelete(
                            connection_id=connection_id,
                            table=binlog_event.table,
                            values=list(row["values"].values()),
                        )
                    )
        # Stream is non-blocking: back off before re-polling for new events.
        time.sleep(1)
async def process_updates():
    """Drain ``changes_queue`` forever, logging each change.

    Bug fix: the original only awaited when the queue was empty, so a
    steadily busy queue would never yield control and starved every other
    coroutine on the event loop. We now yield after each processed item and
    back off briefly when idle.
    """
    while True:
        try:
            change = changes_queue.get_nowait()
        except queue.Empty:
            await asyncio.sleep(0.5)  # idle: back off before polling again
        else:
            print(f"Processing update: {change.model_dump()}")
            await asyncio.sleep(0)  # cooperative yield on the busy path
def start_listeners(connections: list[Connection]):
    """Spawn one daemon listener thread per connection.

    New streams and threads are appended to the module-level registries so
    ``destroy()`` can shut them down later.

    Bug fix: the original zipped the *global* ``streams`` list against the
    new connections, so a second call paired previously-created streams with
    the wrong connections. Only the streams created in this call are paired.

    Args:
        connections: Connections to start watching.
    """
    # The ``global`` statement was dropped: we only mutate the lists in
    # place (append/extend), never rebind the module names.
    new_streams = [initiate_stream(connection=conn) for conn in connections]
    streams.extend(new_streams)
    for stream, connection in zip(new_streams, connections):
        binlog_thread = threading.Thread(
            target=events_generator,
            daemon=True,  # do not keep the process alive on interpreter exit
            kwargs={"stream": stream, "connection_id": connection.id},
        )
        binlog_thread.start()
        events_threads.append(binlog_thread)
    print("Created listeners")
def destroy():
    """Stop every listener thread and close every open binlog stream.

    Threads that do not exit within 5 seconds are reported but abandoned
    (they are daemons, so they cannot block process shutdown). The stop
    flag is cleared at the end so listeners can be started again.
    """
    stop_flag.set()  # ask each events_generator loop to exit
    for worker in events_threads:
        if not worker.is_alive():
            continue
        worker.join(timeout=5)
        if worker.is_alive():
            print(f"Thread {worker.name} did not stop gracefully")
    events_threads.clear()
    for reader in streams:
        reader.close()
    stop_flag.clear()  # reset so start_listeners() can be used again
    print("Closed listeners")