76 lines
2.1 KiB
Python
76 lines
2.1 KiB
Python
from datetime import datetime, timedelta, timezone
|
|
from time import time
|
|
|
|
import jwt
|
|
from fastapi import APIRouter
|
|
|
|
from app import mongo
|
|
from app.config import config
|
|
from app.utils.response_util import response
|
|
import hashlib
|
|
import hmac
|
|
|
|
# Router on which this module's endpoints (e.g. POST /login) are registered.
router = APIRouter()
|
|
|
|
def _string_generator(data_incoming):
    """Build the data-check string for a login payload.

    Every field except ``hash`` whose value is not ``None`` is rendered as
    ``key=value``; the pairs are ordered by key and joined with newlines.

    Args:
        data_incoming: Mapping of payload field names to values. It is not
            modified.

    Returns:
        The newline-joined ``key=value`` string; an empty string when no
        fields remain after filtering.
    """
    # A payload without "hash" is tolerated instead of raising KeyError:
    # the payload comes from the request body, so the key may be absent.
    return "\n".join(
        f"{key}={value}"
        for key, value in sorted(data_incoming.items())
        if key != "hash" and value is not None
    )
|
|
|
|
|
|
def _data_check(BOT_TOKEN, tg_data):
    """Verify the HMAC-SHA256 signature carried by a login payload.

    The HMAC key is the SHA-256 digest of the bot token and the message is
    the data-check string produced by ``_string_generator``. The resulting
    hex digest is compared with the payload's ``hash`` field.

    Only the signature is checked; no other field is validated here.

    Args:
        BOT_TOKEN: Bot token string used to derive the HMAC key.
        tg_data: Payload dict, expected to carry the signature under "hash".

    Returns:
        True when the signature matches; False otherwise, including when
        "hash" is missing, is not a string, or contains non-ASCII characters.
    """
    received_hash = tg_data.get("hash")
    # Reject malformed signatures up front: the payload is untrusted, and
    # compare_digest() only accepts ASCII-only str arguments.
    if not isinstance(received_hash, str) or not received_hash.isascii():
        return False

    secret_key = hashlib.sha256(BOT_TOKEN.encode("utf-8")).digest()
    data_check_string = _string_generator(tg_data)
    expected_hash = hmac.new(
        secret_key,
        data_check_string.encode("utf-8"),
        hashlib.sha256,
    ).hexdigest()

    # Constant-time comparison so response timing does not leak how many
    # leading characters of the signature were correct.
    return hmac.compare_digest(expected_hash, received_hash)
|
|
|
|
|
|
def authorize(telegram_data: dict):
    """Check a login payload's signature against the configured bot token.

    Args:
        telegram_data: Payload dict forwarded unchanged to ``_data_check``.

    Returns:
        Whatever ``_data_check`` returns for the configured ``BOT_TOKEN``.
    """
    bot_token = config['BOT_TOKEN']
    return _data_check(bot_token, telegram_data)
|
|
|
|
def create_access_token(data: dict) -> str:
    """Encode ``data`` as a JWT that carries an expiry claim.

    The input dict is left untouched; a copy extended with an ``exp`` claim
    is what gets encoded.

    Args:
        data: Claims to place in the token.

    Returns:
        The token produced by ``jwt.encode`` using the configured secret key
        and algorithm.
    """
    lifetime = timedelta(minutes=int(config["JWT_ACCESS_TOKEN_EXPIRE_MINUTES"]))
    # Expiry is computed from the current UTC time (timezone-aware).
    claims = {**data, "exp": datetime.now(timezone.utc) + lifetime}
    return jwt.encode(
        claims,
        config["JWT_SECRET_KEY"],
        algorithm=config["JWT_ALGORITHM"],
    )
|
|
|
|
|
|
@router.post("/login", tags=[""])
async def login(data: dict):
    """Exchange a signed login payload for an access token.

    Steps:
      1. Verify the payload via ``authorize``; on failure reply with code 401.
      2. Look up the user whose ``telegramId`` equals ``data["id"]``; when no
         user is found reply with code 401.
      3. Issue an access token whose claims are the user's id (as ``sub``)
         and role key (as ``role``).

    Args:
        data: Request body as a dict.

    Returns:
        The object built by the ``response`` helper: an error ``detail`` on
        failure, or ``accessToken`` on success.
    """
    # Captured first so the response helper receives the request start time.
    started_at = time()

    is_authorized = authorize(data)
    if not is_authorized:
        auth_error = {"detail": "Ошибка авторизации"}
        return response(auth_error, start_time=started_at, code=401)

    account = await mongo.users_collection.find_one({"telegramId": data["id"]})
    if not account:
        not_found_error = {"detail": "Пользователь не найден"}
        return response(not_found_error, start_time=started_at, code=401)

    # NOTE(review): reads the document's "id" field (not "_id") — confirm
    # against the users collection schema.
    token = create_access_token({
        "sub": str(account["id"]),
        "role": account["role"]["key"],
    })

    return response({"accessToken": token}, start_time=started_at)
|