153 lines
5.4 KiB
Python
153 lines
5.4 KiB
Python
from sqlalchemy import and_
|
|
from sqlalchemy.orm import joinedload
|
|
|
|
from models import (
|
|
Attribute,
|
|
AttributeLabel,
|
|
AttributeType,
|
|
AttributeValue,
|
|
module_attribute,
|
|
)
|
|
from repositories.mixins import *
|
|
from schemas.attribute import CreateAttributeSchema, UpdateAttributeSchema, UpdateDealModuleAttributeSchema
|
|
from utils.exceptions import ForbiddenException
|
|
|
|
|
|
class AttributeRepository(
|
|
RepCrudMixin[Attribute, CreateAttributeSchema, UpdateAttributeSchema]
|
|
):
|
|
session: AsyncSession
|
|
entity_class = Attribute
|
|
|
|
def _process_get_all_stmt(self, stmt: Select) -> Select:
|
|
return (
|
|
stmt.options(joinedload(Attribute.type))
|
|
.where(Attribute.is_deleted.is_(False))
|
|
.order_by(Attribute.id)
|
|
)
|
|
|
|
def _process_get_by_id_stmt(self, stmt: Select) -> Select:
|
|
return stmt.options(joinedload(Attribute.type))
|
|
|
|
async def _get_attribute_type_by_id(self, type_id: int) -> AttributeType:
|
|
stmt = select(AttributeType).where(AttributeType.id == type_id)
|
|
result = (await self.session.execute(stmt)).one_or_none()
|
|
if result is None:
|
|
raise ObjectNotFoundException("Тип аттрибута не найден")
|
|
return result[0]
|
|
|
|
async def update(self, attr: Attribute, data: UpdateAttributeSchema) -> Attribute:
|
|
if data.type:
|
|
data.type = await self._get_attribute_type_by_id(data.type.id)
|
|
return await self._apply_update_data_to_model(attr, data, True)
|
|
|
|
async def _before_delete(self, attribute: Attribute) -> None:
|
|
if attribute.is_built_in:
|
|
raise ForbiddenException("Нельзя менять встроенный атрибут")
|
|
|
|
async def _get_attribute_module_label(
|
|
self, module_id: int, attribute_id: int
|
|
) -> Optional[AttributeLabel]:
|
|
stmt = select(AttributeLabel).where(
|
|
AttributeLabel.attribute_id == attribute_id,
|
|
AttributeLabel.module_id == module_id,
|
|
)
|
|
result = await self.session.execute(stmt)
|
|
row = result.one_or_none()
|
|
return row[0] if row else None
|
|
|
|
async def create_or_update_attribute_label(
|
|
self, module_id: int, attribute_id: int, label: str
|
|
):
|
|
attribute_label = await self._get_attribute_module_label(
|
|
module_id, attribute_id
|
|
)
|
|
if attribute_label:
|
|
attribute_label.label = label
|
|
else:
|
|
attribute_label = AttributeLabel(
|
|
module_id=module_id,
|
|
attribute_id=attribute_id,
|
|
label=label,
|
|
)
|
|
|
|
self.session.add(attribute_label)
|
|
await self.session.commit()
|
|
|
|
async def get_attribute_types(self) -> list[AttributeType]:
|
|
stmt = select(AttributeType).where(AttributeType.is_deleted.is_(False))
|
|
result = await self.session.execute(stmt)
|
|
return list(result.scalars().all())
|
|
|
|
async def get_deal_module_attributes(
|
|
self, deal_id: int, module_id: int
|
|
) -> list[tuple[Attribute, AttributeValue, AttributeLabel]]:
|
|
stmt = (
|
|
select(Attribute, AttributeValue, AttributeLabel)
|
|
.outerjoin(
|
|
AttributeValue,
|
|
and_(
|
|
AttributeValue.attribute_id == Attribute.id,
|
|
AttributeValue.module_id == module_id,
|
|
AttributeValue.deal_id == deal_id,
|
|
),
|
|
)
|
|
.outerjoin(
|
|
AttributeLabel,
|
|
and_(
|
|
AttributeLabel.attribute_id == Attribute.id,
|
|
AttributeLabel.module_id == module_id,
|
|
),
|
|
)
|
|
.join(
|
|
module_attribute,
|
|
and_(
|
|
module_attribute.c.attribute_id == Attribute.id,
|
|
module_attribute.c.module_id == module_id,
|
|
),
|
|
)
|
|
)
|
|
result = await self.session.execute(stmt)
|
|
return list(result.all())
|
|
|
|
async def _get_deal_attribute_values(
|
|
self, deal_id: int, module_id: int
|
|
) -> list[AttributeValue]:
|
|
stmt = (
|
|
select(AttributeValue)
|
|
.join(Attribute, AttributeValue.attribute_id == Attribute.id)
|
|
.where(
|
|
AttributeValue.deal_id == deal_id,
|
|
AttributeValue.module_id == module_id,
|
|
Attribute.is_deleted.is_(False),
|
|
)
|
|
)
|
|
result = await self.session.execute(stmt)
|
|
return list(result.scalars().all())
|
|
|
|
async def update_or_create_deal_attribute_values(
|
|
self,
|
|
deal_id: int,
|
|
module_id: int,
|
|
attributes: list[UpdateDealModuleAttributeSchema],
|
|
):
|
|
old_deal_attribute_values: list[AttributeValue] = await self._get_deal_attribute_values(deal_id, module_id)
|
|
dict_old_attrs: dict[int, AttributeValue] = {obj.attribute_id: obj for obj in old_deal_attribute_values}
|
|
|
|
for attribute in attributes:
|
|
if attribute.attribute_id in dict_old_attrs:
|
|
# update
|
|
attribute_value = dict_old_attrs[attribute.attribute_id]
|
|
attribute_value.value = attribute.value
|
|
else:
|
|
# create
|
|
attribute_value = AttributeValue(
|
|
attribute_id=attribute.attribute_id,
|
|
deal_id=deal_id,
|
|
module_id=module_id,
|
|
value=attribute.value,
|
|
)
|
|
self.session.add(attribute_value)
|
|
|
|
await self.session.commit()
|