from typing import Optional from sqlalchemy import select from models import Deal, CardStatusHistory, Board from repositories.base import BaseRepository from schemas.deal import UpdateDealSchema, CreateDealSchema class DealRepository(BaseRepository): async def get_all( self, board_id: Optional[int], project_id: Optional[int], page: Optional[int], items_per_page: Optional[int], ) -> tuple[list[Deal], int]: stmt = select(Deal).where(Deal.is_deleted.is_(False)) if board_id: stmt = stmt.where(Deal.board_id == board_id) if project_id: stmt = stmt.join(Board).where(Board.project_id == project_id) total_items = len((await self.session.execute(stmt)).all()) stmt = stmt.order_by(Deal.lexorank) if page and items_per_page: stmt = self._apply_pagination(stmt, page, items_per_page) result = await self.session.execute(stmt) return list(result.scalars().all()), total_items async def get_by_id(self, deal_id: int) -> Optional[Deal]: stmt = select(Deal).where(Deal.id == deal_id, Deal.is_deleted.is_(False)) result = await self.session.execute(stmt) return result.scalar_one_or_none() async def create(self, data: CreateDealSchema) -> Deal: deal = Deal(**data.model_dump()) self.session.add(deal) await self.session.commit() await self.session.refresh(deal) return deal async def update(self, deal: Deal, data: UpdateDealSchema) -> Deal: deal.lexorank = data.lexorank if data.lexorank else deal.lexorank deal.name = data.name if data.name else deal.name if data.status_id and deal.status_id != data.status_id: deal.status_history.append( CardStatusHistory( from_status_id=deal.status_id, to_status_id=data.status_id, ) ) deal.status_id = data.status_id self.session.add(deal) await self.session.commit() await self.session.refresh(deal) return deal async def delete(self, deal: Deal, is_soft: bool): if not is_soft: await self.session.delete(deal) await self.session.commit() return deal.is_deleted = True self.session.add(deal) await self.session.commit()