123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- import httpx
- from asgiref.sync import sync_to_async
- from fastapi import Request
- from user_agents import parse
- from XdbSearchIP.xdbSearcher import XdbSearcher
- from common.dataclasses import IpInfo, UserAgentInfo
- from common.log import log
- from core.conf import settings
- from core.path_conf import IP2REGION_XDB
- from database.db_redis import redis_client
- @sync_to_async
- def get_request_ip(request: Request) -> str:
- """获取请求的 ip 地址"""
- real = request.headers.get('X-Real-IP')
- if real:
- ip = real
- else:
- forwarded = request.headers.get('X-Forwarded-For')
- if forwarded:
- ip = forwarded.split(',')[0]
- else:
- ip = request.client.host
- # 忽略 pytest
- if ip == 'testclient':
- ip = '127.0.0.1'
- return ip
- async def get_location_online(ip: str, user_agent: str) -> dict | None:
- """
- 在线获取 ip 地址属地,无法保证可用性,准确率较高
- :param ip:
- :param user_agent:
- :return:
- """
- async with httpx.AsyncClient(timeout=3) as client:
- ip_api_url = f'http://ip-api.com/json/{ip}?lang=zh-CN'
- headers = {'User-Agent': user_agent}
- try:
- response = await client.get(ip_api_url, headers=headers)
- if response.status_code == 200:
- return response.json()
- except Exception as e:
- log.error(f'在线获取 ip 地址属地失败,错误信息:{e}')
- return None
- @sync_to_async
- def get_location_offline(ip: str) -> dict | None:
- """
- 离线获取 ip 地址属地,无法保证准确率,100%可用
- :param ip:
- :return:
- """
- try:
- cb = XdbSearcher.loadContentFromFile(dbfile=IP2REGION_XDB)
- searcher = XdbSearcher(contentBuff=cb)
- data = searcher.search(ip)
- searcher.close()
- data = data.split('|')
- return {
- 'country': data[0] if data[0] != '0' else None,
- 'regionName': data[2] if data[2] != '0' else None,
- 'city': data[3] if data[3] != '0' else None,
- }
- except Exception as e:
- log.error(f'离线获取 ip 地址属地失败,错误信息:{e}')
- return None
- async def parse_ip_info(request: Request) -> IpInfo:
- country, region, city = None, None, None
- ip = await get_request_ip(request)
- location = await redis_client.get(f'{settings.IP_LOCATION_REDIS_PREFIX}:{ip}')
- if location:
- country, region, city = location.split(' ')
- return IpInfo(ip=ip, country=country, region=region, city=city)
- if settings.LOCATION_PARSE == 'online':
- location_info = await get_location_online(ip, request.headers.get('User-Agent'))
- elif settings.LOCATION_PARSE == 'offline':
- location_info = await get_location_offline(ip)
- else:
- location_info = None
- if location_info:
- country = location_info.get('country')
- region = location_info.get('regionName')
- city = location_info.get('city')
- await redis_client.set(
- f'{settings.IP_LOCATION_REDIS_PREFIX}:{ip}',
- f'{country} {region} {city}',
- ex=settings.IP_LOCATION_EXPIRE_SECONDS,
- )
- return IpInfo(ip=ip, country=country, region=region, city=city)
- @sync_to_async
- def parse_user_agent_info(request: Request) -> UserAgentInfo:
- user_agent = request.headers.get('User-Agent')
- _user_agent = parse(user_agent)
- os = _user_agent.get_os()
- browser = _user_agent.get_browser()
- device = _user_agent.get_device()
- return UserAgentInfo(user_agent=user_agent, device=device, os=os, browser=browser)
|