from typing import Optional from sqlalchemy import select, Select from sqlalchemy.orm import joinedload from models import Deal, CardStatusHistory, Board from repositories.base import BaseRepository from repositories.mixins import RepDeleteMixin, RepCreateMixin, GetByIdMixin from schemas.base import SortDir from schemas.deal import UpdateDealSchema, CreateDealSchema from utils.sorting import apply_sorting class DealRepository( BaseRepository, RepDeleteMixin[Deal], RepCreateMixin[Deal, CreateDealSchema], GetByIdMixin[Deal], ): entity_class = Deal async def get_all( self, page: Optional[int], items_per_page: Optional[int], field: Optional[str], direction: Optional[SortDir], project_id: Optional[int], board_id: Optional[int], status_id: Optional[int], id: Optional[int], name: Optional[str], ) -> tuple[list[Deal], int]: stmt = ( select(Deal) .options(joinedload(Deal.status), joinedload(Deal.board)) .where(Deal.is_deleted.is_(False)) ) if id: stmt = stmt.where(Deal.id == id) if project_id: stmt = stmt.join(Board).where(Board.project_id == project_id) if board_id: stmt = stmt.where(Deal.board_id == board_id) if status_id: stmt = stmt.where(Deal.status_id == status_id) if name: stmt = stmt.where(Deal.name.ilike(f"%{name}%")) total_items = len((await self.session.execute(stmt)).all()) if field and direction is not None: stmt = apply_sorting(stmt, Deal, field, direction) else: stmt = stmt.order_by(Deal.lexorank) if page and items_per_page: stmt = self._apply_pagination(stmt, page, items_per_page) result = await self.session.execute(stmt) return list(result.scalars().all()), total_items def _process_get_by_id_stmt(self, stmt: Select) -> Select: return stmt.options(joinedload(Deal.status), joinedload(Deal.board)) async def update(self, deal: Deal, data: UpdateDealSchema) -> Deal: deal.lexorank = data.lexorank if data.lexorank else deal.lexorank deal.name = data.name if data.name else deal.name deal.board_id = data.board_id if data.board_id else deal.board_id if data.status_id and deal.status_id != data.status_id: deal.status_history.append( CardStatusHistory( from_status_id=deal.status_id, to_status_id=data.status_id, ) ) deal.status_id = data.status_id self.session.add(deal) await self.session.commit() await self.session.refresh(deal) return deal