Files
db-middleware/data/crud.py

81 lines
2.9 KiB
Python

# crud.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from data.models import Connection, User, Query
from data.schemas import ConnectionCreate, ConnectionUpdate, UserCreate, SelectQueryIn
from core.exceptions import ObjectNotFoundInDB
async def read_user(db: AsyncSession, user_id: int):
    """Look up one user by id.

    Args:
        db: Async session the query is executed on.
        user_id: Value compared against ``User.id``.

    Returns:
        The first matching ``User``, or ``None`` when no row matches.
    """
    stmt = select(User).where(User.id == user_id)
    rows = await db.execute(stmt)
    return rows.scalars().first()
async def read_all_users(db: AsyncSession):
    """Return every ``User`` selected by an unfiltered query.

    Args:
        db: Async session the query is executed on.

    Returns:
        All ``User`` objects produced by ``select(User)``.
    """
    rows = await db.execute(select(User))
    users = rows.scalars()
    return users.all()
async def create_user(db: AsyncSession, user: UserCreate):
    """Persist a new user built from ``user`` plus a generated API key.

    Args:
        db: Async session used to add and commit the new row.
        user: Payload whose dumped fields are passed to the ``User`` constructor.

    Returns:
        The newly stored ``User`` after it has been refreshed from the database.
    """
    # Imported at call time rather than at module level.
    # NOTE(review): presumably this avoids a circular import - confirm.
    from utils.scripts import create_secret

    fields = user.model_dump()
    new_user = User(**fields, api_key=create_secret())
    db.add(new_user)
    await db.commit()
    # Re-read the row's state from the database after the commit.
    await db.refresh(new_user)
    return new_user
async def delete_user(db: AsyncSession, user_id: int):
    """Delete the user with the given id and return the removed object.

    Args:
        db: Async session used for the lookup, delete and commit.
        user_id: Id of the user to remove.

    Returns:
        The ``User`` object that was deleted.

    Raises:
        ObjectNotFoundInDB: If ``read_user`` finds no user for ``user_id``.
    """
    target = await read_user(db, user_id)
    if not target:
        raise ObjectNotFoundInDB

    await db.delete(target)
    await db.commit()
    return target
async def read_connection(db: AsyncSession, connection_id: int):
    """Look up one connection by id.

    Args:
        db: Async session the query is executed on.
        connection_id: Value compared against ``Connection.id``.

    Returns:
        The first matching ``Connection``, or ``None`` when no row matches.
    """
    stmt = select(Connection).where(Connection.id == connection_id)
    rows = await db.execute(stmt)
    return rows.scalars().first()
async def read_all_connections(db: AsyncSession):
    """Return every ``Connection`` selected by an unfiltered query.

    Args:
        db: Async session the query is executed on.

    Returns:
        All ``Connection`` objects produced by ``select(Connection)``.
    """
    rows = await db.execute(select(Connection))
    connections = rows.scalars()
    return connections.all()
async def read_all_listenable_connection(db: AsyncSession):
    """Return the connections whose ``listen_updates`` column equals true.

    Args:
        db: Async session the query is executed on.

    Returns:
        All ``Connection`` objects matching the ``listen_updates`` filter.
    """
    # `== True` is intentional: it builds a SQL comparison expression,
    # it is not a Python truthiness test.
    wants_updates = Connection.listen_updates == True  # noqa: E712
    stmt = select(Connection).where(wants_updates)
    rows = await db.execute(stmt)
    return rows.scalars().all()
async def create_connection(db: AsyncSession, connection: ConnectionCreate, user_id: int):
    """Persist a new connection owned by ``user_id``.

    Args:
        db: Async session used to add and commit the new row.
        connection: Payload whose dumped fields are passed to the
            ``Connection`` constructor.
        user_id: Stored on the new row as ``owner_id``.

    Returns:
        The newly stored ``Connection`` after it has been refreshed from the
        database.
    """
    fields = connection.model_dump()
    new_connection = Connection(**fields, owner_id=user_id)
    db.add(new_connection)
    await db.commit()
    # Re-read the row's state from the database after the commit.
    await db.refresh(new_connection)
    return new_connection
async def update_connection(db: AsyncSession, connection_id: int, connection: ConnectionUpdate):
    """Apply the fields of ``connection`` to an existing connection.

    Only fields the caller explicitly set on the payload are written. Dumping
    the whole model would also emit defaults for omitted fields and silently
    overwrite the stored values with them.

    Args:
        db: Async session used for the lookup, commit and refresh.
        connection_id: Id of the connection to modify.
        connection: Payload carrying the new attribute values.

    Returns:
        The updated ``Connection`` refreshed from the database, or the falsy
        lookup result (``None``) when no connection has that id; nothing is
        committed in that case.
    """
    db_connection = await read_connection(db, connection_id)
    if not db_connection:
        return db_connection

    # NOTE(review): ConnectionUpdate is defined elsewhere; if all its fields
    # are required, exclude_unset changes nothing. Confirm against the schema.
    changes = connection.model_dump(exclude_unset=True)
    for field_name, new_value in changes.items():
        setattr(db_connection, field_name, new_value)

    await db.commit()
    # Re-read the row's state from the database after the commit.
    await db.refresh(db_connection)
    return db_connection
async def delete_connection(db: AsyncSession, connection_id: int):
    """Delete the connection with the given id, if it exists.

    Args:
        db: Async session used for the lookup, delete and commit.
        connection_id: Id of the connection to remove.

    Returns:
        The deleted ``Connection``, or the falsy lookup result (``None``)
        when no connection has that id. No exception is raised for a miss.
    """
    target = await read_connection(db, connection_id)
    if not target:
        return target

    await db.delete(target)
    await db.commit()
    return target
async def create_select_query(db: AsyncSession, query: SelectQueryIn) -> Query:
    """Persist a new ``Query`` row built from ``query``.

    Args:
        db: Async session used to add and commit the new row.
        query: Payload whose dumped fields are passed to the ``Query``
            constructor.

    Returns:
        The newly stored ``Query`` after it has been refreshed from the
        database.
    """
    fields = query.model_dump()
    new_query = Query(**fields)
    db.add(new_query)
    await db.commit()
    # Re-read the row's state from the database after the commit.
    await db.refresh(new_query)
    return new_query
async def read_all_select_queries(db: AsyncSession) -> list[Query]:
    """Return every ``Query`` selected by an unfiltered query.

    Args:
        db: Async session the query is executed on.

    Returns:
        All ``Query`` objects produced by ``select(Query)``.
    """
    rows = await db.execute(select(Query))
    queries = rows.scalars()
    return queries.all()
async def read_select_query(db: AsyncSession, query_id: int) -> Query:
    """Look up one stored query by id.

    Args:
        db: Async session the query is executed on.
        query_id: Value compared against ``Query.id``.

    Returns:
        The first matching ``Query``. Despite the annotation, the result is
        ``None`` when no row matches.
    """
    stmt = select(Query).where(Query.id == query_id)
    rows = await db.execute(stmt)
    return rows.scalars().first()