from typing import Optional

from sqlalchemy import select, delete
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload

from modules.fulfillment_base.models import DealProductService
from modules.fulfillment_base.schemas.product_service import *
from repositories.base import BaseRepository
from repositories.mixins import RepUpdateMixin
from utils.exceptions import ObjectNotFoundException


class ProductServiceRepository(
    BaseRepository,
    RepUpdateMixin[DealProductService, UpdateProductServiceSchema],
):
    """Data-access layer for ``DealProductService`` rows.

    A row is addressed by the triple ``(deal_id, product_id, service_id)``.
    """

    entity_class = DealProductService
    # User-facing message raised when a requested link does not exist.
    entity_not_found_msg = "Связь услуги с товаром не найдена"
    # Async session used by every query below; it is not assigned in this
    # class (presumably provided by BaseRepository -- confirm there).
    session: AsyncSession

    @staticmethod
    def _select_with_service():
        """Build a SELECT of ``DealProductService`` that joined-loads ``service``."""
        return select(DealProductService).options(
            joinedload(DealProductService.service),
        )

    async def get_by_id(
        self,
        deal_id: int,
        product_id: int,
        service_id: int,
        raise_if_not_found: bool = True,
    ) -> Optional[DealProductService]:
        """Fetch a single link by its ``(deal_id, product_id, service_id)`` key.

        The related ``service`` is joined-loaded together with the row.

        Args:
            deal_id: Deal the link belongs to.
            product_id: Product the link belongs to.
            service_id: Service the link points at.
            raise_if_not_found: When truthy, a missing row raises instead of
                returning ``None``.

        Returns:
            The matching row, or ``None`` when it is missing and
            ``raise_if_not_found`` is falsy.

        Raises:
            ObjectNotFoundException: No row matches and ``raise_if_not_found``
                is truthy.
        """
        stmt = self._select_with_service().where(
            DealProductService.deal_id == deal_id,
            DealProductService.product_id == product_id,
            DealProductService.service_id == service_id,
        )
        result = (await self.session.execute(stmt)).scalar_one_or_none()
        if result is None and raise_if_not_found:
            # Reuse the class-level message so the text is defined only once.
            raise ObjectNotFoundException(self.entity_not_found_msg)
        return result

    async def create(self, data: CreateProductServiceSchema) -> None:
        """Insert a new link built from the fields of ``data`` and commit."""
        deal_product_service = DealProductService(**data.model_dump())
        self.session.add(deal_product_service)
        await self.session.commit()

    async def delete(self, obj: DealProductService) -> None:
        """Delete ``obj`` through the session and commit."""
        await self.session.delete(obj)
        await self.session.commit()

    async def get_product_services(
        self, deal_id: int, product_id: int
    ) -> list[DealProductService]:
        """Return every link of one product within one deal.

        Each returned row has its ``service`` relationship joined-loaded.
        """
        stmt = self._select_with_service().where(
            DealProductService.deal_id == deal_id,
            DealProductService.product_id == product_id,
        )
        return list(await self.session.scalars(stmt))

    async def delete_product_services(
        self, deal_id: int, product_ids: list[int]
    ) -> None:
        """Bulk-delete all links of the given products within one deal.

        The statement is executed and flushed but NOT committed here;
        ``duplicate_services`` commits after calling this method, and any
        other caller has to commit on its own.
        """
        stmt = delete(DealProductService).where(
            DealProductService.deal_id == deal_id,
            DealProductService.product_id.in_(product_ids),
        )
        await self.session.execute(stmt)
        await self.session.flush()

    async def duplicate_services(
        self,
        deal_id: int,
        product_ids: list[int],
        services: list[DealProductService],
    ) -> None:
        """Replace the links of ``product_ids`` with copies of ``services``.

        Existing links of every target product in the deal are deleted first,
        then one new link per (product, service) pair is added, and the whole
        change is committed.

        Args:
            deal_id: Deal whose product links are rewritten.
            product_ids: Products that receive the copied links.
            services: Source links; ``service_id``, ``price`` and
                ``is_fixed_price`` are copied from each of them.
        """
        await self.delete_product_services(deal_id, product_ids)
        self.session.add_all(
            [
                DealProductService(
                    deal_id=deal_id,
                    product_id=product_id,
                    # Read the foreign-key column rather than going through
                    # the ``service`` relationship: an implicit lazy load is
                    # not possible under AsyncSession, so this also works for
                    # source rows whose relationship was not eagerly loaded.
                    service_id=prod_service.service_id,
                    price=prod_service.price,
                    is_fixed_price=prod_service.is_fixed_price,
                )
                for product_id in product_ids
                for prod_service in services
            ]
        )
        await self.session.commit()