#!/usr/bin/env python3 # -*- coding: utf-8 -*- from datetime import timedelta from app.admin.schema.intent_org import CurrentIntentOrgIns from fastapi import Depends, Request from fastapi.security import HTTPBearer from fastapi.security.utils import get_authorization_scheme_param from jose import ExpiredSignatureError, JWTError, jwt from pydantic_core import from_json from sqlalchemy.ext.asyncio import AsyncSession from common.log import log from model.intent_org import IntentOrg from common.dataclasses import AccessToken, NewToken, RefreshToken from common.exception.errors import AuthorizationError, TokenError from core.conf import settings from database.db_mysql import async_db_session from database.db_redis import redis_client from utils.serializers import select_as_dict from utils.timezone import timezone # JWT authorizes dependency injection DependsJwtAuth = Depends(HTTPBearer()) async def create_access_token(sub: str) -> AccessToken: """ Generate encryption token :param sub: The subject/userid of the JWT :param multi_login: multipoint login for user :return: """ # expire = timezone.now() + timedelta(seconds=settings.TOKEN_EXPIRE_SECONDS) to_encode = {'sub': sub} access_token = jwt.encode(to_encode, settings.TOKEN_SECRET_KEY, settings.TOKEN_ALGORITHM) return AccessToken(access_token=access_token, access_token_expire_time=None) def get_token(request: Request) -> str: """ Get token for request header :return: """ authorization = request.headers.get('Authorization') scheme, token = get_authorization_scheme_param(authorization) if not authorization or scheme.lower() != 'bearer': raise TokenError(msg='Token 无效') return token def jwt_decode(token: str) -> int: """ Decode token :param token: :return: """ try: payload = jwt.decode(token, settings.TOKEN_SECRET_KEY, algorithms=[settings.TOKEN_ALGORITHM]) user_id = int(payload.get('sub')) if not user_id: raise TokenError(msg='Token 无效') except ExpiredSignatureError: raise TokenError(msg='Token 无效') except (JWTError, Exception): raise TokenError(msg='Token 无效') return user_id async def get_current_organization(db: AsyncSession, pk: str) -> IntentOrg: """ Get the current user through token :param db: :param pk: :return: """ from app.admin.crud.crud_intent_org import intent_org_dao org = await intent_org_dao.get_by_token(db, pk) if not org: raise TokenError(msg='Token 无效') if org.status is not 1: raise AuthorizationError(msg='用户已被锁定,请联系系统管理员') return org async def jwt_call_center_authentication(token: str) -> CurrentIntentOrgIns: """ JWT authentication :param token: :return: """ org_id = jwt_decode(token) key = f'{settings.TOKEN_CALL_REDIS_PREFIX}:{org_id}' token_verify = await redis_client.get(key) if not token_verify: async with async_db_session() as db: current_org = await get_current_organization(db, token) org = CurrentIntentOrgIns(**select_as_dict(current_org)) await redis_client.setex( key, settings.JWT_USER_REDIS_EXPIRE_SECONDS, org.model_dump_json(), ) else: # TODO: 在恰当的时机,应替换为使用 model_validate_json # https://docs.pydantic.dev/latest/concepts/json/#partial-json-parsing org = CurrentIntentOrgIns.model_validate(from_json(token_verify, allow_partial=True)) return org