diff --git a/Dockerfile b/Dockerfile index 14e5f57..05064b1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,23 +1,35 @@ -FROM python:3.12-slim +FROM python:3.12.9-alpine3.21 +# FROM python:3.12-slim + +# Set environment variables +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 + +# Set the working directory inside the container WORKDIR /app -RUN apt-get update && apt-get install -y \ - build-essential \ - curl \ - software-properties-common \ - git \ +# Install system dependencies required for MySQL and other libraries +# RUN apt-get update && apt-get install -y \ +# default-libmysqlclient-dev \ +# build-essential \ +# && rm -rf /var/lib/apt/lists/* + +# Copy the requirements file into the container +COPY files/requirements.txt . + +# Install Python dependencies +RUN pip install --no-cache-dir -r requirements.txt + +# Clone the Git repository +RUN apt-get update && apt-get install -y git \ + && git clone https://gitea.abdulhade.com/abdulhade/db-middleware.git . \ + && apt-get remove -y git \ + && apt-get autoremove -y \ && rm -rf /var/lib/apt/lists/* -# RUN git clone https://github.com/streamlit/streamlit-example.git . -COPY requirements.txt . +# Expose port 8080 for the FastAPI application +EXPOSE 8080 -RUN pip3 install -r requirements.txt - -# COPY . . - -EXPOSE 8333 - -# HEALTHCHECK CMD curl --fail http://localhost:8501/_stcore/health - -# ENTRYPOINT ["streamlit", "run", "main.py", "--server.port=8501", "--server.address=0.0.0.0"] +# Command to run the FastAPI application +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8080"] \ No newline at end of file diff --git a/app/operations.py b/app/operations.py index b5b18cf..e4192e8 100644 --- a/app/operations.py +++ b/app/operations.py @@ -1,4 +1,5 @@ -from fastapi import WebSocket +import asyncio, time, decimal, datetime +from fastapi import WebSocket, WebSocketDisconnect, WebSocketException from fastapi.routing import APIRouter from fastapi.responses import StreamingResponse from typing_extensions import Annotated @@ -17,9 +18,12 @@ from core.exceptions import ( CursorNotFound, ) from dbs import mysql +from utils.binlog import changes_queue, queue router = APIRouter() +database_changes_active_websocket: WebSocket | None = None + @router.post("/execute", dependencies=[Depends(get_current_user)]) async def execute_select( @@ -97,7 +101,7 @@ async def server_side_events_stream_cursor( result = await fetch_cursor(cursor_id=cursor_id, page_size=page_size) serialized_result = result.model_dump_json() - yield f"data: {serialized_result}\n\n" # Format as Server-Sent Event (SSE) + yield f"data: {serialized_result}\n\n" if result.cursor.is_closed: break @@ -119,7 +123,7 @@ async def websocket_stream_cursor( if user is None: await websocket.close(reason="Invalid credentials", code=1008) return - + cached_cursor = mysql.cached_cursors.get(cursor_id, None) if cached_cursor is None: e = CursorNotFound() @@ -136,3 +140,69 @@ async def websocket_stream_cursor( if result.cursor.is_closed: break await websocket.close(reason="Done") + + +@router.websocket("/databases_changes") +async def websocket_endpoint( + websocket: WebSocket, + db=Depends(get_db), +): + global database_changes_active_websocket + + await websocket.accept() + + api_key = websocket.headers.get("Authorization") + user = await get_user_from_api_key(db=db, api_key=api_key) + if user is None: + await websocket.close(reason="Invalid credentials", code=1008) + return + + if database_changes_active_websocket: + try: + await database_changes_active_websocket.close( + code=1001, reason="New connection established" + ) + except Exception as e: + print(e) + + database_changes_active_websocket = websocket + await websocket.send_json({"message":"status", "status":"Accepted."}) + + try: + await feed_databases_changes_ws(websocket=websocket) + except WebSocketDisconnect: + print('Closed websocket.') + +def serialize_list(l:list): + serialized = [] + for value in l: + if isinstance(value, str | int | None | float): + serialized.append(str(value)) + elif isinstance(value, decimal.Decimal): + serialized.append(float(value)) + elif isinstance(value, datetime.date): + serialized.append(value.strftime("%Y-%m-%d")) + else: + serialized.append(str(value)) + return serialized + + + +async def feed_databases_changes_ws(websocket:WebSocket): + + last_update = 0 + while True: + try: + change = changes_queue.get_nowait() + if change.action == 'UPDATE': + change.after_values = serialize_list(change.after_values) + change.before_values = serialize_list(change.before_values) + else: + change.values = serialize_list(change.values) + await websocket.send_json({"message": "change", 'change': change.model_dump()}) + except queue.Empty: + if last_update + 10 < time.time(): + await websocket.send_json({"message":"status", "status":"Alive."}) + last_update = time.time() + await asyncio.sleep(1) + continue diff --git a/core/enums.py b/core/enums.py index 5b90661..a87efd7 100644 --- a/core/enums.py +++ b/core/enums.py @@ -1,8 +1,10 @@ import enum + class ConnectionTypes(str, enum.Enum): mysql = "mysql" - postgresql = 'postgresql' + postgresql = "postgresql" + class UserRole(enum.Enum): admin = "admin" @@ -22,6 +24,13 @@ class FilterOperator(str, enum.Enum): is_null = "IS NULL" is_not_null = "IS NOT NULL" + class SortOrder(str, enum.Enum): asc = "ASC" - desc = "DESC" \ No newline at end of file + desc = "DESC" + + +class DBUpdatesActions(str, enum.Enum): + insert = "INSERT" + update = "UPDATE" + delete = "DELETE" diff --git a/data/schemas.py b/data/schemas.py index b2e7be5..c5452bc 100644 --- a/data/schemas.py +++ b/data/schemas.py @@ -2,7 +2,13 @@ import re from typing import Union, List, Optional, Literal, Any from typing_extensions import Annotated from pydantic import BaseModel, Field, field_validator, ValidationInfo, UUID4 -from core.enums import ConnectionTypes, UserRole, FilterOperator, SortOrder +from core.enums import ( + ConnectionTypes, + UserRole, + FilterOperator, + SortOrder, + DBUpdatesActions, +) from core.exceptions import QueryValidationError @@ -189,6 +195,7 @@ class CachedCursorOut(BaseModel): has_more: bool close_at: int ttl: int + class Config: from_attributes = True @@ -196,3 +203,25 @@ class CachedCursorOut(BaseModel): class SelectResult(BaseModel): cursor: CachedCursorOut results: SelectResultData | None + + +class ConnectionChangeBase(BaseModel): + connection_id: int + action: DBUpdatesActions + table: str + + +class ConnectionChangeInsert(ConnectionChangeBase): + action: DBUpdatesActions = DBUpdatesActions.insert + values: list[Any] + + +class ConnectionChangeDelete(ConnectionChangeBase): + action: DBUpdatesActions = DBUpdatesActions.delete + values: list[Any] + + +class ConnectionChangeUpdate(ConnectionChangeBase): + action: DBUpdatesActions = DBUpdatesActions.update + before_values: list[Any] + after_values: list[Any] diff --git a/dbs/mysql.py b/dbs/mysql.py index d6d1d68..2d8ea7d 100644 --- a/dbs/mysql.py +++ b/dbs/mysql.py @@ -59,7 +59,6 @@ async def cached_cursors_cleaner(): async def pool_creator(connection: Connection, minsize=5, maxsize=10): return await aiomysql.create_pool( - # **DB_CONFIG, host=connection.host, user=connection.username, password=connection.password, diff --git a/main.py b/main.py index dc06e86..eabb5a1 100644 --- a/main.py +++ b/main.py @@ -2,24 +2,16 @@ import asyncio from contextlib import asynccontextmanager from fastapi import FastAPI from app import api_router -from utils.scripts import pools_creator, pools_destroy, db_startup, cursors_closer -from dbs import mysql +from utils.scripts import startup, shutdown + @asynccontextmanager async def lifespan(app: FastAPI): - await pools_creator() - mysql.cached_cursors_cleaner_task = asyncio.create_task(mysql.cached_cursors_cleaner()) - + await startup() yield - - mysql.cached_cursors_cleaner_task.cancel() - try: - await mysql.cached_cursors_cleaner_task - except asyncio.CancelledError: - print('Closed cached_cursors_cleaner_task') - await cursors_closer() - await pools_destroy() + await shutdown() + app = FastAPI(lifespan=lifespan) diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 393f7e0..0000000 --- a/requirements.txt +++ /dev/null @@ -1,8 +0,0 @@ -fastapi[all] -uvicorn -aiomysql -websockets -pydantic -python-dotenv -aiosqlite -sqlalchemy \ No newline at end of file diff --git a/utils/binlog.py b/utils/binlog.py new file mode 100644 index 0000000..c6b7448 --- /dev/null +++ b/utils/binlog.py @@ -0,0 +1,120 @@ +from data.schemas import ( + Connection, + ConnectionChangeUpdate, + ConnectionChangeDelete, + ConnectionChangeInsert, +) +from pymysqlreplication import BinLogStreamReader +from pymysqlreplication.row_event import ( + WriteRowsEvent, + UpdateRowsEvent, + DeleteRowsEvent, +) + +import threading, asyncio, time + +import queue + +events_threads: list[threading.Thread] = [] +streams: list[BinLogStreamReader] = [] +stop_flag = threading.Event() + +changes_queue: queue.Queue[ + ConnectionChangeUpdate, + ConnectionChangeDelete, + ConnectionChangeInsert, +] = queue.Queue() + + +def initiate_stream(connection: Connection) -> BinLogStreamReader: + return BinLogStreamReader( + connection_settings={ + "host": connection.host, + "port": connection.port, + "user": connection.username, + "passwd": connection.password, + }, + server_id=100, # Unique ID for this client + blocking=False, # Wait for new events + resume_stream=True, # Resume from the last position + only_events=[ + WriteRowsEvent, + UpdateRowsEvent, + DeleteRowsEvent, + ], # Only capture these events + ) + + +def events_generator(stream: BinLogStreamReader, connection_id: int): + while not stop_flag.is_set(): + for binlog_event in stream: + for row in binlog_event.rows: + if isinstance(binlog_event, WriteRowsEvent): + changes_queue.put( + ConnectionChangeInsert( + connection_id=connection_id, + table=binlog_event.table, + values=list(row["values"].values()), + ) + ) + elif isinstance(binlog_event, UpdateRowsEvent): + changes_queue.put( + ConnectionChangeUpdate( + connection_id=connection_id, + table=binlog_event.table, + before_values=list(row["before_values"].values()), + after_values=list(row["before_values"].values()), + ) + ) + + elif isinstance(binlog_event, DeleteRowsEvent): + changes_queue.put( + ConnectionChangeDelete( + connection_id=connection_id, + table=binlog_event.table, + values=list(row["values"].values()), + ) + ) + time.sleep(1) + + +async def process_updates(): + while True: + try: + change = changes_queue.get(block=False) + print(f"Processing update: {change.model_dump()}") + except queue.Empty: + await asyncio.sleep(0.5) + + +def start_listeners(connections: list[Connection]): + global events_threads, streams + for connection in connections: + stream = initiate_stream(connection=connection) + streams.append(stream) + + for stream, connection in zip(streams, connections): + binlog_thread = threading.Thread( + target=events_generator, + daemon=True, + kwargs={"stream": stream, "connection_id": connection.id}, + ) + binlog_thread.start() + events_threads.append(binlog_thread) + print("Created listeners") + + +def destroy(): + stop_flag.set() + for thread in events_threads: + if thread.is_alive(): + thread.join(timeout=5) + if thread.is_alive(): + print(f"Thread {thread.name} did not stop gracefully") + + events_threads.clear() + for stream in streams: + stream.close() + + stop_flag.clear() + print("Closed listeners") diff --git a/utils/scripts.py b/utils/scripts.py index 29bba52..0faffc3 100644 --- a/utils/scripts.py +++ b/utils/scripts.py @@ -1,14 +1,42 @@ # add_user.py -import asyncio, logging +import asyncio, logging, pymysql import secrets from sqlalchemy.future import select -from sqlalchemy.exc import IntegrityError -from getpass import getpass from data.db import engine, SessionLocal from data.models import Base, User, UserRole +async def startup(): + from dbs import mysql + from app.operations import feed_databases_changes_ws + await pools_creator() + await mysql_streams_listeners_creator() + mysql.cached_cursors_cleaner_task = asyncio.create_task(mysql.cached_cursors_cleaner()) + + +async def shutdown(): + from dbs import mysql + mysql.cached_cursors_cleaner_task.cancel() + try: + await mysql.cached_cursors_cleaner_task + except asyncio.CancelledError: + print('Closed cached_cursors_cleaner_task') + await cursors_closer() + await pools_destroy() + + from utils.binlog import destroy + destroy() + + +async def mysql_streams_listeners_creator(): + from data.crud import read_all_connections + async with SessionLocal() as db: + connections = await read_all_connections(db=db) + + from utils.binlog import start_listeners + start_listeners(connections=connections) + async def pools_creator(): from data.crud import read_all_connections from dbs import mysql @@ -17,7 +45,11 @@ async def pools_creator(): connections = await read_all_connections(db=db) for connection in connections: - mysql.pools[connection.id] = await mysql.pool_creator(connection=connection) + try: + mysql.pools[connection.id] = await mysql.pool_creator(connection=connection) + except pymysql.err.OperationalError as e: + print(e) + logging.info(msg='Created Pools') async def cursors_closer():