100 lines
3.2 KiB
Python
100 lines
3.2 KiB
Python
from typing import Optional
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.orm import joinedload
|
|
|
|
from models import Deal, CardStatusHistory, Board
|
|
from repositories.base import BaseRepository
|
|
from schemas.base import SortDir
|
|
from schemas.deal import UpdateDealSchema, CreateDealSchema
|
|
from utils.sorting import apply_sorting
|
|
|
|
|
|
class DealRepository(BaseRepository):
|
|
async def get_all(
|
|
self,
|
|
page: Optional[int],
|
|
items_per_page: Optional[int],
|
|
field: Optional[str],
|
|
direction: Optional[SortDir],
|
|
project_id: Optional[int],
|
|
board_id: Optional[int],
|
|
status_id: Optional[int],
|
|
id: Optional[int],
|
|
name: Optional[str],
|
|
) -> tuple[list[Deal], int]:
|
|
stmt = (
|
|
select(Deal)
|
|
.options(joinedload(Deal.status), joinedload(Deal.board))
|
|
.where(Deal.is_deleted.is_(False))
|
|
)
|
|
|
|
if id:
|
|
stmt = stmt.where(Deal.id == id)
|
|
if project_id:
|
|
stmt = stmt.join(Board).where(Board.project_id == project_id)
|
|
if board_id:
|
|
stmt = stmt.where(Deal.board_id == board_id)
|
|
if status_id:
|
|
stmt = stmt.where(Deal.status_id == status_id)
|
|
if name:
|
|
stmt = stmt.where(Deal.name.ilike(f"%{name}%"))
|
|
|
|
total_items = len((await self.session.execute(stmt)).all())
|
|
|
|
if field and direction is not None:
|
|
stmt = apply_sorting(stmt, Deal, field, direction)
|
|
else:
|
|
stmt = stmt.order_by(Deal.lexorank)
|
|
|
|
if page and items_per_page:
|
|
stmt = self._apply_pagination(stmt, page, items_per_page)
|
|
|
|
result = await self.session.execute(stmt)
|
|
return list(result.scalars().all()), total_items
|
|
|
|
async def get_by_id(self, deal_id: int) -> Optional[Deal]:
|
|
stmt = (
|
|
select(Deal)
|
|
.options(joinedload(Deal.status), joinedload(Deal.board))
|
|
.where(Deal.id == deal_id, Deal.is_deleted.is_(False))
|
|
)
|
|
result = await self.session.execute(stmt)
|
|
return result.scalar_one_or_none()
|
|
|
|
async def create(self, data: CreateDealSchema) -> Deal:
|
|
deal = Deal(**data.model_dump())
|
|
self.session.add(deal)
|
|
await self.session.commit()
|
|
await self.session.refresh(deal)
|
|
print(deal.id)
|
|
return await self.get_by_id(deal.id)
|
|
|
|
async def update(self, deal: Deal, data: UpdateDealSchema) -> Deal:
|
|
deal.lexorank = data.lexorank if data.lexorank else deal.lexorank
|
|
deal.name = data.name if data.name else deal.name
|
|
|
|
if data.status_id and deal.status_id != data.status_id:
|
|
deal.status_history.append(
|
|
CardStatusHistory(
|
|
from_status_id=deal.status_id,
|
|
to_status_id=data.status_id,
|
|
)
|
|
)
|
|
deal.status_id = data.status_id
|
|
|
|
self.session.add(deal)
|
|
await self.session.commit()
|
|
await self.session.refresh(deal)
|
|
return deal
|
|
|
|
async def delete(self, deal: Deal, is_soft: bool):
|
|
if not is_soft:
|
|
await self.session.delete(deal)
|
|
await self.session.commit()
|
|
return
|
|
|
|
deal.is_deleted = True
|
|
self.session.add(deal)
|
|
await self.session.commit()
|