123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- from datetime import timedelta
- from app.admin.schema.intent_org import CurrentIntentOrgIns
- from fastapi import Depends, Request
- from fastapi.security import HTTPBearer
- from fastapi.security.utils import get_authorization_scheme_param
- from jose import ExpiredSignatureError, JWTError, jwt
- from pydantic_core import from_json
- from sqlalchemy.ext.asyncio import AsyncSession
- from common.log import log
- from model.intent_org import IntentOrg
- from common.dataclasses import AccessToken, NewToken, RefreshToken
- from common.exception.errors import AuthorizationError, TokenError
- from core.conf import settings
- from database.db_mysql import async_db_session
- from database.db_redis import redis_client
- from utils.serializers import select_as_dict
- from utils.timezone import timezone
- # JWT authorizes dependency injection
- DependsJwtAuth = Depends(HTTPBearer())
- async def create_access_token(sub: str) -> AccessToken:
- """
- Generate encryption token
- :param sub: The subject/userid of the JWT
- :param multi_login: multipoint login for user
- :return:
- """
- # expire = timezone.now() + timedelta(seconds=settings.TOKEN_EXPIRE_SECONDS)
- to_encode = {'sub': sub}
- access_token = jwt.encode(to_encode, settings.TOKEN_SECRET_KEY, settings.TOKEN_ALGORITHM)
- return AccessToken(access_token=access_token, access_token_expire_time=None)
- def get_token(request: Request) -> str:
- """
- Get token for request header
- :return:
- """
- authorization = request.headers.get('Authorization')
- scheme, token = get_authorization_scheme_param(authorization)
- if not authorization or scheme.lower() != 'bearer':
- raise TokenError(msg='Token 无效')
- return token
- def jwt_decode(token: str) -> int:
- """
- Decode token
- :param token:
- :return:
- """
- try:
- payload = jwt.decode(token, settings.TOKEN_SECRET_KEY, algorithms=[settings.TOKEN_ALGORITHM])
- user_id = int(payload.get('sub'))
- if not user_id:
- raise TokenError(msg='Token 无效')
- except ExpiredSignatureError:
- raise TokenError(msg='Token 无效')
- except (JWTError, Exception):
- raise TokenError(msg='Token 无效')
- return user_id
- async def get_current_organization(db: AsyncSession, pk: str) -> IntentOrg:
- """
- Get the current user through token
- :param db:
- :param pk:
- :return:
- """
- from app.admin.crud.crud_intent_org import intent_org_dao
- org = await intent_org_dao.get_by_token(db, pk)
- if not org:
- raise TokenError(msg='Token 无效')
- if org.status is not 1:
- raise AuthorizationError(msg='用户已被锁定,请联系系统管理员')
- return org
- async def jwt_call_center_authentication(token: str) -> CurrentIntentOrgIns:
- """
- JWT authentication
- :param token:
- :return:
- """
- org_id = jwt_decode(token)
- key = f'{settings.TOKEN_CALL_REDIS_PREFIX}:{org_id}'
- token_verify = await redis_client.get(key)
- if not token_verify:
- async with async_db_session() as db:
- current_org = await get_current_organization(db, token)
- org = CurrentIntentOrgIns(**select_as_dict(current_org))
- await redis_client.setex(
- key,
- settings.JWT_USER_REDIS_EXPIRE_SECONDS,
- org.model_dump_json(),
- )
- else:
- # TODO: 在恰当的时机,应替换为使用 model_validate_json
- # https://docs.pydantic.dev/latest/concepts/json/#partial-json-parsing
- org = CurrentIntentOrgIns.model_validate(from_json(token_verify, allow_partial=True))
- return org
|