jwt_call_center.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from datetime import timedelta
  4. from app.admin.schema.intent_org import CurrentIntentOrgIns
  5. from fastapi import Depends, Request
  6. from fastapi.security import HTTPBearer
  7. from fastapi.security.utils import get_authorization_scheme_param
  8. from jose import ExpiredSignatureError, JWTError, jwt
  9. from pydantic_core import from_json
  10. from sqlalchemy.ext.asyncio import AsyncSession
  11. from common.log import log
  12. from model.intent_org import IntentOrg
  13. from common.dataclasses import AccessToken, NewToken, RefreshToken
  14. from common.exception.errors import AuthorizationError, TokenError
  15. from core.conf import settings
  16. from database.db_mysql import async_db_session
  17. from database.db_redis import redis_client
  18. from utils.serializers import select_as_dict
  19. from utils.timezone import timezone
  20. # JWT authorization dependency: FastAPI's HTTPBearer scheme wrapped in Depends for dependency injection
  21. DependsJwtAuth = Depends(HTTPBearer())
  async def create_access_token(sub: str) -> AccessToken:
      """
      Build a signed JWT whose only claim is the subject.

      No ``exp`` claim is written into the payload, and the returned
      ``AccessToken`` carries ``access_token_expire_time=None``.

      :param sub: The subject (id) stored in the ``sub`` claim of the JWT
      :return: ``AccessToken`` wrapping the encoded token string
      """
      claims = {'sub': sub}
      encoded = jwt.encode(claims, settings.TOKEN_SECRET_KEY, settings.TOKEN_ALGORITHM)
      return AccessToken(access_token=encoded, access_token_expire_time=None)
  def get_token(request: Request) -> str:
      """
      Extract the bearer token from the request's ``Authorization`` header.

      :param request: Incoming request whose headers are inspected
      :return: The credential part that follows the ``Bearer`` scheme
      :raises TokenError: If the header is missing, the scheme is not
          ``bearer`` (compared case-insensitively), or no credential follows
          the scheme
      """
      authorization = request.headers.get('Authorization')
      scheme, token = get_authorization_scheme_param(authorization)
      # A bare "Bearer" header has the right scheme but an empty credential;
      # reject it here instead of handing an empty string to the decoder.
      if not authorization or scheme.lower() != 'bearer' or not token:
          raise TokenError(msg='Token 无效')
      return token
  def jwt_decode(token: str) -> int:
      """
      Verify a JWT and return the integer id stored in its ``sub`` claim.

      The token is decoded with ``settings.TOKEN_SECRET_KEY`` and
      ``settings.TOKEN_ALGORITHM``.

      :param token: Encoded JWT string
      :return: The ``sub`` claim converted to ``int``
      :raises TokenError: If decoding fails for any reason, if ``sub`` is
          missing or not convertible to ``int``, or if the resulting id is ``0``
      """
      try:
          payload = jwt.decode(token, settings.TOKEN_SECRET_KEY, algorithms=[settings.TOKEN_ALGORITHM])
          # int(None) raises TypeError and int('abc') raises ValueError; both
          # are handled below like any other invalid token.
          user_id = int(payload.get('sub'))
      except Exception as exc:
          # Deliberately broad: every failure (expired, bad signature,
          # malformed token, bad ``sub``) is reported to the caller as the same
          # TokenError. The original exception is kept as the cause.
          raise TokenError(msg='Token 无效') from exc
      if not user_id:
          raise TokenError(msg='Token 无效')
      return user_id
  async def get_current_organization(db: AsyncSession, pk: str) -> IntentOrg:
      """
      Load the organization matching the given token and check that it is active.

      :param db: Database session handed to the DAO
      :param pk: Value passed to ``intent_org_dao.get_by_token`` for the lookup
      :return: The matching ``IntentOrg`` record
      :raises TokenError: If no organization matches
      :raises AuthorizationError: If the organization's ``status`` is not ``1``
      """
      # Imported at call time, as in the original
      # (presumably to avoid a circular import -- TODO confirm).
      from app.admin.crud.crud_intent_org import intent_org_dao

      org = await intent_org_dao.get_by_token(db, pk)
      if not org:
          raise TokenError(msg='Token 无效')
      # Compare by value: ``is not 1`` tested object identity, which only
      # happened to work through CPython's small-int cache.
      if org.status != 1:
          raise AuthorizationError(msg='用户已被锁定,请联系系统管理员')
      return org
  async def jwt_call_center_authentication(token: str) -> CurrentIntentOrgIns:
      """
      Authenticate a call-center JWT and return the organization it belongs to.

      The organization is cached in Redis under
      ``<settings.TOKEN_CALL_REDIS_PREFIX>:<org_id>``. A cache hit returns the
      stored snapshot without querying the database; a miss loads the
      organization by token and caches it for
      ``settings.JWT_USER_REDIS_EXPIRE_SECONDS`` seconds.

      :param token: Encoded JWT string
      :return: ``CurrentIntentOrgIns`` built from the cache or the database
      """
      org_id = jwt_decode(token)
      cache_key = f'{settings.TOKEN_CALL_REDIS_PREFIX}:{org_id}'
      cached = await redis_client.get(cache_key)

      if cached:
          # TODO: at an appropriate time, switch to model_validate_json
          # https://docs.pydantic.dev/latest/concepts/json/#partial-json-parsing
          return CurrentIntentOrgIns.model_validate(from_json(cached, allow_partial=True))

      async with async_db_session() as db:
          record = await get_current_organization(db, token)
          org = CurrentIntentOrgIns(**select_as_dict(record))
          serialized = org.model_dump_json()
          await redis_client.setex(cache_key, settings.JWT_USER_REDIS_EXPIRE_SECONDS, serialized)
      return org