Files
CRM-OLD-API/app/api/v1/auth.py
2025-07-24 20:13:47 +03:00

76 lines
2.1 KiB
Python

from datetime import datetime, timedelta, timezone
from time import time
import jwt
from fastapi import APIRouter
from app import mongo
from app.config import config
from app.utils.response_util import response
import hashlib
import hmac
router = APIRouter()
def _string_generator(data_incoming):
data = data_incoming.copy()
del data["hash"]
keys = sorted(data.keys())
string_arr = []
for key in keys:
if data[key] is not None:
string_arr.append(key + "=" + str(data[key]))
string_cat = "\n".join(string_arr)
return string_cat
def _data_check(BOT_TOKEN, tg_data):
data_check_string = _string_generator(tg_data)
secret_key = hashlib.sha256(BOT_TOKEN.encode("utf-8")).digest()
secret_key_bytes = secret_key
data_check_string_bytes = bytes(data_check_string, "utf-8")
hmac_string = hmac.new(secret_key_bytes, data_check_string_bytes, hashlib.sha256).hexdigest()
if hmac_string == tg_data["hash"]:
return True
else:
return False
def authorize(telegram_data: dict):
return _data_check(config['BOT_TOKEN'], telegram_data)
def create_access_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + timedelta(minutes=int(config["JWT_ACCESS_TOKEN_EXPIRE_MINUTES"]))
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, config["JWT_SECRET_KEY"], algorithm=config["JWT_ALGORITHM"])
return encoded_jwt
@router.post("/login", tags=[""])
async def login(data: dict):
start_time = time()
if not authorize(data):
return response({
"detail": "Ошибка авторизации"
}, start_time=start_time, code=401)
user = await mongo.users_collection.find_one({
"telegramId": data["id"]
})
if not user:
return response({
"detail": "Пользователь не найден"
}, start_time=start_time, code=401)
access_token = create_access_token({
"sub": str(user["id"]),
"role": user["role"]["key"],
})
return response({
"accessToken": access_token
}, start_time=start_time)