92 lines
3.2 KiB
Python
92 lines
3.2 KiB
Python
from sqlalchemy import or_, delete
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from modules.fulfillment_base.models import Product, ProductBarcode
|
|
from modules.fulfillment_base.schemas.product import (
|
|
CreateProductSchema,
|
|
UpdateProductSchema,
|
|
)
|
|
from repositories.mixins import *
|
|
|
|
|
|
class ProductRepository(
|
|
BaseRepository,
|
|
RepDeleteMixin[Product],
|
|
RepCreateMixin[Product, CreateProductSchema],
|
|
RepUpdateMixin[Product, UpdateProductSchema],
|
|
RepGetByIdMixin[Product],
|
|
):
|
|
entity_class = Product
|
|
entity_not_found_msg = "Товар не найден"
|
|
|
|
async def get_all(
|
|
self,
|
|
page: Optional[int],
|
|
items_per_page: Optional[int],
|
|
client_id: Optional[int],
|
|
search_input: Optional[str],
|
|
) -> tuple[list[Product], int]:
|
|
stmt = (
|
|
select(Product)
|
|
.options(selectinload(Product.barcodes))
|
|
.where(Product.is_deleted.is_(False))
|
|
)
|
|
|
|
if client_id:
|
|
stmt = stmt.where(Product.client_id == client_id)
|
|
if search_input:
|
|
stmt = stmt.where(
|
|
or_(
|
|
Product.name.ilike(f"%{search_input}%"),
|
|
Product.barcodes.any(
|
|
ProductBarcode.barcode.ilike(f"%{search_input}%")
|
|
),
|
|
Product.article.ilike(f"%{search_input}%"),
|
|
Product.factory_article.ilike(f"%{search_input}%"),
|
|
)
|
|
)
|
|
|
|
total_items = len((await self.session.execute(stmt)).all())
|
|
|
|
if page and items_per_page:
|
|
stmt = self._apply_pagination(stmt, page, items_per_page)
|
|
|
|
result = await self.session.execute(stmt)
|
|
return list(result.scalars().all()), total_items
|
|
|
|
def _process_get_by_id_stmt(self, stmt: Select) -> Select:
|
|
return stmt.options(selectinload(Product.barcodes))
|
|
|
|
async def _after_create(self, product: Product, data: CreateProductSchema) -> None:
|
|
new_barcodes = [
|
|
ProductBarcode(product_id=product.id, barcode=barcode)
|
|
for barcode in data.barcodes
|
|
]
|
|
self.session.add_all(new_barcodes)
|
|
|
|
async def _update_barcodes(self, product: Product, new_barcodes: list[str]):
|
|
new_barcodes_set: set[str] = set(new_barcodes)
|
|
old_barcodes_set: set[str] = set(obj.barcode for obj in product.barcodes)
|
|
barcodes_to_add = new_barcodes_set - old_barcodes_set
|
|
barcodes_to_delete = old_barcodes_set - new_barcodes_set
|
|
|
|
del_stmt = delete(ProductBarcode).where(
|
|
ProductBarcode.product_id == product.id,
|
|
ProductBarcode.barcode.in_(barcodes_to_delete),
|
|
)
|
|
await self.session.execute(del_stmt)
|
|
|
|
new_barcodes = [
|
|
ProductBarcode(product_id=product.id, barcode=barcode)
|
|
for barcode in barcodes_to_add
|
|
]
|
|
self.session.add_all(new_barcodes)
|
|
await self.session.commit()
|
|
await self.session.refresh(product)
|
|
|
|
async def update(self, product: Product, data: UpdateProductSchema) -> Product:
|
|
if data.barcodes is not None:
|
|
await self._update_barcodes(product, data.barcodes)
|
|
del data.barcodes
|
|
return await self._apply_update_data_to_model(product, data, True)
|