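from datetime import datetime, timedelta, timezone
from time import time
import hashlib
import hmac

import jwt
from fastapi import APIRouter

from app import mongo
from app.config import config
from app.utils.response_util import response

router = APIRouter()


def _string_generator(data_incoming):
    """Build the data-check string that the Telegram login hash is computed over.

    Every field except ``hash`` whose value is not ``None`` is rendered as
    ``key=value``; the pairs are sorted by key and joined with newlines.
    The caller's dict is not modified.

    Args:
        data_incoming: Login payload as received from the client.

    Returns:
        The newline-joined ``key=value`` string.
    """
    data = data_incoming.copy()
    # pop() instead of del: a payload without "hash" is attacker-reachable
    # input and must not surface as an unhandled KeyError.
    data.pop("hash", None)
    return "\n".join(
        f"{key}={data[key]}" for key in sorted(data) if data[key] is not None
    )


def _data_check(BOT_TOKEN, tg_data):
    """Verify the HMAC-SHA256 signature carried in a Telegram login payload.

    The HMAC key is the SHA-256 digest of the bot token and the message is
    the data-check string built by ``_string_generator``.

    Args:
        BOT_TOKEN: Bot token string used to derive the HMAC key.
        tg_data: Login payload; expected to carry the hex digest in ``hash``.

    Returns:
        True if the supplied hash matches the computed one, False otherwise
        (including when ``hash`` is missing, is not a string, or the payload
        cannot be encoded as UTF-8).
    """
    received_hash = tg_data.get("hash")
    if not isinstance(received_hash, str):
        # Fail closed on a missing or non-string hash.
        return False

    try:
        message = _string_generator(tg_data).encode("utf-8")
        candidate = received_hash.encode("utf-8")
    except UnicodeEncodeError:
        # e.g. lone surrogates smuggled in through JSON escapes.
        return False

    secret_key = hashlib.sha256(BOT_TOKEN.encode("utf-8")).digest()
    expected_hash = hmac.new(secret_key, message, hashlib.sha256).hexdigest()

    # Constant-time comparison: `==` on the hex strings short-circuits at the
    # first differing character and leaks timing about the expected digest.
    # Bytes are compared because compare_digest rejects non-ASCII str input.
    return hmac.compare_digest(expected_hash.encode("ascii"), candidate)


def authorize(telegram_data: dict):
    """Return True if ``telegram_data`` carries a valid signature for the configured bot.

    NOTE(review): only the signature is checked. The payload's ``auth_date``
    is never compared with the current time, so a validly signed payload
    stays acceptable indefinitely. Telegram's login documentation recommends
    rejecting outdated data; the maximum age is a policy decision that is
    not defined in the configuration visible here.
    """
    return _data_check(config['BOT_TOKEN'], telegram_data)


def create_access_token(data: dict) -> str:
    """Issue a signed JWT containing ``data`` plus an ``exp`` claim.

    Expiry is the current UTC time plus ``JWT_ACCESS_TOKEN_EXPIRE_MINUTES``
    from the configuration. The signing key and algorithm also come from the
    configuration.

    Args:
        data: Claims to embed; the caller's dict is not modified.

    Returns:
        The encoded token.
    """
    lifetime = timedelta(minutes=int(config["JWT_ACCESS_TOKEN_EXPIRE_MINUTES"]))
    claims = data.copy()
    claims["exp"] = datetime.now(timezone.utc) + lifetime
    return jwt.encode(
        claims,
        config["JWT_SECRET_KEY"],
        algorithm=config["JWT_ALGORITHM"],
    )


@router.post("/login", tags=[""])
async def login(data: dict):
    """Exchange a signed Telegram login payload for an access token.

    Responds with 401 when the signature check fails, when the payload has
    no ``id``, or when no user with that ``telegramId`` exists; otherwise
    returns ``accessToken`` with the user's id and role key as claims.
    """
    start_time = time()

    if not authorize(data):
        return response({
            "detail": "Ошибка авторизации"
        }, start_time=start_time, code=401)

    telegram_id = data.get("id")
    if telegram_id is None:
        # Defensive: a None filter value in MongoDB also matches documents
        # that have no telegramId field at all, so never query with it.
        return response({
            "detail": "Ошибка авторизации"
        }, start_time=start_time, code=401)

    user = await mongo.users_collection.find_one({
        "telegramId": telegram_id
    })
    if not user:
        return response({
            "detail": "Пользователь не найден"
        }, start_time=start_time, code=401)

    access_token = create_access_token({
        "sub": str(user["id"]),
        "role": user["role"]["key"],
    })
    return response({
        "accessToken": access_token
    }, start_time=start_time)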