# add_user.py import asyncio, logging import secrets from sqlalchemy.future import select from sqlalchemy.exc import IntegrityError from getpass import getpass from data.db import engine, SessionLocal from data.models import Base, User, UserRole async def pools_creator(): from data.crud import read_all_connections from dbs import mysql async with SessionLocal() as db: connections = await read_all_connections(db=db) for connection in connections: mysql.pools[connection.id] = await mysql.pool_creator(connection=connection) logging.info(msg='Created Pools') async def cursors_closer(): from dbs import mysql for cursor_id, cursor in mysql.cached_cursors.items(): await cursor.close() logging.info(f'Closed cursor: {cursor_id}') async def pools_destroy(): from dbs import mysql for connection_id, pool in mysql.pools.items(): pool.close() await pool.wait_closed() logging.info(f'Closed pool: {connection_id}') async def db_startup(): async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) def create_secret(): return secrets.token_hex(32) async def create_user_script_async(): async with SessionLocal() as session: username = input("Enter username: ").strip() role_input = input("Enter role (admin/user): ").strip().lower() print('\n') if role_input not in UserRole._value2member_map_: print("> Invalid role. Please enter 'admin' or 'user'.") return role = UserRole(role_input) # Check if username already exists result = await session.execute(select(User).filter_by(username=username)) existing_user = result.scalars().first() if existing_user: print(f"> Username '{username}' is already taken.") return # Create new user api_key = create_secret() new_user = User(username=username, role=role, api_key=api_key) session.add(new_user) await session.commit() await session.refresh(new_user) print(f"> User '{username}' with role '{role.value}' created successfully.") print(f"> API Key: {api_key}") def create_user_script_sync(): asyncio.run(create_user_script_async())