# add_user.py import asyncio, logging, pymysql, re import secrets from sqlalchemy.future import select from data.db import engine, SessionLocal from data.models import Base, User, UserRole async def startup(): from dbs import mysql from app.operations import feed_databases_changes_ws await pools_creator() await mysql_streams_listeners_creator() mysql.cached_cursors_cleaner_task = asyncio.create_task(mysql.cached_cursors_cleaner()) async def shutdown(): from dbs import mysql mysql.cached_cursors_cleaner_task.cancel() try: await mysql.cached_cursors_cleaner_task except asyncio.CancelledError: print('Closed cached_cursors_cleaner_task') await cursors_closer() await pools_destroy() from utils.binlog import destroy destroy() async def mysql_streams_listeners_creator(): from data.crud import read_all_connections async with SessionLocal() as db: connections = await read_all_connections(db=db) from utils.binlog import start_listeners start_listeners(connections=connections) async def pools_creator(): from data.crud import read_all_connections from dbs import mysql async with SessionLocal() as db: connections = await read_all_connections(db=db) for connection in connections: try: mysql.pools[connection.id] = await mysql.pool_creator(connection=connection) except pymysql.err.OperationalError as e: print(e) logging.info(msg='Created Pools') async def cursors_closer(): from dbs import mysql for cursor_id, cursor in mysql.cached_cursors.items(): await cursor.close() logging.info(f'Closed cursor: {cursor_id}') async def pools_destroy(): from dbs import mysql for connection_id, pool in mysql.pools.items(): pool.close() await pool.wait_closed() logging.info(f'Closed pool: {connection_id}') async def db_startup(): async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) def create_secret(): return secrets.token_hex(32) async def create_user_script_async(username:str|None=None, role_input:str|None=None): username = username.strip() if username else input("Enter username: ").strip() if not username.isalnum(): print("> Invalid username. Please use alphanumerical characters only.") return if not re.match("[a-zA-Z]{1}[a-zA-Z0-0-9]{,15}", username): print("> Invalid username. Please use english characters and numbers only.") return role_input = role_input.strip().lower() if role_input else input("Enter role (admin/user): ").strip().lower() print('\n') if role_input not in UserRole._value2member_map_: print("> Invalid role. Please enter 'admin' or 'user'.") return role = UserRole(role_input) async with SessionLocal() as session: # Check if username already exists result = await session.execute(select(User).filter_by(username=username)) existing_user = result.scalars().first() if existing_user: print(f"> Username '{username}' is already taken.") return # Create new user api_key = create_secret() new_user = User(username=username, role=role, api_key=api_key) session.add(new_user) await session.commit() await session.refresh(new_user) print(f"> User '{username}' with role '{role.value}' created successfully.") print(f"> API Key: {api_key}") def create_user_script_sync(username:str|None=None, role_input:str|None=None): asyncio.run(create_user_script_async(username=username, role_input=role_input))