59 lines
1.9 KiB
Python
59 lines
1.9 KiB
Python
from typing import Optional
|
|
|
|
from sqlalchemy import select
|
|
|
|
from models import Deal, CardStatusHistory
|
|
from repositories.base import BaseRepository
|
|
from schemas.deal import UpdateDealSchema, CreateDealSchema
|
|
|
|
|
|
class DealRepository(BaseRepository):
    """Persistence operations for ``Deal`` rows.

    All database work goes through ``self.session``, whose ``execute``,
    ``commit``, ``refresh`` and ``delete`` calls are awaited here. The
    session itself is not defined in this class (presumably supplied by
    ``BaseRepository`` -- confirm).
    """

    async def get_all(self, board_id: int) -> list[Deal]:
        """Return the non-deleted deals of one board, ordered by ``lexorank``.

        Args:
            board_id: Value matched against ``Deal.board_id``.

        Returns:
            A new list with the matching ``Deal`` objects.
        """
        query = select(Deal)
        query = query.where(Deal.is_deleted.is_(False), Deal.board_id == board_id)
        query = query.order_by(Deal.lexorank)

        rows = await self.session.execute(query)
        deals = rows.scalars().all()
        return list(deals)

    async def get_by_id(self, deal_id: int) -> Optional[Deal]:
        """Return the non-deleted deal with the given id, or ``None``.

        Args:
            deal_id: Value matched against ``Deal.id``.

        Returns:
            The matching ``Deal``, or ``None`` when no row matches (rows
            flagged ``is_deleted`` are filtered out by the query).
        """
        criteria = (Deal.id == deal_id, Deal.is_deleted.is_(False))
        query = select(Deal).where(*criteria)

        rows = await self.session.execute(query)
        return rows.scalar_one_or_none()

    async def create(self, data: CreateDealSchema) -> Deal:
        """Build a ``Deal`` from the schema, commit it and return it refreshed.

        Args:
            data: Schema whose ``model_dump()`` output is passed to the
                ``Deal`` constructor as keyword arguments.

        Returns:
            The newly added ``Deal`` after commit and refresh.
        """
        payload = data.model_dump()
        new_deal = Deal(**payload)

        self.session.add(new_deal)
        await self.session.commit()
        await self.session.refresh(new_deal)
        return new_deal

    async def update(self, deal: Deal, data: UpdateDealSchema) -> Deal:
        """Apply the provided fields to ``deal``, commit, and return it refreshed.

        ``lexorank`` and ``name`` are overwritten only when the incoming
        value is truthy; a falsy value (``None``, empty string) keeps the
        current one. A truthy ``status_id`` that differs from the current
        status also appends a ``CardStatusHistory`` entry recording the
        transition.

        Args:
            deal: The entity to modify.
            data: Schema carrying the optional new values.

        Returns:
            The same ``Deal`` instance after commit and refresh.
        """
        # `x or current` keeps the current value whenever x is falsy.
        deal.lexorank = data.lexorank or deal.lexorank
        deal.name = data.name or deal.name

        requested_status_id = data.status_id
        if requested_status_id and deal.status_id != requested_status_id:
            # NOTE(review): if `status_history` is a lazily loaded relationship,
            # touching it under an async session may require it to be loaded
            # beforehand -- confirm the model's loader strategy.
            history = deal.status_history
            history.append(
                CardStatusHistory(
                    from_status_id=deal.status_id,
                    to_status_id=requested_status_id,
                )
            )
            # Update the status only after the old value has been recorded.
            deal.status_id = requested_status_id

        self.session.add(deal)
        await self.session.commit()
        await self.session.refresh(deal)
        return deal

    async def delete(self, deal: Deal, is_soft: bool):
        """Delete ``deal`` either softly (flag) or through the session.

        Args:
            deal: The entity to delete.
            is_soft: When true, only ``deal.is_deleted`` is set to ``True``;
                otherwise the entity is passed to ``session.delete``.

        Returns:
            ``None``. The change is committed in both cases.
        """
        if is_soft:
            deal.is_deleted = True
            self.session.add(deal)
        else:
            await self.session.delete(deal)

        await self.session.commit()
|