# File: db-middleware/utils/scripts.py (Python, 69 lines, 2.0 KiB)

# add_user.py
import asyncio, logging
import secrets
from sqlalchemy.future import select
from sqlalchemy.exc import IntegrityError
from getpass import getpass
from data.db import engine, SessionLocal
from data.models import Base, User, UserRole
async def pools_creator():
    """Create one MySQL pool per stored connection and register it.

    Opens a ``SessionLocal`` session, loads every connection record via
    ``read_all_connections``, and stores the pool returned by
    ``mysql.pool_creator`` in ``mysql.pools`` under the record's ``id``.
    Logs a single message once all pools have been created.
    """
    # Function-scope imports, as in the original; presumably this avoids an
    # import cycle at module load time -- TODO confirm.
    from data.crud import read_all_connections
    from dbs import mysql

    async with SessionLocal() as session:
        stored_connections = await read_all_connections(db=session)
        for record in stored_connections:
            new_pool = await mysql.pool_creator(connection=record)
            mysql.pools[record.id] = new_pool
    logging.info('Created Pools')
async def pools_destroy():
    """Close every pool registered in ``mysql.pools``.

    For each registered pool this calls ``close()``, awaits
    ``wait_closed()``, and logs the id of the connection whose pool was
    closed. Entries are left in ``mysql.pools``; only the pools themselves
    are closed.
    """
    from dbs import mysql

    # Iterate over a snapshot: every ``await`` below yields to the event
    # loop, and if another task adds or removes a pool in the meantime,
    # iterating the live dict would raise
    # "RuntimeError: dictionary changed size during iteration".
    registered_pools = list(mysql.pools.items())
    for connection_id, pool in registered_pools:
        pool.close()
        await pool.wait_closed()
        logging.info('Closed pool: %s', connection_id)
async def db_startup():
    """Run ``Base.metadata.create_all`` on a connection from ``engine.begin()``.

    The call is dispatched through ``run_sync`` on the connection yielded by
    the ``engine.begin()`` context manager.
    """
    async with engine.begin() as connection:
        create_all_tables = Base.metadata.create_all
        await connection.run_sync(create_all_tables)
def create_secret(nbytes: int = 32) -> str:
    """Return a random hexadecimal token generated by ``secrets.token_hex``.

    Args:
        nbytes: Number of random bytes to draw. The returned string has
            ``2 * nbytes`` hex characters. Defaults to 32 (a 64-character
            token), which is the size this function always used before the
            parameter existed.

    Returns:
        The token as a lowercase hexadecimal string.
    """
    return secrets.token_hex(nbytes)
async def create_user():
    """Interactively create a user and print the generated API key.

    Prompts on stdin for a username and a role, validates both, checks that
    the username is not already present, then inserts a new ``User`` with a
    freshly generated API key and prints the key.

    Returns:
        None in every case. Validation and persistence problems are
        reported on stdout rather than raised.
    """
    # Collect all input before opening the session so a database session is
    # not held open while the process blocks waiting on the terminal.
    username = input("Enter username: ").strip()
    role_input = input("Enter role (admin/user): ").strip().lower()
    print('\n')

    # Look the role up by value through the public Enum call instead of the
    # private ``_value2member_map_`` mapping; an unknown value raises
    # ValueError.
    try:
        role = UserRole(role_input)
    except ValueError:
        print("> Invalid role. Please enter 'admin' or 'user'.")
        return

    if not username:
        print("> Username cannot be empty.")
        return

    async with SessionLocal() as session:
        # Check if username already exists
        result = await session.execute(select(User).filter_by(username=username))
        existing_user = result.scalars().first()
        if existing_user is not None:
            print(f"> Username '{username}' is already taken.")
            return

        # Create new user
        api_key = create_secret()
        new_user = User(username=username, role=role, api_key=api_key)
        session.add(new_user)
        try:
            await session.commit()
        except IntegrityError:
            # The existence check above is not atomic with the insert: a
            # concurrent writer can still cause a constraint violation here.
            # Roll back so the session is left in a clean state.
            await session.rollback()
            print(
                f"> Could not create user '{username}': a uniqueness "
                "constraint was violated (the username may have just been taken)."
            )
            return
        await session.refresh(new_user)

    print(f"> User '{username}' with role '{role.value}' created successfully.")
    print(f"> API Key: {api_key}")
if __name__ == "__main__":
    # Script entry point: run the interactive user-creation coroutine to
    # completion on a new event loop.
    user_creation = create_user()
    asyncio.run(user_creation)