57 lines
1.9 KiB
Python
57 lines
1.9 KiB
Python
from typing import Optional
|
|
|
|
from sqlalchemy import select, func
|
|
|
|
from models import Status, Deal
|
|
from repositories.base import BaseRepository
|
|
from schemas.status import UpdateStatusSchema, CreateStatusSchema
|
|
|
|
|
|
class StatusRepository(BaseRepository):
    """Data-access helpers for ``Status`` rows.

    All queries run through ``self.session``; it is not defined in this
    class and is presumably supplied by ``BaseRepository`` (confirm there).
    """

    async def get_all(self, board_id: int) -> list[Status]:
        """Return the non-deleted statuses of a board, ordered by lexorank.

        Args:
            board_id: Value matched against ``Status.board_id``.

        Returns:
            A list of ``Status`` rows whose ``is_deleted`` flag is false,
            sorted by ``Status.lexorank``.
        """
        stmt = (
            select(Status)
            .where(Status.is_deleted.is_(False), Status.board_id == board_id)
            .order_by(Status.lexorank)
        )
        result = await self.session.execute(stmt)
        return list(result.scalars().all())

    async def get_by_id(self, status_id: int) -> Optional[Status]:
        """Return the non-deleted status with the given id, if any.

        Args:
            status_id: Value matched against ``Status.id``.

        Returns:
            The matching ``Status``, or ``None`` when no non-deleted row
            has that id.
        """
        stmt = select(Status).where(
            Status.id == status_id,
            Status.is_deleted.is_(False),
        )
        result = await self.session.execute(stmt)
        return result.scalar_one_or_none()

    async def get_deals_count(self, status_id: int) -> int:
        """Return the number of deals whose ``status_id`` matches.

        Args:
            status_id: Value matched against ``Deal.status_id``.

        Returns:
            The row count produced by ``COUNT(Deal.id)``.
        """
        stmt = select(func.count(Deal.id)).where(Deal.status_id == status_id)
        result = await self.session.execute(stmt)
        # An aggregate COUNT without GROUP BY yields exactly one row, so
        # scalar_one() states that contract and never returns None, which
        # keeps the declared ``int`` return type honest.
        return result.scalar_one()

    async def create(self, data: CreateStatusSchema) -> Status:
        """Insert a new status built from the schema's fields and commit.

        Args:
            data: Schema whose ``model_dump()`` output is passed as keyword
                arguments to the ``Status`` constructor.

        Returns:
            The newly added ``Status``, refreshed from the session after
            the commit.
        """
        status = Status(**data.model_dump())
        self.session.add(status)
        await self.session.commit()
        await self.session.refresh(status)
        return status

    async def update(self, status: Status, data: UpdateStatusSchema) -> Status:
        """Apply the supplied ``lexorank`` / ``name`` values and commit.

        A falsy field on ``data`` (e.g. ``None`` or an empty string) is
        treated as "not provided" and leaves the current value untouched.

        Args:
            status: The row to modify.
            data: Schema carrying the optional new ``lexorank`` and ``name``.

        Returns:
            The same ``Status`` object, refreshed from the session after
            the commit.
        """
        # Assign only when a new value was supplied, instead of reading the
        # current attribute value just to write it back unchanged.
        if data.lexorank:
            status.lexorank = data.lexorank
        if data.name:
            status.name = data.name

        self.session.add(status)
        await self.session.commit()
        await self.session.refresh(status)
        return status

    async def delete(self, status: Status, is_soft: bool) -> None:
        """Delete a status, either softly or permanently, and commit.

        Args:
            status: The row to delete.
            is_soft: When true, only set ``status.is_deleted`` to ``True``;
                when false, remove the row through ``session.delete``.
        """
        if is_soft:
            status.is_deleted = True
            self.session.add(status)
        else:
            await self.session.delete(status)

        # Both paths end with the same commit.
        await self.session.commit()