# Files
# db-middleware/utils/scripts.py
#
# 107 lines
# 3.2 KiB
# Python

# add_user.py
import asyncio, logging, pymysql
import secrets
from sqlalchemy.future import select
from data.db import engine, SessionLocal
from data.models import Base, User, UserRole
async def startup():
    """Run the application start-up sequence.

    Creates the MySQL pools, starts the MySQL stream listeners, and then
    schedules ``mysql.cached_cursors_cleaner()`` as a background task whose
    handle is stored on ``mysql.cached_cursors_cleaner_task``.
    """
    from dbs import mysql
    # Not referenced below; presumably imported for its import-time side
    # effects -- TODO confirm before removing.
    from app.operations import feed_databases_changes_ws  # noqa: F401

    await pools_creator()
    await mysql_streams_listeners_creator()

    # Keep the task handle on the module so it can be cancelled later.
    cleaner = mysql.cached_cursors_cleaner()
    mysql.cached_cursors_cleaner_task = asyncio.create_task(cleaner)
async def shutdown():
    """Run the application shutdown sequence.

    Cancels ``mysql.cached_cursors_cleaner_task`` and waits for it to end,
    then closes the cached cursors, closes the pools and finally calls
    ``utils.binlog.destroy()``.

    Every later stage still runs when an earlier one raises, so a failing
    cursor close no longer leaves pools open. The exception is re-raised to
    the caller once the remaining stages have been attempted.
    """
    from dbs import mysql

    mysql.cached_cursors_cleaner_task.cancel()
    try:
        await mysql.cached_cursors_cleaner_task
    except asyncio.CancelledError:
        # Expected outcome of awaiting a task that was just cancelled.
        print('Closed cached_cursors_cleaner_task')

    try:
        await cursors_closer()
    finally:
        try:
            await pools_destroy()
        finally:
            from utils.binlog import destroy
            destroy()
async def mysql_streams_listeners_creator():
    """Load every stored connection and hand them to ``start_listeners``.

    The connections are read through ``read_all_connections`` using a
    session from ``SessionLocal`` and then passed, as a whole, to
    ``utils.binlog.start_listeners``.
    """
    from data.crud import read_all_connections

    async with SessionLocal() as session:
        stored_connections = await read_all_connections(db=session)

    from utils.binlog import start_listeners

    start_listeners(connections=stored_connections)
async def pools_creator():
    """Create a MySQL pool for every stored connection.

    Connections are read through ``read_all_connections``; each pool returned
    by ``mysql.pool_creator`` is registered in ``mysql.pools`` under the
    connection's ``id``.

    Pool creation is best-effort: a ``pymysql.err.OperationalError`` for one
    connection is logged together with that connection's id and the loop
    continues with the next one. Any other exception propagates.
    """
    from data.crud import read_all_connections
    from dbs import mysql

    async with SessionLocal() as db:
        connections = await read_all_connections(db=db)

    for connection in connections:
        try:
            mysql.pools[connection.id] = await mysql.pool_creator(connection=connection)
        except pymysql.err.OperationalError as e:
            # Report through logging (like the rest of this module) and say
            # which connection failed, so the error is actionable.
            logging.error(
                'Failed to create pool for connection %s: %s', connection.id, e
            )
    logging.info(msg='Created Pools')
async def cursors_closer():
    """Close every cursor currently held in ``mysql.cached_cursors``.

    Each cursor's ``close()`` is awaited in turn and the closed cursor id is
    logged at INFO level. Entries are not removed from the cache here.
    """
    from dbs import mysql

    # Iterate over a snapshot: every ``await`` yields to the event loop, and
    # if anything adds or removes a cache entry meanwhile, iterating the live
    # dict would raise "dictionary changed size during iteration".
    for cursor_id, cursor in list(mysql.cached_cursors.items()):
        await cursor.close()
        logging.info(f'Closed cursor: {cursor_id}')
async def pools_destroy():
    """Close every pool registered in ``mysql.pools``.

    For each pool, ``close()`` is called and ``wait_closed()`` is awaited
    before moving on; the connection id is then logged at INFO level.
    Entries are not removed from ``mysql.pools`` here.
    """
    from dbs import mysql

    # Iterate over a snapshot: ``wait_closed()`` yields to the event loop, and
    # a change to ``mysql.pools`` during that time would otherwise raise
    # "dictionary changed size during iteration".
    for connection_id, pool in list(mysql.pools.items()):
        pool.close()
        await pool.wait_closed()
        logging.info(f'Closed pool: {connection_id}')
async def db_startup():
    """Run ``Base.metadata.create_all`` through the async engine.

    Opens a transaction with ``engine.begin()`` and executes the synchronous
    ``create_all`` callable on that connection via ``run_sync``.
    """
    create_tables = Base.metadata.create_all
    async with engine.begin() as connection:
        await connection.run_sync(create_tables)
def create_secret(nbytes: int = 32) -> str:
    """Return a random secret as a hexadecimal string.

    Args:
        nbytes: Number of random bytes to draw. Each byte becomes two hex
            digits, so the default of 32 yields a 64-character string.

    Returns:
        A hex string of length ``2 * nbytes`` produced by the ``secrets``
        module.
    """
    return secrets.token_hex(nbytes)
async def create_user_script_async():
    """Interactively create a user and print the generated API key.

    Prompts on stdin for a username and a role, validates both, and then,
    inside a ``SessionLocal`` session, checks that the username is not
    already present before inserting a new ``User`` with a fresh key from
    ``create_secret()``.

    Returns ``None`` in every case; problems (empty username, unknown role,
    username already taken) are reported on stdout and nothing is written.
    """
    # Collect and validate all input first so no database session is held
    # open while waiting for the operator to type.
    username = input("Enter username: ").strip()
    role_input = input("Enter role (admin/user): ").strip().lower()
    print('\n')

    if not username:
        print("> Username must not be empty.")
        return

    # Look the role up through the public Enum constructor rather than the
    # private ``_value2member_map_`` attribute; unknown values raise ValueError.
    try:
        role = UserRole(role_input)
    except ValueError:
        print("> Invalid role. Please enter 'admin' or 'user'.")
        return

    async with SessionLocal() as session:
        # Check if username already exists
        result = await session.execute(select(User).filter_by(username=username))
        if result.scalars().first() is not None:
            print(f"> Username '{username}' is already taken.")
            return

        # Create new user
        api_key = create_secret()
        new_user = User(username=username, role=role, api_key=api_key)
        session.add(new_user)
        await session.commit()
        await session.refresh(new_user)

        print(f"> User '{username}' with role '{role.value}' created successfully.")
        print(f"> API Key: {api_key}")
def create_user_script_sync():
    """Synchronous entry point: run ``create_user_script_async`` to completion.

    Uses ``asyncio.run``, so it must be called from code that is not already
    running inside an event loop.
    """
    interactive_flow = create_user_script_async()
    asyncio.run(interactive_flow)