Support Listening to MySQL Changes

2025-03-09 00:12:28 +03:00
parent 41d98aafe9
commit 77b23eaad2
9 changed files with 304 additions and 49 deletions

Dockerfile

@@ -1,23 +1,35 @@
-FROM python:3.12-slim
+FROM python:3.12.9-alpine3.21
+# FROM python:3.12-slim
 # Set environment variables
 ENV PYTHONDONTWRITEBYTECODE=1
 ENV PYTHONUNBUFFERED=1
 # Set the working directory inside the container
 WORKDIR /app
-RUN apt-get update && apt-get install -y \
-    build-essential \
-    curl \
-    software-properties-common \
-    git \
+# Install system dependencies required for MySQL and other libraries
+# RUN apt-get update && apt-get install -y \
+#     default-libmysqlclient-dev \
+#     build-essential \
+#     && rm -rf /var/lib/apt/lists/*
+# Copy the requirements file into the container
+COPY files/requirements.txt .
+# Install Python dependencies
+RUN pip install --no-cache-dir -r requirements.txt
+# Clone the Git repository
+RUN apt-get update && apt-get install -y git \
+    && git clone https://gitea.abdulhade.com/abdulhade/db-middleware.git . \
+    && apt-get remove -y git \
+    && apt-get autoremove -y \
+    && rm -rf /var/lib/apt/lists/*
 # RUN git clone https://github.com/streamlit/streamlit-example.git .
-COPY requirements.txt .
+# Expose port 8080 for the FastAPI application
+EXPOSE 8080
-RUN pip3 install -r requirements.txt
 # COPY . .
-EXPOSE 8333
 # HEALTHCHECK CMD curl --fail http://localhost:8501/_stcore/health
 # ENTRYPOINT ["streamlit", "run", "main.py", "--server.port=8501", "--server.address=0.0.0.0"]
+# Command to run the FastAPI application
+CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8080"]

app/operations.py

@@ -1,4 +1,5 @@
-from fastapi import WebSocket
+import asyncio, time, decimal, datetime
+from fastapi import WebSocket, WebSocketDisconnect, WebSocketException
 from fastapi.routing import APIRouter
 from fastapi.responses import StreamingResponse
 from typing_extensions import Annotated
@@ -17,9 +18,12 @@ from core.exceptions import (
     CursorNotFound,
 )
 from dbs import mysql
+from utils.binlog import changes_queue, queue

 router = APIRouter()
+database_changes_active_websocket: WebSocket | None = None

 @router.post("/execute", dependencies=[Depends(get_current_user)])
 async def execute_select(
@@ -97,7 +101,7 @@ async def server_side_events_stream_cursor(
         result = await fetch_cursor(cursor_id=cursor_id, page_size=page_size)
         serialized_result = result.model_dump_json()
-        yield f"data: {serialized_result}\n\n"  # Format as Server-Sent Event (SSE)
+        yield f"data: {serialized_result}\n\n"
         if result.cursor.is_closed:
             break
@@ -119,7 +123,7 @@ async def websocket_stream_cursor(
     if user is None:
         await websocket.close(reason="Invalid credentials", code=1008)
         return
     cached_cursor = mysql.cached_cursors.get(cursor_id, None)
     if cached_cursor is None:
         e = CursorNotFound()
@@ -136,3 +140,69 @@ async def websocket_stream_cursor(
         if result.cursor.is_closed:
             break
     await websocket.close(reason="Done")
+
+@router.websocket("/databases_changes")
+async def websocket_endpoint(
+    websocket: WebSocket,
+    db=Depends(get_db),
+):
+    global database_changes_active_websocket
+    await websocket.accept()
+    api_key = websocket.headers.get("Authorization")
+    user = await get_user_from_api_key(db=db, api_key=api_key)
+    if user is None:
+        await websocket.close(reason="Invalid credentials", code=1008)
+        return
+    # Only one listener at a time: evict any previously connected client.
+    if database_changes_active_websocket:
+        try:
+            await database_changes_active_websocket.close(
+                code=1001, reason="New connection established"
+            )
+        except Exception as e:
+            print(e)
+    database_changes_active_websocket = websocket
+    await websocket.send_json({"message": "status", "status": "Accepted."})
+    try:
+        await feed_databases_changes_ws(websocket=websocket)
+    except WebSocketDisconnect:
+        print("Closed websocket.")
+
+def serialize_list(l: list):
+    # Coerce row values into JSON-friendly types.
+    serialized = []
+    for value in l:
+        if isinstance(value, str | int | None | float):
+            serialized.append(str(value))
+        elif isinstance(value, decimal.Decimal):
+            serialized.append(float(value))
+        elif isinstance(value, datetime.date):
+            serialized.append(value.strftime("%Y-%m-%d"))
+        else:
+            serialized.append(str(value))
+    return serialized
+
+async def feed_databases_changes_ws(websocket: WebSocket):
+    last_update = 0
+    while True:
+        try:
+            change = changes_queue.get_nowait()
+            if change.action == "UPDATE":
+                change.after_values = serialize_list(change.after_values)
+                change.before_values = serialize_list(change.before_values)
+            else:
+                change.values = serialize_list(change.values)
+            await websocket.send_json({"message": "change", "change": change.model_dump()})
+        except queue.Empty:
+            # Send a heartbeat at most every 10 seconds while idle.
+            if last_update + 10 < time.time():
+                await websocket.send_json({"message": "status", "status": "Alive."})
+                last_update = time.time()
+            await asyncio.sleep(1)
+            continue
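
The new /databases_changes endpoint keeps a single active listener and interleaves change events with heartbeat status messages. A hypothetical client sketch, assuming the service runs on localhost:8080, this router is mounted at the application root, and the `websockets` package from the requirements is used (the header keyword is extra_headers in websockets releases before 14, additional_headers after):

import asyncio
import json
import websockets

async def listen_for_changes():
    headers = {"Authorization": "my-api-key"}  # placeholder API key
    async with websockets.connect(
        "ws://localhost:8080/databases_changes", extra_headers=headers
    ) as ws:
        async for raw in ws:
            msg = json.loads(raw)
            if msg["message"] == "change":
                print("change:", msg["change"])
            else:  # periodic "Accepted." / "Alive." status heartbeats
                print("status:", msg["status"])

asyncio.run(listen_for_changes())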

core/enums.py

@@ -1,8 +1,10 @@
 import enum

 class ConnectionTypes(str, enum.Enum):
     mysql = "mysql"
-    postgresql = 'postgresql'
+    postgresql = "postgresql"

 class UserRole(enum.Enum):
     admin = "admin"
@@ -22,6 +24,13 @@ class FilterOperator(str, enum.Enum):
     is_null = "IS NULL"
     is_not_null = "IS NOT NULL"

 class SortOrder(str, enum.Enum):
     asc = "ASC"
     desc = "DESC"

+class DBUpdatesActions(str, enum.Enum):
+    insert = "INSERT"
+    update = "UPDATE"
+    delete = "DELETE"

data/schemas.py

@@ -2,7 +2,13 @@ import re
 from typing import Union, List, Optional, Literal, Any
 from typing_extensions import Annotated
 from pydantic import BaseModel, Field, field_validator, ValidationInfo, UUID4
-from core.enums import ConnectionTypes, UserRole, FilterOperator, SortOrder
+from core.enums import (
+    ConnectionTypes,
+    UserRole,
+    FilterOperator,
+    SortOrder,
+    DBUpdatesActions,
+)
 from core.exceptions import QueryValidationError
@@ -189,6 +195,7 @@ class CachedCursorOut(BaseModel):
     has_more: bool
     close_at: int
     ttl: int
+
     class Config:
         from_attributes = True
@@ -196,3 +203,25 @@ class CachedCursorOut(BaseModel):
 class SelectResult(BaseModel):
     cursor: CachedCursorOut
     results: SelectResultData | None
+
+class ConnectionChangeBase(BaseModel):
+    connection_id: int
+    action: DBUpdatesActions
+    table: str
+
+class ConnectionChangeInsert(ConnectionChangeBase):
+    action: DBUpdatesActions = DBUpdatesActions.insert
+    values: list[Any]
+
+class ConnectionChangeDelete(ConnectionChangeBase):
+    action: DBUpdatesActions = DBUpdatesActions.delete
+    values: list[Any]
+
+class ConnectionChangeUpdate(ConnectionChangeBase):
+    action: DBUpdatesActions = DBUpdatesActions.update
+    before_values: list[Any]
+    after_values: list[Any]
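
A minimal sketch of how these models behave, assuming pydantic v2 (which model_dump implies): the action field defaults per subclass and serializes to its string value in JSON mode. Values here are made up:

from data.schemas import ConnectionChangeInsert

change = ConnectionChangeInsert(connection_id=1, table="users", values=["7", "alice"])
print(change.model_dump(mode="json"))
# {'connection_id': 1, 'action': 'INSERT', 'table': 'users', 'values': ['7', 'alice']}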

dbs/mysql.py

@@ -59,7 +59,6 @@ async def cached_cursors_cleaner():
 async def pool_creator(connection: Connection, minsize=5, maxsize=10):
     return await aiomysql.create_pool(
-        # **DB_CONFIG,
         host=connection.host,
         user=connection.username,
         password=connection.password,
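
Elsewhere the middleware pulls connections from these aiomysql pools; a rough usage sketch, with an illustrative connection id and query:

from dbs import mysql

async def count_rows(connection_id: int) -> int:
    pool = mysql.pools[connection_id]
    async with pool.acquire() as conn:  # borrow a connection from the pool
        async with conn.cursor() as cur:
            await cur.execute("SELECT COUNT(*) FROM some_table")
            (count,) = await cur.fetchone()
            return count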

main.py

@@ -2,24 +2,16 @@ import asyncio
 from contextlib import asynccontextmanager
 from fastapi import FastAPI
 from app import api_router
-from utils.scripts import pools_creator, pools_destroy, db_startup, cursors_closer
-from dbs import mysql
+from utils.scripts import startup, shutdown

 @asynccontextmanager
 async def lifespan(app: FastAPI):
-    await pools_creator()
-    mysql.cached_cursors_cleaner_task = asyncio.create_task(mysql.cached_cursors_cleaner())
+    await startup()
     yield
-    mysql.cached_cursors_cleaner_task.cancel()
-    try:
-        await mysql.cached_cursors_cleaner_task
-    except asyncio.CancelledError:
-        print('Closed cached_cursors_cleaner_task')
-    await cursors_closer()
-    await pools_destroy()
+    await shutdown()

 app = FastAPI(lifespan=lifespan)

requirements.txt (deleted)

@@ -1,8 +0,0 @@
-fastapi[all]
-uvicorn
-aiomysql
-websockets
-pydantic
-python-dotenv
-aiosqlite
-sqlalchemy

utils/binlog.py (new file)

@@ -0,0 +1,120 @@
from data.schemas import (
    Connection,
    ConnectionChangeUpdate,
    ConnectionChangeDelete,
    ConnectionChangeInsert,
)
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    WriteRowsEvent,
    UpdateRowsEvent,
    DeleteRowsEvent,
)
import threading, asyncio, time
import queue

events_threads: list[threading.Thread] = []
streams: list[BinLogStreamReader] = []
stop_flag = threading.Event()
changes_queue: queue.Queue[
    ConnectionChangeUpdate | ConnectionChangeDelete | ConnectionChangeInsert
] = queue.Queue()

def initiate_stream(connection: Connection) -> BinLogStreamReader:
    return BinLogStreamReader(
        connection_settings={
            "host": connection.host,
            "port": connection.port,
            "user": connection.username,
            "passwd": connection.password,
        },
        server_id=100,  # Unique ID for this replication client
        blocking=False,  # Don't block; return once available events are read
        resume_stream=True,  # Resume from the last position
        only_events=[
            WriteRowsEvent,
            UpdateRowsEvent,
            DeleteRowsEvent,
        ],  # Only capture these events
    )

def events_generator(stream: BinLogStreamReader, connection_id: int):
    # Runs in a daemon thread: drain available binlog events, then sleep.
    while not stop_flag.is_set():
        for binlog_event in stream:
            for row in binlog_event.rows:
                if isinstance(binlog_event, WriteRowsEvent):
                    changes_queue.put(
                        ConnectionChangeInsert(
                            connection_id=connection_id,
                            table=binlog_event.table,
                            values=list(row["values"].values()),
                        )
                    )
                elif isinstance(binlog_event, UpdateRowsEvent):
                    changes_queue.put(
                        ConnectionChangeUpdate(
                            connection_id=connection_id,
                            table=binlog_event.table,
                            before_values=list(row["before_values"].values()),
                            after_values=list(row["after_values"].values()),
                        )
                    )
                elif isinstance(binlog_event, DeleteRowsEvent):
                    changes_queue.put(
                        ConnectionChangeDelete(
                            connection_id=connection_id,
                            table=binlog_event.table,
                            values=list(row["values"].values()),
                        )
                    )
        time.sleep(1)

async def process_updates():
    while True:
        try:
            change = changes_queue.get(block=False)
            print(f"Processing update: {change.model_dump()}")
        except queue.Empty:
            await asyncio.sleep(0.5)

def start_listeners(connections: list[Connection]):
    global events_threads, streams
    for connection in connections:
        stream = initiate_stream(connection=connection)
        streams.append(stream)
    for stream, connection in zip(streams, connections):
        binlog_thread = threading.Thread(
            target=events_generator,
            daemon=True,
            kwargs={"stream": stream, "connection_id": connection.id},
        )
        binlog_thread.start()
        events_threads.append(binlog_thread)
    print("Created listeners")

def destroy():
    stop_flag.set()
    for thread in events_threads:
        if thread.is_alive():
            thread.join(timeout=5)
            if thread.is_alive():
                print(f"Thread {thread.name} did not stop gracefully")
    events_threads.clear()
    for stream in streams:
        stream.close()
    stop_flag.clear()
    print("Closed listeners")

utils/scripts.py

@@ -1,14 +1,42 @@
 # add_user.py
-import asyncio, logging
+import asyncio, logging, pymysql
 import secrets
 from sqlalchemy.future import select
 from sqlalchemy.exc import IntegrityError
 from getpass import getpass
 from data.db import engine, SessionLocal
 from data.models import Base, User, UserRole
+
+async def startup():
+    from dbs import mysql
+    from app.operations import feed_databases_changes_ws
+
+    await pools_creator()
+    await mysql_streams_listeners_creator()
+    mysql.cached_cursors_cleaner_task = asyncio.create_task(mysql.cached_cursors_cleaner())
+
+async def shutdown():
+    from dbs import mysql
+
+    mysql.cached_cursors_cleaner_task.cancel()
+    try:
+        await mysql.cached_cursors_cleaner_task
+    except asyncio.CancelledError:
+        print("Closed cached_cursors_cleaner_task")
+    await cursors_closer()
+    await pools_destroy()
+    from utils.binlog import destroy
+
+    destroy()
+
+async def mysql_streams_listeners_creator():
+    from data.crud import read_all_connections
+
+    async with SessionLocal() as db:
+        connections = await read_all_connections(db=db)
+    from utils.binlog import start_listeners
+
+    start_listeners(connections=connections)

 async def pools_creator():
     from data.crud import read_all_connections
     from dbs import mysql
@@ -17,7 +45,11 @@ async def pools_creator():
     connections = await read_all_connections(db=db)
     for connection in connections:
-        mysql.pools[connection.id] = await mysql.pool_creator(connection=connection)
+        try:
+            mysql.pools[connection.id] = await mysql.pool_creator(connection=connection)
+        except pymysql.err.OperationalError as e:
+            print(e)
     logging.info(msg="Created Pools")

 async def cursors_closer():
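
Since pool creation now survives unreachable servers, a similar guard could be useful before starting binlog listeners. A hypothetical helper (name and credentials illustrative) that uses aiomysql to verify a server is configured for row-based binlog streaming:

import asyncio
import aiomysql

async def binlog_ready(host: str, port: int, user: str, password: str) -> bool:
    conn = await aiomysql.connect(host=host, port=port, user=user, password=password)
    try:
        async with conn.cursor() as cur:
            await cur.execute(
                "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format')"
            )
            settings = dict(await cur.fetchall())
    finally:
        conn.close()
    # pymysqlreplication needs the binary log on and in ROW format
    return settings.get("log_bin") == "ON" and settings.get("binlog_format") == "ROW"

print(asyncio.run(binlog_ready("localhost", 3306, "root", "secret")))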