from sqlalchemy import func
from sqlalchemy.orm import joinedload

from models import Deal, Board, DealStatusHistory
from modules.fulfillment_base.models import (
    DealService,
    Service,
    DealProductService,
    DealProduct,
)
# NOTE(review): this star import is presumably what brings `BaseRepository`,
# the `Rep*Mixin` classes, `select`, `Select` and `Optional` into scope —
# none of them are imported explicitly in this file.
from repositories.mixins import *
from schemas.base import SortDir
from schemas.deal import UpdateDealSchema, CreateDealSchema, DealSchema
from utils.sorting import apply_sorting


class DealRepository(
    BaseRepository,
    RepDeleteMixin[Deal],
    RepCreateMixin[Deal, CreateDealSchema],
    RepUpdateMixin[Deal, UpdateDealSchema],
    RepGetByIdMixin[Deal],
):
    """Repository for `Deal` entities: listing with totals, lookup and update."""

    entity_class = Deal
    entity_not_found_msg = "Сделка не найдена"

    def _get_price_subquery(self):
        """Build a subquery yielding one ``(deal_id, total_price)`` row per deal.

        The total is the sum of two components:
        * ``DealService.quantity * DealService.price`` over the deal's services;
        * ``DealProduct.quantity * DealProductService.price`` over the services
          attached to the deal's products.
        """
        deal_services_totals = (
            select(
                DealService.deal_id,
                func.sum(DealService.quantity * DealService.price).label(
                    "total_price"
                ),
            )
            .join(Service)
            .group_by(DealService.deal_id)
        )

        # Same shape as the select above so the two can be UNION ALL-ed
        # directly; no intermediate subquery wrapper is needed.
        product_services_totals = (
            select(
                DealProductService.deal_id,
                func.sum(DealProduct.quantity * DealProductService.price).label(
                    "total_price"
                ),
            )
            .join(DealProduct)
            .group_by(DealProductService.deal_id)
        )

        combined = deal_services_totals.union_all(product_services_totals).subquery()

        # A deal may appear in both halves of the union, so collapse the
        # partial totals into a single row per deal.
        return (
            select(
                combined.c.deal_id,
                func.sum(combined.c.total_price).label("total_price"),
            )
            .group_by(combined.c.deal_id)
            .subquery()
        )

    def _get_products_quantity_subquery(self):
        """Build a subquery yielding ``(deal_id, products_quantity)`` per deal.

        ``products_quantity`` is the sum of ``DealProduct.quantity`` for the deal.
        """
        return (
            select(
                DealProduct.deal_id,
                func.sum(DealProduct.quantity).label("products_quantity"),
            )
            .group_by(DealProduct.deal_id)
            .subquery()
        )

    async def get_all(
        self,
        page: Optional[int],
        items_per_page: Optional[int],
        field: Optional[str],
        direction: Optional[SortDir],
        project_id: Optional[int],
        board_id: Optional[int],
        status_id: Optional[int],
        id: Optional[int],
        name: Optional[str],
    ) -> tuple[list[tuple[Deal, int, int]], int]:
        """Return non-deleted deals with their totals, plus the match count.

        Args:
            page: Page number; pagination is applied only when both ``page``
                and ``items_per_page`` are truthy.
            items_per_page: Page size (see ``page``).
            field: Name of the field to sort by; used only together with
                ``direction``.
            direction: Sort direction. When ``field`` is falsy or
                ``direction`` is ``None`` the result is ordered by
                ``Deal.lexorank``.
            project_id: If truthy, keep only deals whose board belongs to
                this project.
            board_id: If truthy, keep only deals on this board.
            status_id: If truthy, keep only deals in this status.
            id: If truthy, keep only the deal with this id.
            name: If truthy, keep only deals whose name contains this text
                (case-insensitive ``ILIKE '%name%'``).

        Returns:
            ``(rows, total_items)`` where each row is
            ``(Deal, total_price, products_quantity)`` (both numbers default
            to 0 when the deal has no matching records) and ``total_items``
            is the number of deals matching the filters before pagination.
        """
        # Collect the filters once so the count query and the data query are
        # guaranteed to agree.
        filters = [Deal.is_deleted.is_(False)]
        if id:
            filters.append(Deal.id == id)
        if project_id:
            filters.append(Board.project_id == project_id)
        if board_id:
            filters.append(Deal.board_id == board_id)
        if status_id:
            filters.append(Deal.status_id == status_id)
        if name:
            filters.append(Deal.name.ilike(f"%{name}%"))

        # Board is only joined when filtering by project.
        needs_board_join = bool(project_id)

        # Count in the database instead of loading every matching row.
        # The aggregate subqueries are not joined here: each is grouped by
        # deal_id, so outer-joining them cannot change the number of rows.
        count_stmt = select(func.count()).select_from(Deal)
        if needs_board_join:
            count_stmt = count_stmt.join(Board)
        count_stmt = count_stmt.where(*filters)
        total_items = (await self.session.execute(count_stmt)).scalar_one()

        price_subquery = self._get_price_subquery()
        products_quantity_subquery = self._get_products_quantity_subquery()

        stmt = (
            select(
                Deal,
                func.coalesce(price_subquery.c.total_price, 0),
                func.coalesce(products_quantity_subquery.c.products_quantity, 0),
            )
            .outerjoin(
                price_subquery,
                Deal.id == price_subquery.c.deal_id,
            )
            .outerjoin(
                products_quantity_subquery,
                Deal.id == products_quantity_subquery.c.deal_id,
            )
            .options(joinedload(Deal.status), joinedload(Deal.board))
        )
        if needs_board_join:
            stmt = stmt.join(Board)
        stmt = stmt.where(*filters)

        if field and direction is not None:
            stmt = apply_sorting(stmt, Deal, field, direction)
        else:
            stmt = stmt.order_by(Deal.lexorank)

        if page and items_per_page:
            stmt = self._apply_pagination(stmt, page, items_per_page)

        rows: list[tuple[Deal, int, int]] = (await self.session.execute(stmt)).all()
        return rows, total_items

    def _process_get_by_id_stmt(self, stmt: Select) -> Select:
        """Add eager loading of ``Deal.status`` and ``Deal.board`` to ``stmt``.

        NOTE(review): presumably a hook invoked by ``RepGetByIdMixin`` before
        executing its lookup statement — confirm against the mixin.
        """
        return stmt.options(joinedload(Deal.status), joinedload(Deal.board))

    async def update(self, deal: Deal, data: UpdateDealSchema) -> Deal:
        """Apply ``data`` to ``deal``, record a status change, and persist.

        Only ``lexorank``, ``name`` and ``board_id`` are passed to the generic
        update helper. ``status_id`` is handled here so that a
        ``DealStatusHistory`` entry is appended whenever the status changes.

        Returns:
            The refreshed ``deal`` after commit.
        """
        fields = ["lexorank", "name", "board_id"]
        # NOTE(review): the positional ``False`` is an option of the mixin
        # helper whose meaning is not visible here — confirm in the mixin.
        deal = await self._apply_update_data_to_model(deal, data, False, fields)

        if data.status_id and deal.status_id != data.status_id:
            # Record the transition before overwriting the current status.
            deal.status_history.append(
                DealStatusHistory(
                    from_status_id=deal.status_id,
                    to_status_id=data.status_id,
                )
            )
            deal.status_id = data.status_id

        self.session.add(deal)
        await self.session.commit()
        await self.session.refresh(deal)
        return deal