175 lines
5.7 KiB
Python
175 lines
5.7 KiB
Python
from sqlalchemy import func
|
|
from sqlalchemy.orm import joinedload, selectinload
|
|
|
|
from models import Deal, Board, DealStatusHistory
|
|
from modules.fulfillment_base.models import (
|
|
DealService,
|
|
Service,
|
|
DealProductService,
|
|
DealProduct,
|
|
)
|
|
from repositories.mixins import *
|
|
from schemas.base import SortDir
|
|
from schemas.deal import UpdateDealSchema, CreateDealSchema
|
|
from utils.sorting import apply_sorting
|
|
|
|
|
|
class DealRepository(
|
|
BaseRepository,
|
|
RepDeleteMixin[Deal],
|
|
RepCreateMixin[Deal, CreateDealSchema],
|
|
RepUpdateMixin[Deal, UpdateDealSchema],
|
|
RepGetByIdMixin[Deal],
|
|
):
|
|
entity_class = Deal
|
|
entity_not_found_msg = "Сделка не найдена"
|
|
|
|
def _get_price_subquery(self):
|
|
deal_services_subquery = (
|
|
select(
|
|
DealService.deal_id,
|
|
func.sum(DealService.quantity * DealService.price).label("total_price"),
|
|
)
|
|
.join(Service)
|
|
.group_by(DealService.deal_id)
|
|
)
|
|
product_services_subquery = select(
|
|
select(
|
|
DealProductService.deal_id,
|
|
func.sum(DealProduct.quantity * DealProductService.price).label(
|
|
"total_price"
|
|
),
|
|
)
|
|
.join(DealProduct)
|
|
.group_by(DealProductService.deal_id)
|
|
.subquery()
|
|
)
|
|
union_subqueries = deal_services_subquery.union_all(
|
|
product_services_subquery
|
|
).subquery()
|
|
final_subquery = (
|
|
select(
|
|
union_subqueries.c.deal_id,
|
|
func.sum(union_subqueries.c.total_price).label("total_price"),
|
|
)
|
|
.group_by(union_subqueries.c.deal_id)
|
|
.subquery()
|
|
)
|
|
return final_subquery
|
|
|
|
def _get_products_quantity_subquery(self):
|
|
return (
|
|
select(
|
|
DealProduct.deal_id,
|
|
func.sum(DealProduct.quantity).label("products_quantity"),
|
|
)
|
|
.group_by(DealProduct.deal_id)
|
|
.subquery()
|
|
)
|
|
|
|
async def get_all(
|
|
self,
|
|
page: Optional[int],
|
|
items_per_page: Optional[int],
|
|
field: Optional[str],
|
|
direction: Optional[SortDir],
|
|
project_id: Optional[int],
|
|
board_id: Optional[int],
|
|
status_id: Optional[int],
|
|
id: Optional[int],
|
|
name: Optional[str],
|
|
) -> tuple[list[tuple[Deal, int, int]], int]:
|
|
price_subquery = self._get_price_subquery()
|
|
products_quantity_subquery = self._get_products_quantity_subquery()
|
|
stmt = (
|
|
select(
|
|
Deal,
|
|
func.coalesce(price_subquery.c.total_price, 0),
|
|
func.coalesce(products_quantity_subquery.c.products_quantity, 0),
|
|
)
|
|
.outerjoin(
|
|
price_subquery,
|
|
Deal.id == price_subquery.c.deal_id,
|
|
)
|
|
.outerjoin(
|
|
products_quantity_subquery,
|
|
Deal.id == products_quantity_subquery.c.deal_id,
|
|
)
|
|
.options(
|
|
joinedload(Deal.status),
|
|
joinedload(Deal.board),
|
|
selectinload(Deal.group),
|
|
selectinload(Deal.tags),
|
|
)
|
|
.where(Deal.is_deleted.is_(False))
|
|
)
|
|
|
|
if id:
|
|
stmt = stmt.where(Deal.id == id)
|
|
if project_id:
|
|
stmt = stmt.join(Board).where(Board.project_id == project_id)
|
|
if board_id:
|
|
stmt = stmt.where(Deal.board_id == board_id)
|
|
if status_id:
|
|
stmt = stmt.where(Deal.status_id == status_id)
|
|
if name:
|
|
stmt = stmt.where(Deal.name.ilike(f"%{name}%"))
|
|
|
|
total_items = len((await self.session.execute(stmt)).all())
|
|
|
|
if field and direction is not None:
|
|
stmt = apply_sorting(stmt, Deal, field, direction)
|
|
else:
|
|
stmt = stmt.order_by(Deal.lexorank)
|
|
|
|
if page and items_per_page:
|
|
stmt = self._apply_pagination(stmt, page, items_per_page)
|
|
|
|
rows: list[tuple[Deal, int, int]] = (await self.session.execute(stmt)).all()
|
|
return rows, total_items
|
|
|
|
async def get_by_group_id(self, group_id: int) -> list[Deal]:
|
|
stmt = (
|
|
select(Deal)
|
|
.where(Deal.group_id == group_id, Deal.is_deleted.is_(False))
|
|
.options(joinedload(Deal.status), joinedload(Deal.board))
|
|
)
|
|
result = await self.session.execute(stmt)
|
|
return list(result.scalars().all())
|
|
|
|
async def get_by_ids(self, deal_ids: list[int]) -> list[Deal]:
|
|
stmt = (
|
|
select(Deal)
|
|
.where(Deal.id.in_(deal_ids), Deal.is_deleted.is_(False))
|
|
.options(joinedload(Deal.status), joinedload(Deal.board))
|
|
)
|
|
result = await self.session.execute(stmt)
|
|
return list(result.scalars().all())
|
|
|
|
def _process_get_by_id_stmt(self, stmt: Select) -> Select:
|
|
return stmt.options(joinedload(Deal.status), joinedload(Deal.board))
|
|
|
|
async def update_status(self, deal: Deal, status_id: int):
|
|
if deal.status_id == status_id:
|
|
return
|
|
|
|
deal.status_history.append(
|
|
DealStatusHistory(
|
|
from_status_id=deal.status_id,
|
|
to_status_id=status_id,
|
|
)
|
|
)
|
|
deal.status_id = status_id
|
|
|
|
async def update(self, deal: Deal, data: UpdateDealSchema) -> Deal:
|
|
fields = ["lexorank", "name", "board_id"]
|
|
deal = await self._apply_update_data_to_model(deal, data, False, fields)
|
|
|
|
if data.status_id:
|
|
await self.update_status(deal, data.status_id)
|
|
|
|
self.session.add(deal)
|
|
await self.session.commit()
|
|
await self.session.refresh(deal)
|
|
return deal
|