# db-middleware/utils/scripts.py
# (118 lines, 3.7 KiB, Python)

# add_user.py
import asyncio, logging, pymysql, re
import secrets
from sqlalchemy.future import select
from data.db import engine, SessionLocal
from data.models import Base, User, UserRole
async def startup():
    """Bring up the MySQL side of the application.

    Awaits ``pools_creator()`` and ``mysql_streams_listeners_creator()`` in
    that order, then schedules ``mysql.cached_cursors_cleaner()`` as a task
    and stores it on ``mysql.cached_cursors_cleaner_task``.
    """
    from dbs import mysql
    # NOTE(review): this name is not used in the function; presumably the
    # import is kept for its import-time effects — confirm before removing.
    from app.operations import feed_databases_changes_ws

    await pools_creator()
    await mysql_streams_listeners_creator()

    # Keep a handle on the task so it can be cancelled later.
    cleaner_coroutine = mysql.cached_cursors_cleaner()
    mysql.cached_cursors_cleaner_task = asyncio.create_task(cleaner_coroutine)
async def shutdown():
    """Tear down what ``startup`` created.

    Cancels the cached-cursors cleaner task and waits for it to finish, then
    closes cursors, closes pools, and finally calls ``utils.binlog.destroy``.
    """
    from dbs import mysql

    cleaner_task = mysql.cached_cursors_cleaner_task
    cleaner_task.cancel()
    try:
        # Awaiting a cancelled task re-raises CancelledError once it has stopped.
        await cleaner_task
    except asyncio.CancelledError:
        print('Closed cached_cursors_cleaner_task')

    await cursors_closer()
    await pools_destroy()

    from utils.binlog import destroy
    destroy()
async def mysql_streams_listeners_creator():
    """Read all stored connections and hand them to ``start_listeners``."""
    from data.crud import read_all_connections

    # NOTE(review): the original indentation was lost in this copy; the
    # listener start is kept inside the session scope — confirm against the
    # real file.
    async with SessionLocal() as session:
        stored_connections = await read_all_connections(db=session)
        from utils.binlog import start_listeners
        start_listeners(connections=stored_connections)
async def pools_creator():
    """Create a MySQL pool for every stored connection.

    Each pool returned by ``mysql.pool_creator`` is stored in ``mysql.pools``
    under the connection's ``id``. A ``pymysql.err.OperationalError`` raised
    for one connection is logged (with that connection's id) and does not
    stop the remaining pools from being created.
    """
    from data.crud import read_all_connections
    from dbs import mysql

    # NOTE(review): the original indentation was lost in this copy; the loop
    # is kept inside the session scope — confirm against the real file.
    async with SessionLocal() as db:
        connections = await read_all_connections(db=db)
        for connection in connections:
            try:
                mysql.pools[connection.id] = await mysql.pool_creator(connection=connection)
            except pymysql.err.OperationalError as e:
                # Best effort: report which connection failed through logging
                # (not a bare print) and carry on with the others.
                logging.warning(
                    'Could not create pool for connection %s: %s',
                    connection.id,
                    e,
                )
    logging.info(msg='Created Pools')
async def cursors_closer():
    """Close every cursor held in ``mysql.cached_cursors`` and log each id.

    The items are copied into a list before iterating because every
    ``close()`` is awaited: while this coroutine is suspended another task
    may add or remove cached cursors, and resizing a dict while iterating
    its live view raises ``RuntimeError``.
    """
    from dbs import mysql

    for cursor_id, cursor in list(mysql.cached_cursors.items()):
        await cursor.close()
        logging.info(f'Closed cursor: {cursor_id}')
async def pools_destroy():
    """Close every pool in ``mysql.pools`` and wait until each is closed.

    The items are copied into a list before iterating because
    ``wait_closed()`` is awaited: while this coroutine is suspended another
    task may add or remove pools, and resizing a dict while iterating its
    live view raises ``RuntimeError``.
    """
    from dbs import mysql

    for connection_id, pool in list(mysql.pools.items()):
        pool.close()
        await pool.wait_closed()
        logging.info(f'Closed pool: {connection_id}')
async def db_startup():
    """Run ``Base.metadata.create_all`` through the engine.

    The call is dispatched with ``run_sync`` on a connection obtained from
    ``engine.begin()``.
    """
    create_tables = Base.metadata.create_all
    async with engine.begin() as connection:
        await connection.run_sync(create_tables)
def create_secret():
    """Return a random secret: 32 random bytes as 64 lowercase hex characters."""
    random_bytes = secrets.token_bytes(32)
    return random_bytes.hex()
async def create_user_script_async(username:str|None=None, role_input:str|None=None):
if not (username and role_input):
username = input("Enter username: ")
role_input = input("Enter role (admin/user): ")
print('\n')
username = username.strip()
role_input = role_input.strip().lower()
if not username.isalnum():
print("> Invalid username. Please use alphanumerical characters only.")
return
if not re.match("[a-zA-Z]{1}[a-zA-Z0-0-9]{,15}", username):
print("> Invalid username. Please use english characters and numbers only.")
return
if role_input not in UserRole._value2member_map_:
print("> Invalid role. Please enter 'admin' or 'user'.")
return
role = UserRole(role_input)
async with SessionLocal() as session:
# Check if username already exists
result = await session.execute(select(User).filter_by(username=username))
existing_user = result.scalars().first()
if existing_user:
print(f"> Username '{username}' is already taken.")
return
# Create new user
api_key = create_secret()
new_user = User(username=username, role=role, api_key=api_key)
session.add(new_user)
await session.commit()
await session.refresh(new_user)
print(f"> User '{username}' with role '{role.value}' created successfully.")
print(f"> API Key: {api_key}")
def create_user_script_sync(username:str|None=None, role_input:str|None=None):
    """Blocking wrapper: run ``create_user_script_async`` with ``asyncio.run``."""
    pending = create_user_script_async(username=username, role_input=role_input)
    asyncio.run(pending)