request_parse.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. import httpx
  4. from asgiref.sync import sync_to_async
  5. from fastapi import Request
  6. from user_agents import parse
  7. from XdbSearchIP.xdbSearcher import XdbSearcher
  8. from common.dataclasses import IpInfo, UserAgentInfo
  9. from common.log import log
  10. from core.conf import settings
  11. from core.path_conf import IP2REGION_XDB
  12. from database.db_redis import redis_client
  13. @sync_to_async
  14. def get_request_ip(request: Request) -> str:
  15. """获取请求的 ip 地址"""
  16. real = request.headers.get('X-Real-IP')
  17. if real:
  18. ip = real
  19. else:
  20. forwarded = request.headers.get('X-Forwarded-For')
  21. if forwarded:
  22. ip = forwarded.split(',')[0]
  23. else:
  24. ip = request.client.host
  25. # 忽略 pytest
  26. if ip == 'testclient':
  27. ip = '127.0.0.1'
  28. return ip
  29. async def get_location_online(ip: str, user_agent: str) -> dict | None:
  30. """
  31. 在线获取 ip 地址属地,无法保证可用性,准确率较高
  32. :param ip:
  33. :param user_agent:
  34. :return:
  35. """
  36. async with httpx.AsyncClient(timeout=3) as client:
  37. ip_api_url = f'http://ip-api.com/json/{ip}?lang=zh-CN'
  38. headers = {'User-Agent': user_agent}
  39. try:
  40. response = await client.get(ip_api_url, headers=headers)
  41. if response.status_code == 200:
  42. return response.json()
  43. except Exception as e:
  44. log.error(f'在线获取 ip 地址属地失败,错误信息:{e}')
  45. return None
  46. @sync_to_async
  47. def get_location_offline(ip: str) -> dict | None:
  48. """
  49. 离线获取 ip 地址属地,无法保证准确率,100%可用
  50. :param ip:
  51. :return:
  52. """
  53. try:
  54. cb = XdbSearcher.loadContentFromFile(dbfile=IP2REGION_XDB)
  55. searcher = XdbSearcher(contentBuff=cb)
  56. data = searcher.search(ip)
  57. searcher.close()
  58. data = data.split('|')
  59. return {
  60. 'country': data[0] if data[0] != '0' else None,
  61. 'regionName': data[2] if data[2] != '0' else None,
  62. 'city': data[3] if data[3] != '0' else None,
  63. }
  64. except Exception as e:
  65. log.error(f'离线获取 ip 地址属地失败,错误信息:{e}')
  66. return None
  67. async def parse_ip_info(request: Request) -> IpInfo:
  68. country, region, city = None, None, None
  69. ip = await get_request_ip(request)
  70. location = await redis_client.get(f'{settings.IP_LOCATION_REDIS_PREFIX}:{ip}')
  71. if location:
  72. country, region, city = location.split(' ')
  73. return IpInfo(ip=ip, country=country, region=region, city=city)
  74. if settings.LOCATION_PARSE == 'online':
  75. location_info = await get_location_online(ip, request.headers.get('User-Agent'))
  76. elif settings.LOCATION_PARSE == 'offline':
  77. location_info = await get_location_offline(ip)
  78. else:
  79. location_info = None
  80. if location_info:
  81. country = location_info.get('country')
  82. region = location_info.get('regionName')
  83. city = location_info.get('city')
  84. await redis_client.set(
  85. f'{settings.IP_LOCATION_REDIS_PREFIX}:{ip}',
  86. f'{country} {region} {city}',
  87. ex=settings.IP_LOCATION_EXPIRE_SECONDS,
  88. )
  89. return IpInfo(ip=ip, country=country, region=region, city=city)
  90. @sync_to_async
  91. def parse_user_agent_info(request: Request) -> UserAgentInfo:
  92. user_agent = request.headers.get('User-Agent')
  93. _user_agent = parse(user_agent)
  94. os = _user_agent.get_os()
  95. browser = _user_agent.get_browser()
  96. device = _user_agent.get_device()
  97. return UserAgentInfo(user_agent=user_agent, device=device, os=os, browser=browser)