Files
Crm-Backend/external/s3_uploader/uploader.py

82 lines
2.4 KiB
Python

import uuid
from typing import Any, Optional

from aioboto3 import Session
from fastapi import UploadFile

from backend import config
from logger import logger_builder
class S3Uploader:
    """Upload and delete objects in an S3-compatible bucket through aioboto3.

    Endpoint, credentials, region and bucket name are read from ``config``
    every time a client is created or a URL is built.
    """

    # MIME type sent to S3 when an uploaded file carries no content type.
    _DEFAULT_CONTENT_TYPE = "application/octet-stream"

    # The aioboto3 session that every client context is created from.
    session: "Session"

    def __init__(self):
        """Create the aioboto3 session used by all subsequent operations."""
        self.session = Session()

    def _get_client(self) -> Any:
        """Return an S3 client context configured from ``config``.

        The returned object is not a ready-to-use client: callers enter it
        with ``async with`` to obtain the client. A new context is built on
        every call.
        """
        return self.session.client(
            "s3",
            endpoint_url=config.S3_URL,
            aws_access_key_id=config.S3_ACCESS_KEY,
            aws_secret_access_key=config.S3_SECRET_ACCESS_KEY,
            region_name=config.S3_REGION,
        )

    @staticmethod
    async def _generate_s3_key() -> str:
        """Return a random 32-character hexadecimal key (a UUID4 in hex)."""
        return uuid.uuid4().hex

    @staticmethod
    def _extension_from_filename(filename: Optional[str]) -> str:
        """Return the text after the last dot of *filename*, or ``""``.

        ``""`` is returned when the name is missing, has no dot, or when the
        last dot belongs to a directory component rather than the file name.
        """
        if not filename:
            return ""
        _, dot, suffix = filename.rpartition(".")
        if not dot or "/" in suffix or "\\" in suffix:
            return ""
        return suffix

    @staticmethod
    def _build_object_name(s3_key: str, extension: str) -> str:
        """Join *s3_key* and *extension* into an object name.

        Leading dots on the extension are ignored and an empty extension
        yields the bare key, so the result never contains ``..`` at the join
        and never ends with a dangling dot.
        """
        suffix = (extension or "").lstrip(".")
        if not suffix:
            return s3_key
        return f"{s3_key}.{suffix}"

    @staticmethod
    def get_file_path_from_name(filename: str) -> str:
        """Return the URL of *filename*: ``<S3_URL>/<S3_BUCKET>/<filename>``."""
        return f"{config.S3_URL}/{config.S3_BUCKET}/{filename}"

    async def upload_from_bytes(
        self,
        image_bytes: bytes,
        content_type: str,
        extension: str,
        unique_s3_key: Optional[str] = None,
    ) -> str:
        """Store *image_bytes* in the configured bucket and return its URL.

        Args:
            image_bytes: Raw object body.
            content_type: MIME type recorded on the object.
            extension: File extension appended to the key, with or without a
                leading dot; may be empty.
            unique_s3_key: Key to store the object under. A random one is
                generated when omitted.

        Returns:
            The URL produced by ``get_file_path_from_name`` for the stored
            object.

        Raises:
            Exception: Any error raised while uploading is logged and then
                re-raised unchanged.
        """
        logger = logger_builder.get_logger()
        if unique_s3_key is None:
            unique_s3_key = await self._generate_s3_key()
        filename = self._build_object_name(unique_s3_key, extension)
        file_url = self.get_file_path_from_name(filename)
        try:
            async with self._get_client() as s3_client:
                await s3_client.put_object(
                    Bucket=config.S3_BUCKET,
                    Key=filename,
                    Body=image_bytes,
                    ContentType=content_type,
                )
            logger.info(f"Successfully uploaded {filename} to S3")
            return file_url
        except Exception as e:
            logger.error(f"Error uploading image bytes: {e}")
            raise

    async def upload_from_upload_file_obj(self, upload_file: "UploadFile") -> str:
        """Upload the contents of *upload_file* and return the object URL.

        The extension is taken from the uploaded file name (empty when the
        name is missing or has none) and the content type falls back to
        ``application/octet-stream`` when the upload does not declare one.
        """
        # Use the async reader so the event loop is not blocked by file I/O.
        file_bytes = await upload_file.read()
        extension = self._extension_from_filename(upload_file.filename)
        content_type = upload_file.content_type or self._DEFAULT_CONTENT_TYPE
        return await self.upload_from_bytes(file_bytes, content_type, extension)

    async def delete_image(self, s3_key: str) -> None:
        """Delete the object stored under *s3_key* from the configured bucket.

        Deletion is best-effort: any error is logged and suppressed, so the
        caller is not told whether the object was actually removed.
        """
        logger = logger_builder.get_logger()
        try:
            async with self._get_client() as s3_client:
                await s3_client.delete_object(Bucket=config.S3_BUCKET, Key=s3_key)
            logger.info(f"Successfully deleted {s3_key} from S3")
        except Exception as e:
            # NOTE(review): failures are swallowed here, unlike uploads which
            # re-raise - confirm callers rely on this before changing it.
            logger.error(f"Error deleting image from S3: {e}")