jwt_call_center_auth_middleware.py 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from typing import Any
  4. from fastapi import Request, Response
  5. from fastapi.security.utils import get_authorization_scheme_param
  6. from starlette.authentication import AuthCredentials, AuthenticationBackend, AuthenticationError
  7. from starlette.requests import HTTPConnection
  8. from app.admin.schema.intent_org import CurrentIntentOrgIns
  9. from common.exception.errors import TokenError
  10. from common.log import log
  11. from common.security.jwt_call_center import jwt_call_center_authentication
  12. from core.conf import settings
  13. from utils.serializers import MsgSpecJSONResponse
  14. class _AuthenticationError(AuthenticationError):
  15. """重写内部认证错误类"""
  16. def __init__(self, *, code: int = None, msg: str = None, headers: dict[str, Any] | None = None):
  17. self.code = code
  18. self.msg = msg
  19. self.headers = headers
  20. class JwtCallCenterAuthMiddleware(AuthenticationBackend):
  21. """JWT 认证中间件"""
  22. @staticmethod
  23. def auth_exception_handler(conn: HTTPConnection, exc: _AuthenticationError) -> Response:
  24. """覆盖内部认证错误处理"""
  25. return MsgSpecJSONResponse(content={'code': exc.code, 'msg': exc.msg, 'data': None}, status_code=exc.code)
  26. async def authenticate(self, request: Request) -> tuple[AuthCredentials, CurrentIntentOrgIns] | None:
  27. token = request.headers.get('Authorization')
  28. if not token:
  29. return
  30. if request.url.path in settings.TOKEN_EXCLUDE:
  31. return
  32. scheme, token = get_authorization_scheme_param(token)
  33. if scheme.lower() != "bearer":
  34. return
  35. try:
  36. org = await jwt_call_center_authentication(token)
  37. except TokenError as exc:
  38. raise _AuthenticationError(code=exc.code, msg=exc.detail, headers=exc.headers)
  39. except Exception as e:
  40. log.error(f'JWT 授权异常:{e}')
  41. raise _AuthenticationError(code=getattr(e, 'code', 500), msg=getattr(e, 'msg', 'Internal Server Error'))
  42. # 请注意,此返回使用非标准模式,所以在认证通过时,将丢失某些标准特性
  43. # 标准返回模式请查看:https://www.starlette.io/authentication/
  44. return AuthCredentials(['authenticated']), org