Files
Crm-Backend/repositories/deal.py

74 lines
2.4 KiB
Python

from typing import Optional
from sqlalchemy import select
from models import Deal, CardStatusHistory, Board
from repositories.base import BaseRepository
from schemas.deal import UpdateDealSchema, CreateDealSchema
class DealRepository(BaseRepository):
    """Persistence operations for ``Deal`` rows.

    Every query runs through ``self.session``, which is not created in this
    class (presumably supplied by ``BaseRepository`` -- confirm there).
    Read methods ignore deals whose ``is_deleted`` flag is set.
    """

    async def get_all(
        self,
        board_id: Optional[int],
        project_id: Optional[int],
        page: Optional[int],
        items_per_page: Optional[int],
    ) -> tuple[list[Deal], int]:
        """Return non-deleted deals ordered by ``lexorank`` plus the total count.

        Args:
            board_id: When not ``None``, keep only deals on this board.
            project_id: When not ``None``, join ``Board`` and keep only deals
                whose board belongs to this project.
            page: Page selector handed to ``_apply_pagination``.
            items_per_page: Page size handed to ``_apply_pagination``.

        Returns:
            A ``(deals, total_items)`` tuple. ``total_items`` counts every row
            matching the filters, regardless of pagination.
        """
        # Local import: ``func`` is only needed for the COUNT query below.
        from sqlalchemy import func

        stmt = select(Deal).where(Deal.is_deleted.is_(False))
        # Compare against None so that an explicit id of 0 is still treated
        # as a filter instead of silently returning every deal.
        if board_id is not None:
            stmt = stmt.where(Deal.board_id == board_id)
        if project_id is not None:
            stmt = stmt.join(Board).where(Board.project_id == project_id)

        # Let the database count the matching rows rather than fetching all
        # of them into memory only to take len() of the result.
        count_stmt = select(func.count()).select_from(stmt.subquery())
        total_items = (await self.session.execute(count_stmt)).scalar_one()

        stmt = stmt.order_by(Deal.lexorank)
        # Pagination is skipped unless both values are truthy (so None or 0
        # for either one returns the full ordered list).
        if page and items_per_page:
            stmt = self._apply_pagination(stmt, page, items_per_page)

        result = await self.session.execute(stmt)
        return list(result.scalars().all()), total_items

    async def get_by_id(self, deal_id: int) -> Optional[Deal]:
        """Return the non-deleted deal with ``deal_id``, or ``None`` if no row matches."""
        stmt = select(Deal).where(
            Deal.id == deal_id,
            Deal.is_deleted.is_(False),
        )
        result = await self.session.execute(stmt)
        return result.scalar_one_or_none()

    async def create(self, data: CreateDealSchema) -> Deal:
        """Insert a deal built from ``data.model_dump()``, commit, and return it refreshed."""
        deal = Deal(**data.model_dump())
        self.session.add(deal)
        await self.session.commit()
        await self.session.refresh(deal)
        return deal

    async def update(self, deal: Deal, data: UpdateDealSchema) -> Deal:
        """Apply the truthy fields of ``data`` to ``deal``, commit, and return it refreshed.

        ``lexorank`` and ``name`` are overwritten only when the incoming value
        is truthy. When ``status_id`` is truthy and differs from the current
        one, a ``CardStatusHistory`` entry recording the transition is appended
        to ``deal.status_history`` before the status is changed.
        """
        if data.lexorank:
            deal.lexorank = data.lexorank
        if data.name:
            deal.name = data.name

        if data.status_id and deal.status_id != data.status_id:
            # Build the history entry first: it must capture the old status
            # before ``deal.status_id`` is overwritten below.
            deal.status_history.append(
                CardStatusHistory(
                    from_status_id=deal.status_id,
                    to_status_id=data.status_id,
                )
            )
            deal.status_id = data.status_id

        self.session.add(deal)
        await self.session.commit()
        await self.session.refresh(deal)
        return deal

    async def delete(self, deal: Deal, is_soft: bool) -> None:
        """Delete ``deal`` and commit.

        Args:
            deal: The deal to remove.
            is_soft: When true, only set ``deal.is_deleted`` to ``True``;
                otherwise delete the row through the session.
        """
        if is_soft:
            deal.is_deleted = True
            self.session.add(deal)
        else:
            await self.session.delete(deal)
        await self.session.commit()