from sqlalchemy import or_, delete from sqlalchemy.orm import selectinload, joinedload from modules.fulfillment_base.models import Product, ProductBarcode, BarcodeTemplate from modules.fulfillment_base.schemas.product import ( CreateProductSchema, UpdateProductSchema, ) from repositories.mixins import * class ProductRepository( BaseRepository, RepDeleteMixin[Product], RepCreateMixin[Product, CreateProductSchema], RepUpdateMixin[Product, UpdateProductSchema], RepGetByIdMixin[Product], ): entity_class = Product entity_not_found_msg = "Товар не найден" async def get_all( self, page: Optional[int], items_per_page: Optional[int], client_id: Optional[int], search_input: Optional[str], ) -> tuple[list[Product], int]: stmt = ( select(Product) .options(selectinload(Product.barcodes)) .where(Product.is_deleted.is_(False)) ) if client_id: stmt = stmt.where(Product.client_id == client_id) if search_input: stmt = stmt.where( or_( Product.name.ilike(f"%{search_input}%"), Product.barcodes.any( ProductBarcode.barcode.ilike(f"%{search_input}%") ), Product.article.ilike(f"%{search_input}%"), Product.factory_article.ilike(f"%{search_input}%"), ) ) total_items = len((await self.session.execute(stmt)).all()) if page and items_per_page: stmt = self._apply_pagination(stmt, page, items_per_page) result = await self.session.execute(stmt) return list(result.scalars().all()), total_items def _process_get_by_id_stmt(self, stmt: Select) -> Select: return stmt.options( selectinload(Product.barcodes), joinedload(Product.client), joinedload(Product.barcode_template).selectinload( BarcodeTemplate.attributes ), ) async def _after_create(self, product: Product, data: CreateProductSchema) -> None: new_barcodes = [ ProductBarcode(product_id=product.id, barcode=barcode) for barcode in data.barcodes ] self.session.add_all(new_barcodes) async def _update_barcodes(self, product: Product, new_barcodes: list[str]): new_barcodes_set: set[str] = set(new_barcodes) old_barcodes_set: set[str] = set(obj.barcode for obj in product.barcodes) barcodes_to_add = new_barcodes_set - old_barcodes_set barcodes_to_delete = old_barcodes_set - new_barcodes_set del_stmt = delete(ProductBarcode).where( ProductBarcode.product_id == product.id, ProductBarcode.barcode.in_(barcodes_to_delete), ) await self.session.execute(del_stmt) new_barcodes = [ ProductBarcode(product_id=product.id, barcode=barcode) for barcode in barcodes_to_add ] self.session.add_all(new_barcodes) await self.session.commit() await self.session.refresh(product) async def update(self, product: Product, data: UpdateProductSchema) -> Product: if data.barcodes is not None: await self._update_barcodes(product, data.barcodes) del data.barcodes return await self._apply_update_data_to_model(product, data, True)