jwt_call_center_auth_middleware.py 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from typing import Any
  4. from fastapi import Request, Response
  5. from fastapi.security.utils import get_authorization_scheme_param
  6. from starlette.authentication import AuthCredentials, AuthenticationBackend, AuthenticationError
  7. from starlette.requests import HTTPConnection
  8. from app.admin.schema.org import CurrentIntentOrgIns
  9. from common.exception.errors import TokenError
  10. from common.log import log
  11. from common.security.jwt_call_center import jwt_call_center_authentication
  12. from core.conf import settings
  13. from utils.serializers import MsgSpecJSONResponse
  14. class _AuthenticationError(AuthenticationError):
  15. """重写内部认证错误类"""
  16. def __init__(self, *, code: int = None, msg: str = None, headers: dict[str, Any] | None = None):
  17. self.code = code
  18. self.msg = msg
  19. self.headers = headers
  20. class JwtCallCenterAuthMiddleware(AuthenticationBackend):
  21. """JWT 认证中间件"""
  22. @staticmethod
  23. def auth_exception_handler(conn: HTTPConnection, exc: _AuthenticationError) -> Response:
  24. """覆盖内部认证错误处理"""
  25. return MsgSpecJSONResponse(content={'code': exc.code, 'msg': exc.msg, 'data': None}, status_code=exc.code)
  26. async def authenticate(self, request: Request) -> tuple[AuthCredentials, CurrentIntentOrgIns] | None:
  27. token = request.headers.get('Authorization')
  28. if not token:
  29. return
  30. if request.url.path in settings.TOKEN_EXCLUDE:
  31. return
  32. scheme, token = get_authorization_scheme_param(token)
  33. if scheme.lower() != "bearer":
  34. return
  35. try:
  36. org = await jwt_call_center_authentication(token)
  37. except TokenError as exc:
  38. raise _AuthenticationError(code=exc.code, msg=exc.detail, headers=exc.headers)
  39. except Exception as e:
  40. log.error(f'JWT 授权异常:{e}')
  41. raise _AuthenticationError(code=getattr(e, 'code', 500), msg=getattr(e, 'msg', 'Internal Server Error'))
  42. # 请注意,此返回使用非标准模式,所以在认证通过时,将丢失某些标准特性
  43. # 标准返回模式请查看:https://www.starlette.io/authentication/
  44. return AuthCredentials(['authenticated']), org