Files
Crm-Backend/modules/fulfillment_base/repositories/product.py

117 lines
4.1 KiB
Python

from sqlalchemy import or_, delete
from sqlalchemy.orm import selectinload, joinedload
from modules.fulfillment_base.models import Product, ProductBarcode, BarcodeTemplate
from modules.fulfillment_base.models.product import ProductImage
from modules.fulfillment_base.schemas.product import (
CreateProductSchema,
UpdateProductSchema,
)
from repositories.mixins import *
class ProductRepository(
BaseRepository,
RepDeleteMixin[Product],
RepCreateMixin[Product, CreateProductSchema],
RepUpdateMixin[Product, UpdateProductSchema],
RepGetByIdMixin[Product],
):
session: AsyncSession
entity_class = Product
entity_not_found_msg = "Товар не найден"
async def get_all(
self,
page: Optional[int],
items_per_page: Optional[int],
client_id: Optional[int],
search_input: Optional[str],
) -> tuple[list[Product], int]:
stmt = (
select(Product)
.options(selectinload(Product.barcodes))
.where(Product.is_deleted.is_(False))
)
if client_id:
stmt = stmt.where(Product.client_id == client_id)
if search_input:
stmt = stmt.where(
or_(
Product.name.ilike(f"%{search_input}%"),
Product.barcodes.any(
ProductBarcode.barcode.ilike(f"%{search_input}%")
),
Product.article.ilike(f"%{search_input}%"),
Product.factory_article.ilike(f"%{search_input}%"),
)
)
total_items = len((await self.session.execute(stmt)).all())
if page and items_per_page:
stmt = self._apply_pagination(stmt, page, items_per_page)
result = await self.session.execute(stmt)
return list(result.scalars().all()), total_items
def _process_get_by_id_stmt(self, stmt: Select) -> Select:
return stmt.options(
selectinload(Product.barcodes),
joinedload(Product.client),
joinedload(Product.barcode_template).selectinload(
BarcodeTemplate.attributes
),
)
async def _after_create(self, product: Product, data: CreateProductSchema) -> None:
new_barcodes = [
ProductBarcode(product_id=product.id, barcode=barcode)
for barcode in data.barcodes
]
self.session.add_all(new_barcodes)
async def _update_barcodes(self, product: Product, new_barcodes: list[str]):
new_barcodes_set: set[str] = set(new_barcodes)
old_barcodes_set: set[str] = set(obj.barcode for obj in product.barcodes)
barcodes_to_add = new_barcodes_set - old_barcodes_set
barcodes_to_delete = old_barcodes_set - new_barcodes_set
del_stmt = delete(ProductBarcode).where(
ProductBarcode.product_id == product.id,
ProductBarcode.barcode.in_(barcodes_to_delete),
)
await self.session.execute(del_stmt)
new_barcodes = [
ProductBarcode(product_id=product.id, barcode=barcode)
for barcode in barcodes_to_add
]
self.session.add_all(new_barcodes)
await self.session.commit()
await self.session.refresh(product)
async def update(self, product: Product, data: UpdateProductSchema) -> Product:
if data.barcodes is not None:
await self._update_barcodes(product, data.barcodes)
del data.barcodes
return await self._apply_update_data_to_model(product, data, True)
async def delete_images(self, product_images: list[ProductImage], with_commit: bool = False):
for img in product_images:
await self.session.delete(img)
if with_commit:
await self.session.commit()
else:
await self.session.flush()
async def create_image(self, product_id: int, image_url: str) -> ProductImage:
product_image = ProductImage(
product_id=product_id,
image_url=image_url,
)
self.session.add(product_image)
await self.session.commit()
return product_image