41 lines
1.6 KiB
Python
41 lines
1.6 KiB
Python
from sqlalchemy import func
|
|
|
|
from models import Status, Deal, DealStatusHistory
|
|
from repositories.mixins import *
|
|
from schemas.status import UpdateStatusSchema, CreateStatusSchema
|
|
|
|
|
|
class StatusRepository(RepCrudMixin[Status, CreateStatusSchema, UpdateStatusSchema]):
    """Data-access layer for ``Status`` rows.

    Builds on the generic CRUD mixin and adds board-scoped listing, a
    per-status deal counter and a lookup of a deal's status-change history.
    """

    entity_class = Status
    # Runtime message (Russian for "Status not found"). Presumably emitted by
    # the mixin when a lookup misses -- confirm against RepCrudMixin.
    entity_not_found_msg = "Статус не найден"

    def _process_get_all_stmt_with_args(self, stmt: Select, *args) -> Select:
        """Restrict ``stmt`` to a single board and sort it by lexorank.

        The first positional extra argument is used as the board id.
        NOTE(review): this looks like a hook invoked by the mixin's generic
        listing code -- verify how the mixin passes ``args``.
        """
        board_filter = Status.board_id == args[0]
        return stmt.where(board_filter).order_by(Status.lexorank)

    async def get_all(self, board_id: int) -> list[Status]:
        """Return the non-deleted statuses of ``board_id`` ordered by lexorank."""
        not_deleted = Status.is_deleted.is_(False)
        on_board = Status.board_id == board_id
        query = select(Status).where(not_deleted, on_board).order_by(Status.lexorank)

        rows = await self.session.execute(query)
        return list(rows.scalars().all())

    async def get_deals_count(self, status_id: int) -> int:
        """Return how many ``Deal`` rows reference the status ``status_id``."""
        deal_total = func.count(Deal.id)
        query = select(deal_total).where(Deal.status_id == status_id)

        outcome = await self.session.execute(query)
        return outcome.scalar()

    async def update(self, status: Status, data: UpdateStatusSchema) -> Status:
        """Apply ``data`` to ``status`` through the mixin's update helper.

        The trailing ``True`` is forwarded positionally; its meaning is
        defined by ``_apply_update_data_to_model`` -- TODO confirm.
        """
        updated = await self._apply_update_data_to_model(status, data, True)
        return updated

    async def get_status_history(self, deal_id: int) -> list[DealStatusHistory]:
        """Return the status-history records of ``deal_id`` ordered by ``created_at``."""
        for_deal = DealStatusHistory.deal_id == deal_id
        query = (
            select(DealStatusHistory)
            .where(for_deal)
            .order_by(DealStatusHistory.created_at)
        )

        rows = await self.session.execute(query)
        return list(rows.scalars().all())
|