Files
Crm-Backend/repositories/deal.py

88 lines
2.9 KiB
Python

from typing import Optional

from sqlalchemy import func, select
from sqlalchemy.orm import joinedload

from models import Deal, CardStatusHistory, Board
from repositories.base import BaseRepository
from repositories.mixins import RepDeleteMixin, RepCreateMixin
from schemas.base import SortDir
from schemas.deal import UpdateDealSchema, CreateDealSchema
from utils.sorting import apply_sorting
class DealRepository(
    BaseRepository, RepDeleteMixin[Deal], RepCreateMixin[Deal, CreateDealSchema]
):
    """Repository for reading and updating ``Deal`` rows.

    Every read defined here skips rows whose ``is_deleted`` flag is set.
    Create/delete behaviour is presumably supplied by ``RepCreateMixin`` and
    ``RepDeleteMixin`` (defined elsewhere; not verified here).
    """

    # Model this repository operates on; presumably consumed by the base
    # class and mixins.
    entity_class = Deal

    async def get_all(
        self,
        page: Optional[int],
        items_per_page: Optional[int],
        field: Optional[str],
        direction: Optional[SortDir],
        project_id: Optional[int],
        board_id: Optional[int],
        status_id: Optional[int],
        id: Optional[int],
        name: Optional[str],
    ) -> tuple[list[Deal], int]:
        """Return the non-deleted deals matching the filters and their total count.

        A filter is applied only when its argument is truthy, so ``None``,
        ``0`` and ``""`` all mean "do not filter on this field".

        Args:
            page: Page to return; used only when ``items_per_page`` is also
                truthy.
            items_per_page: Page size; used only when ``page`` is also truthy.
            field: Name of the ``Deal`` field to sort by.
            direction: Sort direction. When either ``field`` or ``direction``
                is missing, results are ordered by ``Deal.lexorank``.
            project_id: Keep only deals whose board belongs to this project.
            board_id: Keep only deals on this board.
            status_id: Keep only deals in this status.
            id: Keep only the deal with this primary key.
            name: Case-insensitive substring to look for in the deal name.

        Returns:
            A ``(deals, total_items)`` tuple. ``deals`` is the (optionally
            paginated) list with ``status`` and ``board`` eagerly loaded;
            ``total_items`` is the number of rows matching the filters
            before pagination.
        """
        filtered = select(Deal).where(Deal.is_deleted.is_(False))
        if id:
            filtered = filtered.where(Deal.id == id)
        if project_id:
            filtered = filtered.join(Board).where(Board.project_id == project_id)
        if board_id:
            filtered = filtered.where(Deal.board_id == board_id)
        if status_id:
            filtered = filtered.where(Deal.status_id == status_id)
        if name:
            filtered = filtered.where(Deal.name.ilike(f"%{name}%"))

        # Let the database count the matches instead of fetching and
        # hydrating every row just to call len() on the result. The count is
        # taken before eager-load options, ordering and pagination are added.
        count_stmt = select(func.count()).select_from(filtered.subquery())
        total_items = (await self.session.execute(count_stmt)).scalar_one()

        stmt = filtered.options(joinedload(Deal.status), joinedload(Deal.board))
        if field and direction is not None:
            stmt = apply_sorting(stmt, Deal, field, direction)
        else:
            stmt = stmt.order_by(Deal.lexorank)
        if page and items_per_page:
            stmt = self._apply_pagination(stmt, page, items_per_page)

        result = await self.session.execute(stmt)
        return list(result.scalars().all()), total_items

    async def get_by_id(self, deal_id: int) -> Optional[Deal]:
        """Return the non-deleted deal with primary key *deal_id*, or ``None``.

        ``status`` and ``board`` are eagerly loaded on the returned object.
        """
        stmt = (
            select(Deal)
            .options(joinedload(Deal.status), joinedload(Deal.board))
            .where(Deal.id == deal_id, Deal.is_deleted.is_(False))
        )
        result = await self.session.execute(stmt)
        return result.scalar_one_or_none()

    async def update(self, deal: Deal, data: UpdateDealSchema) -> Deal:
        """Apply the supplied fields of *data* to *deal*, commit and refresh it.

        Only truthy fields of *data* are applied; ``None``, ``0`` and ``""``
        leave the corresponding attribute untouched. When ``status_id``
        changes, a ``CardStatusHistory`` entry recording the transition is
        appended to ``deal.status_history`` before the new status is set.

        Args:
            deal: The persistent ``Deal`` instance to modify.
            data: Partial update payload.

        Returns:
            The same ``deal`` instance, refreshed from the database after
            the commit.
        """
        if data.lexorank:
            deal.lexorank = data.lexorank
        if data.name:
            deal.name = data.name
        if data.board_id:
            deal.board_id = data.board_id

        if data.status_id and deal.status_id != data.status_id:
            # NOTE(review): appending reads ``deal.status_history``; under an
            # async session this presumably requires the collection to be
            # already loaded or mapped with a non-lazy strategy -- confirm on
            # the model.
            deal.status_history.append(
                CardStatusHistory(
                    from_status_id=deal.status_id,
                    to_status_id=data.status_id,
                )
            )
            deal.status_id = data.status_id

        self.session.add(deal)
        await self.session.commit()
        await self.session.refresh(deal)
        return deal