49 lines
1.5 KiB
Python
49 lines
1.5 KiB
Python
from typing import Optional
|
|
|
|
from sqlalchemy import Select, select
|
|
from sqlalchemy.orm import joinedload
|
|
|
|
from modules.fulfillment_base.models import DealService
|
|
from modules.fulfillment_base.schemas.deal_service import (
|
|
UpdateDealServiceSchema,
|
|
CreateDealServiceSchema,
|
|
)
|
|
from repositories.base import BaseRepository
|
|
from repositories.mixins import RepGetAllMixin, RepUpdateMixin
|
|
|
|
|
|
class DealServiceRepository(
|
|
BaseRepository,
|
|
RepGetAllMixin[DealService],
|
|
RepUpdateMixin[DealService, UpdateDealServiceSchema],
|
|
):
|
|
entity_class = DealService
|
|
|
|
def _process_get_all_stmt_with_args(self, stmt: Select, *args) -> Select:
|
|
deal_id = args[0]
|
|
return (
|
|
stmt.options(
|
|
joinedload(DealService.service),
|
|
)
|
|
.where(DealService.deal_id == deal_id)
|
|
.order_by(DealService.service_id)
|
|
)
|
|
|
|
async def get_by_id(self, deal_id: int, service_id: int) -> Optional[DealService]:
|
|
stmt = (
|
|
select(DealService)
|
|
.options(joinedload(DealService.service))
|
|
.where(DealService.deal_id == deal_id, DealService.service_id == service_id)
|
|
)
|
|
result = await self.session.execute(stmt)
|
|
return result.scalar_one_or_none()
|
|
|
|
async def create(self, data: CreateDealServiceSchema):
|
|
deal_service = DealService(**data.model_dump())
|
|
self.session.add(deal_service)
|
|
await self.session.commit()
|
|
|
|
async def delete(self, obj: DealService):
|
|
await self.session.delete(obj)
|
|
await self.session.commit()
|