from sqlalchemy import func
from sqlalchemy.orm import joinedload, selectinload

from models import Deal, Board, DealStatusHistory
from modules.fulfillment_base.models import (
    DealService,
    Service,
    DealProductService,
    DealProduct,
)
from repositories.mixins import *
from schemas.base import SortDir
from schemas.deal import UpdateDealSchema, CreateDealSchema
from utils.sorting import apply_sorting


class DealRepository(
    BaseRepository,
    RepDeleteMixin[Deal],
    RepCreateMixin[Deal, CreateDealSchema],
    RepUpdateMixin[Deal, UpdateDealSchema],
    RepGetByIdMixin[Deal],
):
    """Repository for querying and mutating ``Deal`` rows.

    Create/delete/get-by-id behaviour comes from the repository mixins; this
    class adds list queries enriched with aggregated price and product
    quantity, plus status-change tracking on update.
    """

    entity_class = Deal
    # User-facing "deal not found" message (kept in its original language).
    entity_not_found_msg = "Сделка не найдена"

    def _get_price_subquery(self):
        """Build a subquery with one ``(deal_id, total_price)`` row per deal.

        The total is the sum of two parts:

        * ``DealService.quantity * DealService.price`` over the deal's
          services;
        * ``DealProduct.quantity * DealProductService.price`` over the
          services attached to the deal's products.

        Deals having neither kind of row are absent from the subquery, so
        callers should OUTER JOIN it and coalesce the missing total.
        """
        deal_services_subquery = (
            select(
                DealService.deal_id,
                func.sum(DealService.quantity * DealService.price).label("total_price"),
            )
            # Inner join: only deal services with a matching Service row count.
            .join(Service)
            .group_by(DealService.deal_id)
        )

        # Wrapped in an outer select() so that it is a plain selectable with
        # the same two columns as ``deal_services_subquery`` for the UNION.
        product_services_subquery = select(
            select(
                DealProductService.deal_id,
                func.sum(DealProduct.quantity * DealProductService.price).label(
                    "total_price"
                ),
            )
            .join(DealProduct)
            .group_by(DealProductService.deal_id)
            .subquery()
        )

        # UNION ALL keeps both partial totals for a deal (up to two rows)...
        union_subqueries = deal_services_subquery.union_all(
            product_services_subquery
        ).subquery()

        # ...which are then collapsed into a single total per deal.
        final_subquery = (
            select(
                union_subqueries.c.deal_id,
                func.sum(union_subqueries.c.total_price).label("total_price"),
            )
            .group_by(union_subqueries.c.deal_id)
            .subquery()
        )
        return final_subquery

    def _get_products_quantity_subquery(self):
        """Build a subquery with one ``(deal_id, products_quantity)`` row per deal.

        ``products_quantity`` is the sum of ``DealProduct.quantity`` for the
        deal. Deals without products are absent from the subquery.
        """
        return (
            select(
                DealProduct.deal_id,
                func.sum(DealProduct.quantity).label("products_quantity"),
            )
            .group_by(DealProduct.deal_id)
            .subquery()
        )

    async def get_all(
        self,
        page: Optional[int],
        items_per_page: Optional[int],
        field: Optional[str],
        direction: Optional[SortDir],
        project_id: Optional[int],
        board_id: Optional[int],
        status_id: Optional[int],
        id: Optional[int],
        name: Optional[str],
    ) -> tuple[list[tuple[Deal, int, int]], int]:
        """Return non-deleted deals with their totals, plus the overall count.

        Args:
            page: Page number; pagination is applied only when both ``page``
                and ``items_per_page`` are truthy.
            items_per_page: Page size (see ``page``).
            field: Name of the field to sort by. Used only together with
                ``direction``; otherwise deals are ordered by
                ``Deal.lexorank``.
            direction: Sort direction for ``field``.
            project_id: If truthy, keep only deals whose board belongs to
                this project.
            board_id: If truthy, keep only deals on this board.
            status_id: If truthy, keep only deals in this status.
            id: If truthy, keep only the deal with this id.
            name: If truthy, keep only deals whose name contains this text
                (case-insensitive).

        Returns:
            A pair ``(rows, total_items)`` where each row is
            ``(deal, total_price, products_quantity)`` (missing aggregates
            are reported as ``0``) and ``total_items`` is the number of deals
            matching the filters, ignoring pagination.
        """
        price_subquery = self._get_price_subquery()
        products_quantity_subquery = self._get_products_quantity_subquery()

        # Filters are collected once and shared by the count query and the
        # data query so the two can never drift apart.
        filters = [Deal.is_deleted.is_(False)]
        if id:
            filters.append(Deal.id == id)
        if project_id:
            filters.append(Board.project_id == project_id)
        if board_id:
            filters.append(Deal.board_id == board_id)
        if status_id:
            filters.append(Deal.status_id == status_id)
        if name:
            # NOTE(review): "%" and "_" inside ``name`` act as LIKE wildcards;
            # escape them if literal matching is ever required.
            filters.append(Deal.name.ilike(f"%{name}%"))

        stmt = (
            select(
                Deal,
                func.coalesce(price_subquery.c.total_price, 0),
                func.coalesce(products_quantity_subquery.c.products_quantity, 0),
            )
            .outerjoin(
                price_subquery,
                Deal.id == price_subquery.c.deal_id,
            )
            .outerjoin(
                products_quantity_subquery,
                Deal.id == products_quantity_subquery.c.deal_id,
            )
            .options(
                joinedload(Deal.status),
                joinedload(Deal.board),
                selectinload(Deal.group),
                selectinload(Deal.tags),
            )
        )
        # Count in the database instead of fetching every row (and running
        # the eager loads) just to take len(). Both aggregate subqueries are
        # grouped by deal_id, so they contribute at most one row per deal and
        # the number of result rows equals the number of matching deals.
        count_stmt = select(func.count()).select_from(Deal)

        if project_id:
            # The project filter lives on Board, so both queries need the join.
            stmt = stmt.join(Board)
            count_stmt = count_stmt.join(Board)

        stmt = stmt.where(*filters)
        count_stmt = count_stmt.where(*filters)

        total_items = (await self.session.execute(count_stmt)).scalar_one()

        if field and direction is not None:
            stmt = apply_sorting(stmt, Deal, field, direction)
        else:
            stmt = stmt.order_by(Deal.lexorank)

        if page and items_per_page:
            stmt = self._apply_pagination(stmt, page, items_per_page)

        rows: list[tuple[Deal, int, int]] = (await self.session.execute(stmt)).all()
        return rows, total_items

    async def get_by_group_id(self, group_id: int) -> list[Deal]:
        """Return all non-deleted deals of the given group.

        ``status`` and ``board`` are eagerly loaded on each deal.
        """
        stmt = (
            select(Deal)
            .where(Deal.group_id == group_id, Deal.is_deleted.is_(False))
            .options(joinedload(Deal.status), joinedload(Deal.board))
        )
        result = await self.session.execute(stmt)
        return list(result.scalars().all())

    async def get_by_ids(self, deal_ids: list[int]) -> list[Deal]:
        """Return the non-deleted deals whose ids are in ``deal_ids``.

        ``status`` and ``board`` are eagerly loaded on each deal. Ids that do
        not match a non-deleted deal are silently skipped.
        """
        if not deal_ids:
            # Nothing can match an empty id list; skip the round trip.
            return []

        stmt = (
            select(Deal)
            .where(Deal.id.in_(deal_ids), Deal.is_deleted.is_(False))
            .options(joinedload(Deal.status), joinedload(Deal.board))
        )
        result = await self.session.execute(stmt)
        return list(result.scalars().all())

    def _process_get_by_id_stmt(self, stmt: Select) -> Select:
        """Add eager loading of ``status`` and ``board`` to a get-by-id query.

        Presumably a hook called by ``RepGetByIdMixin`` -- confirm in the mixin.
        """
        return stmt.options(joinedload(Deal.status), joinedload(Deal.board))

    async def update_status(self, deal: Deal, status_id: int) -> None:
        """Move ``deal`` to ``status_id`` and record the transition.

        Does nothing when the deal is already in that status. Otherwise a
        ``DealStatusHistory`` entry (old status -> new status) is appended to
        ``deal.status_history`` before the status is changed. Nothing is
        committed here; the caller is responsible for that.
        """
        if deal.status_id == status_id:
            return

        # Capture the old status in the history entry before overwriting it.
        deal.status_history.append(
            DealStatusHistory(
                from_status_id=deal.status_id,
                to_status_id=status_id,
            )
        )
        deal.status_id = status_id

    async def update(self, deal: Deal, data: UpdateDealSchema) -> Deal:
        """Apply ``data`` to ``deal``, persist it and return the refreshed deal.

        Only ``lexorank``, ``name`` and ``board_id`` are copied through the
        generic update helper; a truthy ``data.status_id`` goes through
        :meth:`update_status` so the change is recorded in the status history.
        """
        fields = ["lexorank", "name", "board_id"]
        # NOTE(review): the positional ``False`` is a flag of the mixin helper;
        # its meaning is not visible here -- check the mixin before changing it.
        deal = await self._apply_update_data_to_model(deal, data, False, fields)

        if data.status_id:
            await self.update_status(deal, data.status_id)

        self.session.add(deal)
        await self.session.commit()
        await self.session.refresh(deal)
        return deal