92 lines
3.2 KiB
Python
92 lines
3.2 KiB
Python
from typing import Optional
|
||
|
||
from sqlalchemy import select, delete
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
from sqlalchemy.orm import joinedload
|
||
|
||
from modules.fulfillment_base.models import DealProductService
|
||
from modules.fulfillment_base.schemas.product_service import *
|
||
from repositories.base import BaseRepository
|
||
from repositories.mixins import RepUpdateMixin
|
||
from utils.exceptions import ObjectNotFoundException
|
||
|
||
|
||
class ProductServiceRepository(
|
||
BaseRepository,
|
||
RepUpdateMixin[DealProductService, UpdateProductServiceSchema],
|
||
):
|
||
entity_class = DealProductService
|
||
session: AsyncSession
|
||
|
||
async def get_by_id(
|
||
self,
|
||
deal_id: int,
|
||
product_id: int,
|
||
service_id: int,
|
||
raise_if_not_found: Optional[bool] = True,
|
||
) -> Optional[DealProductService]:
|
||
stmt = (
|
||
select(DealProductService)
|
||
.options(
|
||
joinedload(DealProductService.service),
|
||
)
|
||
.where(
|
||
DealProductService.deal_id == deal_id,
|
||
DealProductService.product_id == product_id,
|
||
DealProductService.service_id == service_id,
|
||
)
|
||
)
|
||
result = (await self.session.execute(stmt)).scalar_one_or_none()
|
||
if result is None and raise_if_not_found:
|
||
raise ObjectNotFoundException("Связь услуги с товаром не найдена")
|
||
return result
|
||
|
||
async def create(self, data: CreateProductServiceSchema):
|
||
deal_product_service = DealProductService(**data.model_dump())
|
||
self.session.add(deal_product_service)
|
||
await self.session.commit()
|
||
|
||
async def delete(self, obj: DealProductService):
|
||
await self.session.delete(obj)
|
||
await self.session.commit()
|
||
|
||
async def get_product_services(
|
||
self, deal_id: int, product_id: int
|
||
) -> list[DealProductService]:
|
||
stmt = (
|
||
select(DealProductService)
|
||
.options(
|
||
joinedload(DealProductService.service),
|
||
)
|
||
.where(
|
||
DealProductService.deal_id == deal_id,
|
||
DealProductService.product_id == product_id,
|
||
)
|
||
)
|
||
return list(await self.session.scalars(stmt))
|
||
|
||
async def delete_product_services(self, deal_id: int, product_ids: list[int]):
|
||
stmt = delete(DealProductService).where(
|
||
DealProductService.deal_id == deal_id,
|
||
DealProductService.product_id.in_(product_ids),
|
||
)
|
||
await self.session.execute(stmt)
|
||
await self.session.flush()
|
||
|
||
async def duplicate_services(
|
||
self, deal_id: int, product_ids: list[int], services: list[DealProductService]
|
||
):
|
||
await self.delete_product_services(deal_id, product_ids)
|
||
|
||
for product_id in product_ids:
|
||
for prod_service in services:
|
||
product_service = DealProductService(
|
||
deal_id=deal_id,
|
||
product_id=product_id,
|
||
service_id=prod_service.service.id,
|
||
price=prod_service.price,
|
||
is_fixed_price=prod_service.is_fixed_price,
|
||
)
|
||
self.session.add(product_service)
|
||
await self.session.commit()
|