health_check.py 957 B

123456789101112131415161718192021222324252627282930313233343536
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from math import ceil
  4. from fastapi import FastAPI, Request, Response
  5. from fastapi.routing import APIRoute
  6. from common.exception import errors
  7. def ensure_unique_route_names(app: FastAPI) -> None:
  8. """
  9. 检查路由名称是否唯一
  10. :param app:
  11. :return:
  12. """
  13. temp_routes = set()
  14. for route in app.routes:
  15. if isinstance(route, APIRoute):
  16. if route.name in temp_routes:
  17. raise ValueError(f'Non-unique route name: {route.name}')
  18. temp_routes.add(route.name)
  19. async def http_limit_callback(request: Request, response: Response, expire: int):
  20. """
  21. 请求限制时的默认回调函数
  22. :param request:
  23. :param response:
  24. :param expire: 剩余毫秒
  25. :return:
  26. """
  27. expires = ceil(expire / 1000)
  28. raise errors.HTTPError(code=429, msg='请求过于频繁,请稍后重试', headers={'Retry-After': str(expires)})