# Listing metadata (from the code viewer): 185 lines, 6.1 KiB, Python

from sqlalchemy import func
from sqlalchemy.orm import joinedload, selectinload
from models import Deal, Board, DealStatusHistory
from modules.fulfillment_base.models import (
DealService,
Service,
DealProductService,
DealProduct,
)
from repositories import AttributeRepository
from repositories.mixins import *
from schemas.base import SortDir
from schemas.deal import UpdateDealSchema, CreateDealSchema
from utils.sorting import apply_sorting
class DealRepository(
    BaseRepository,
    RepDeleteMixin[Deal],
    RepCreateMixin[Deal, CreateDealSchema],
    RepUpdateMixin[Deal, UpdateDealSchema],
    RepGetByIdMixin[Deal],
):
    """Repository for ``Deal`` entities.

    Provides filtered/sorted/paginated listing with aggregated price and
    product quantity, lookups by group or by ids, and the create/update hooks
    used by the repository mixins.
    """

    entity_class = Deal
    entity_not_found_msg = "Сделка не найдена"

    def _get_price_subquery(self):
        """Build a subquery with one ``(deal_id, total_price)`` row per deal.

        ``total_price`` is the sum of two parts:

        * deal-level services: ``DealService.quantity * DealService.price``;
        * product-level services:
          ``DealProduct.quantity * DealProductService.price``.

        Deals that have neither kind of service do not appear in the result.
        """
        deal_services_totals = (
            select(
                DealService.deal_id,
                func.sum(DealService.quantity * DealService.price).label(
                    "total_price"
                ),
            )
            .join(Service)
            .group_by(DealService.deal_id)
        )
        # A plain SELECT is enough here; wrapping it in an extra
        # ``select(<subquery>)`` before the UNION yields the same columns.
        product_services_totals = (
            select(
                DealProductService.deal_id,
                func.sum(DealProduct.quantity * DealProductService.price).label(
                    "total_price"
                ),
            )
            .join(DealProduct)
            .group_by(DealProductService.deal_id)
        )
        combined = deal_services_totals.union_all(product_services_totals).subquery()
        # A deal may appear in both halves of the union, so sum once more.
        return (
            select(
                combined.c.deal_id,
                func.sum(combined.c.total_price).label("total_price"),
            )
            .group_by(combined.c.deal_id)
            .subquery()
        )

    def _get_products_quantity_subquery(self):
        """Build a subquery with one ``(deal_id, products_quantity)`` row per deal.

        ``products_quantity`` is the sum of ``DealProduct.quantity`` for the deal.
        """
        return (
            select(
                DealProduct.deal_id,
                func.sum(DealProduct.quantity).label("products_quantity"),
            )
            .group_by(DealProduct.deal_id)
            .subquery()
        )

    async def get_all(
        self,
        page: Optional[int],
        items_per_page: Optional[int],
        field: Optional[str],
        direction: Optional[SortDir],
        project_id: Optional[int],
        board_id: Optional[int],
        status_id: Optional[int],
        id: Optional[int],
        name: Optional[str],
    ) -> tuple[list[tuple[Deal, int, int]], int]:
        """Return non-deleted deals matching the filters, plus the total count.

        Args:
            page: Page to return; pagination is applied only when both
                ``page`` and ``items_per_page`` are truthy.
            items_per_page: Page size (see ``page``).
            field: Sort field handed to ``apply_sorting``; used only when
                ``direction`` is also given.
            direction: Sort direction. Without ``field``/``direction`` the
                result is ordered by ``Deal.lexorank``.
            project_id: Keep only deals whose board belongs to this project.
            board_id: Keep only deals on this board.
            status_id: Keep only deals in this status.
            id: Keep only the deal with this id.
            name: Case-insensitive substring match on ``Deal.name``.

        Returns:
            ``(rows, total_items)``. Each row is
            ``(deal, total_price, products_quantity)`` with ``0`` substituted
            when a deal has no aggregated value. ``total_items`` is the number
            of deals matching the filters, ignoring pagination.
        """
        price_subquery = self._get_price_subquery()
        products_quantity_subquery = self._get_products_quantity_subquery()

        # Collect the filters once so the data query and the count query
        # are guaranteed to agree.
        conditions = [Deal.is_deleted.is_(False)]
        if id:
            conditions.append(Deal.id == id)
        if project_id:
            conditions.append(Board.project_id == project_id)
        if board_id:
            conditions.append(Deal.board_id == board_id)
        if status_id:
            conditions.append(Deal.status_id == status_id)
        if name:
            conditions.append(Deal.name.ilike(f"%{name}%"))

        stmt = (
            select(
                Deal,
                func.coalesce(price_subquery.c.total_price, 0),
                func.coalesce(products_quantity_subquery.c.products_quantity, 0),
            )
            .outerjoin(
                price_subquery,
                Deal.id == price_subquery.c.deal_id,
            )
            .outerjoin(
                products_quantity_subquery,
                Deal.id == products_quantity_subquery.c.deal_id,
            )
            .options(
                joinedload(Deal.status),
                joinedload(Deal.board),
                selectinload(Deal.group),
                selectinload(Deal.tags),
            )
        )
        # Count in the database instead of loading every matching entity.
        # The aggregate subqueries are grouped by deal_id (at most one row per
        # deal), so they do not affect the row count and are left out here.
        count_stmt = select(func.count()).select_from(Deal)

        if project_id:
            # The project filter lives on Board, so both queries need the join.
            stmt = stmt.join(Board)
            count_stmt = count_stmt.join(Board)

        stmt = stmt.where(*conditions)
        count_stmt = count_stmt.where(*conditions)

        total_items = (await self.session.execute(count_stmt)).scalar_one()

        if field and direction is not None:
            stmt = apply_sorting(stmt, Deal, field, direction)
        else:
            stmt = stmt.order_by(Deal.lexorank)

        if page and items_per_page:
            stmt = self._apply_pagination(stmt, page, items_per_page)

        rows: list[tuple[Deal, int, int]] = list(
            (await self.session.execute(stmt)).all()
        )
        return rows, total_items

    async def get_by_group_id(self, group_id: int) -> list[Deal]:
        """Return non-deleted deals of the given group, with status and board loaded."""
        stmt = (
            select(Deal)
            .where(Deal.group_id == group_id, Deal.is_deleted.is_(False))
            .options(joinedload(Deal.status), joinedload(Deal.board))
        )
        result = await self.session.execute(stmt)
        return list(result.scalars().all())

    async def get_by_ids(self, deal_ids: list[int]) -> list[Deal]:
        """Return non-deleted deals whose id is in ``deal_ids``.

        Status and board are eagerly loaded. An empty ``deal_ids`` yields an
        empty list without querying the database.
        """
        if not deal_ids:
            return []
        stmt = (
            select(Deal)
            .where(Deal.id.in_(deal_ids), Deal.is_deleted.is_(False))
            .options(joinedload(Deal.status), joinedload(Deal.board))
        )
        result = await self.session.execute(stmt)
        return list(result.scalars().all())

    async def _prepare_create(self, data: CreateDealSchema) -> dict:
        """Return the creation payload as a dict without ``project_id``.

        ``project_id`` is dropped from the dumped data here and read directly
        from ``data`` in ``_after_create``.
        """
        dumped = data.model_dump()
        dumped.pop("project_id", None)
        return dumped

    async def _after_create(self, obj: Deal, data: CreateDealSchema) -> None:
        """Create the attributes of the new deal for the project in ``data``."""
        attr_repo = AttributeRepository(self.session)
        await attr_repo.create_attributes_for_new_deal(obj.id, data.project_id)

    def _process_get_by_id_stmt(self, stmt: Select) -> Select:
        """Add eager loading of the deal's status and board to ``stmt``."""
        return stmt.options(joinedload(Deal.status), joinedload(Deal.board))

    async def update_status(self, deal: Deal, status_id: int):
        """Move ``deal`` to ``status_id`` and record the transition.

        Does nothing when the deal is already in that status. Otherwise a
        ``DealStatusHistory`` entry (old status -> new status) is appended to
        ``deal.status_history`` before the status is changed. This method does
        not commit; the caller is responsible for persisting the change.
        """
        if deal.status_id == status_id:
            return
        deal.status_history.append(
            DealStatusHistory(
                from_status_id=deal.status_id,
                to_status_id=status_id,
            )
        )
        deal.status_id = status_id

    async def update(self, deal: Deal, data: UpdateDealSchema) -> Deal:
        """Apply ``data`` to ``deal``, commit, and return the refreshed deal.

        Only ``lexorank``, ``name`` and ``board_id`` are passed to
        ``_apply_update_data_to_model``. A status change goes through
        ``update_status`` so that the transition is recorded in the history.
        """
        fields = ["lexorank", "name", "board_id"]
        deal = await self._apply_update_data_to_model(deal, data, False, fields)
        if data.status_id:
            await self.update_status(deal, data.status_id)
        self.session.add(deal)
        await self.session.commit()
        await self.session.refresh(deal)
        return deal