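"""MySQL change-data-capture listeners.

Opens one binlog stream per configured Connection, reads row events on
daemon threads, and fans all changes into a single thread-safe queue that
an asyncio consumer drains.
"""
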
import asyncio
import queue
import threading
import time

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    WriteRowsEvent,
    UpdateRowsEvent,
    DeleteRowsEvent,
)

from data.schemas import (
    Connection,
    ConnectionChangeUpdate,
    ConnectionChangeDelete,
    ConnectionChangeInsert,
)

# Shared state for all active listeners.
events_threads: list[threading.Thread] = []
streams: list[BinLogStreamReader] = []
stop_flag = threading.Event()

# Producer threads push row changes here; the asyncio consumer drains it.
# queue.Queue takes a single type parameter, so the three change types are
# expressed as a union rather than as separate arguments.
changes_queue: queue.Queue[
    ConnectionChangeUpdate | ConnectionChangeDelete | ConnectionChangeInsert
] = queue.Queue()


def initiate_stream(connection: Connection) -> BinLogStreamReader:
    """Open a binlog stream against the MySQL server described by `connection`."""
    return BinLogStreamReader(
        connection_settings={
            "host": connection.host,
            "port": connection.port,
            "user": connection.username,
            "passwd": connection.password,
        },
        server_id=100,  # Replica ID; must be unique per source server
        blocking=False,  # Return immediately when no events are pending
        resume_stream=True,  # Resume from the last binlog position
        only_events=[  # Only capture row-level insert/update/delete events
            WriteRowsEvent,
            UpdateRowsEvent,
            DeleteRowsEvent,
        ],
    )
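

# A sketch, not part of the original module: the hard-coded server_id=100 is
# fine only while every Connection targets a *different* MySQL server. If two
# listeners attach to the same server, each needs a distinct replica ID, e.g.
# derived from the connection's primary key (assuming Connection.id is a
# small unique integer):
#
#     server_id=1000 + connection.id,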


def events_generator(stream: BinLogStreamReader, connection_id: int):
    """Poll a binlog stream and push row changes onto the shared queue."""
    while not stop_flag.is_set():
        # With blocking=False the inner loop drains whatever events are
        # pending, then falls through so the stop flag is rechecked.
        for binlog_event in stream:
            for row in binlog_event.rows:
                if isinstance(binlog_event, WriteRowsEvent):
                    changes_queue.put(
                        ConnectionChangeInsert(
                            connection_id=connection_id,
                            table=binlog_event.table,
                            values=list(row["values"].values()),
                        )
                    )
                elif isinstance(binlog_event, UpdateRowsEvent):
                    changes_queue.put(
                        ConnectionChangeUpdate(
                            connection_id=connection_id,
                            table=binlog_event.table,
                            before_values=list(row["before_values"].values()),
                            after_values=list(row["after_values"].values()),
                        )
                    )
                elif isinstance(binlog_event, DeleteRowsEvent):
                    changes_queue.put(
                        ConnectionChangeDelete(
                            connection_id=connection_id,
                            table=binlog_event.table,
                            values=list(row["values"].values()),
                        )
                    )
        time.sleep(1)
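

# For reference (documented python-mysql-replication behavior): each element
# of binlog_event.rows is a dict. Write and Delete events carry
# {"values": {column: value, ...}}, while Update events carry both
# {"before_values": {...}} and {"after_values": {...}}, which is why the
# branches above read different keys.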


async def process_updates():
    """Drain the change queue from the event loop without blocking it."""
    while True:
        try:
            # Non-blocking get: a blocking get would stall the event loop.
            change = changes_queue.get(block=False)
            print(f"Processing update: {change.model_dump()}")
        except queue.Empty:
            await asyncio.sleep(0.5)
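

# Usage sketch (an assumption, not shown in the original module): schedule the
# consumer on the application's running event loop, e.g.
#
#     asyncio.create_task(process_updates())
#
# The producer threads and this coroutine share changes_queue, which is safe
# because queue.Queue is thread-safe.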


def start_listeners(connections: list[Connection]):
    """Open one stream per connection and read it on a daemon thread."""
    # Create each stream and its reader thread in a single pass, so repeated
    # calls cannot mis-pair previously opened streams with new connections.
    for connection in connections:
        stream = initiate_stream(connection=connection)
        streams.append(stream)
        binlog_thread = threading.Thread(
            target=events_generator,
            daemon=True,
            kwargs={"stream": stream, "connection_id": connection.id},
            name=f"binlog-listener-{connection.id}",
        )
        binlog_thread.start()
        events_threads.append(binlog_thread)
    print("Created listeners")


def destroy():
    """Signal listener threads to stop, join them, and close their streams."""
    stop_flag.set()
    for thread in events_threads:
        if thread.is_alive():
            thread.join(timeout=5)
        if thread.is_alive():
            print(f"Thread {thread.name} did not stop gracefully")
    events_threads.clear()

    for stream in streams:
        stream.close()
    streams.clear()  # Drop closed streams so a later start_listeners() starts clean

    stop_flag.clear()
    print("Closed listeners")
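

# A minimal end-to-end sketch (an assumption: Connection is a model whose
# constructor accepts the fields read above; the values are placeholders).
if __name__ == "__main__":
    demo = Connection(
        id=1, host="127.0.0.1", port=3306, username="repl", password="secret"
    )
    start_listeners([demo])
    try:
        asyncio.run(process_updates())  # Runs until interrupted (Ctrl+C)
    except KeyboardInterrupt:
        pass
    finally:
        destroy()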