from sqlalchemy.orm import joinedload

from models import Attribute, AttributeLabel, AttributeType
from repositories.mixins import *
from schemas.attribute import CreateAttributeSchema, UpdateAttributeSchema
from utils.exceptions import ForbiddenException


class AttributeRepository(
    RepCrudMixin[Attribute, CreateAttributeSchema, UpdateAttributeSchema]
):
    """Repository for ``Attribute`` rows, their types and per-module labels.

    Generic CRUD behaviour comes from ``RepCrudMixin``; this class overrides
    the statement/deletion hooks and adds type and label helpers.
    NOTE(review): the ``_process_*`` and ``_before_delete`` methods are
    presumably invoked by the mixin — confirm against ``repositories.mixins``.
    """

    # Async database session used by every query in this repository.
    session: AsyncSession
    # ORM model this repository is bound to.
    entity_class = Attribute

    def _process_get_all_stmt(self, stmt: Select) -> Select:
        """Shape the "get all" statement.

        Eagerly joins ``Attribute.type``, keeps only rows whose
        ``is_deleted`` flag is false and orders the result by ``Attribute.id``.
        """
        with_type = stmt.options(joinedload(Attribute.type))
        not_deleted = with_type.where(Attribute.is_deleted.is_(False))
        return not_deleted.order_by(Attribute.id)

    def _process_get_by_id_stmt(self, stmt: Select) -> Select:
        """Shape the "get by id" statement by eagerly joining ``Attribute.type``."""
        eager_type = joinedload(Attribute.type)
        return stmt.options(eager_type)

    async def _get_attribute_type_by_id(self, type_id: int) -> AttributeType:
        """Fetch the ``AttributeType`` whose primary key equals ``type_id``.

        Raises:
            ObjectNotFoundException: if no such attribute type exists.
        """
        query = select(AttributeType).where(AttributeType.id == type_id)
        row = (await self.session.execute(query)).one_or_none()
        if row is not None:
            # The row holds a single entity column: the AttributeType itself.
            return row[0]
        raise ObjectNotFoundException("Тип аттрибута не найден")

    async def update(self, attr: Attribute, data: UpdateAttributeSchema) -> Attribute:
        """Apply ``data`` to ``attr`` and return the updated attribute.

        When ``data.type`` is set, it is replaced in place with the
        ``AttributeType`` loaded by its id before the update is applied, so a
        missing type raises ``ObjectNotFoundException``.
        """
        requested_type = data.type
        if requested_type:
            data.type = await self._get_attribute_type_by_id(requested_type.id)
        # NOTE(review): the meaning of the trailing ``True`` flag is defined by
        # the mixin's ``_apply_update_data_to_model`` — confirm there.
        updated = await self._apply_update_data_to_model(attr, data, True)
        return updated

    async def _before_delete(self, attribute: Attribute) -> None:
        """Reject deletion of built-in attributes.

        Raises:
            ForbiddenException: if ``attribute.is_built_in`` is truthy.
        """
        if not attribute.is_built_in:
            return
        raise ForbiddenException("Нельзя менять встроенный атрибут")

    async def _get_attribute_module_label(
        self, module_id: int, attribute_id: int
    ) -> Optional[AttributeLabel]:
        """Return the label of ``attribute_id`` within ``module_id``, or ``None``."""
        same_attribute = AttributeLabel.attribute_id == attribute_id
        same_module = AttributeLabel.module_id == module_id
        query = select(AttributeLabel).where(same_attribute, same_module)
        row = (await self.session.execute(query)).one_or_none()
        if not row:
            return None
        return row[0]

    async def create_or_update_attribute_label(
        self, module_id: int, attribute_id: int, label: str
    ):
        """Set the label text of an attribute for a module and commit.

        An existing ``AttributeLabel`` for the (module, attribute) pair gets
        its ``label`` overwritten; otherwise a new one is added to the session.
        """
        existing = await self._get_attribute_module_label(module_id, attribute_id)
        if not existing:
            created = AttributeLabel(
                module_id=module_id,
                attribute_id=attribute_id,
                label=label,
            )
            self.session.add(created)
        else:
            existing.label = label
        await self.session.commit()

    async def get_attribute_types(self) -> list[AttributeType]:
        """Return every ``AttributeType`` whose ``is_deleted`` flag is false."""
        active_only = AttributeType.is_deleted.is_(False)
        query = select(AttributeType).where(active_only)
        outcome = await self.session.execute(query)
        found = outcome.scalars().all()
        return list(found)