| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- from __future__ import annotations
- from dataclasses import dataclass
- from typing import Optional
- from app.config import settings
- @dataclass
- class GeoInfo:
- country: str
- city: str
- latitude: Optional[float]
- longitude: Optional[float]
- org: Optional[str] = None
- _UNKNOWN = GeoInfo("Unknown", "Unknown", None, None, None)
- # 主要城市/省会坐标(省名/市名 → (lat, lon))
- _CITY_COORDS: dict[str, tuple[float, float]] = {
- # 直辖市
- "北京": (39.9042, 116.4074),
- "上海": (31.2304, 121.4737),
- "天津": (39.0842, 117.2010),
- "重庆": (29.5630, 106.5516),
- # 省会
- "哈尔滨": (45.8038, 126.5349),
- "长春": (43.8171, 125.3235),
- "沈阳": (41.8057, 123.4315),
- "呼和浩特":(40.8426, 111.7496),
- "石家庄": (38.0428, 114.5149),
- "太原": (37.8706, 112.5489),
- "济南": (36.6512, 117.1201),
- "郑州": (34.7466, 113.6253),
- "西安": (34.3416, 108.9398),
- "兰州": (36.0611, 103.8343),
- "西宁": (36.6171, 101.7782),
- "银川": (38.4872, 106.2309),
- "乌鲁木齐":(43.8256, 87.6168),
- "拉萨": (29.6520, 91.1721),
- "成都": (30.5728, 104.0668),
- "贵阳": (26.6470, 106.6302),
- "昆明": (25.0453, 102.7097),
- "南宁": (22.8170, 108.3665),
- "海口": (20.0440, 110.1999),
- "武汉": (30.5928, 114.3055),
- "长沙": (28.2282, 112.9388),
- "南昌": (28.6820, 115.8579),
- "合肥": (31.8206, 117.2272),
- "南京": (32.0603, 118.7969),
- "杭州": (30.2741, 120.1551),
- "福州": (26.0745, 119.2965),
- "广州": (23.1291, 113.2644),
- "深圳": (22.5431, 114.0579),
- # 常见大城市
- "苏州": (31.2989, 120.5853),
- "宁波": (29.8683, 121.5440),
- "青岛": (36.0671, 120.3826),
- "大连": (38.9140, 121.6147),
- "厦门": (24.4798, 118.0894),
- "东莞": (23.0207, 113.7518),
- "佛山": (23.0219, 113.1219),
- "温州": (28.0000, 120.6720),
- "无锡": (31.4912, 120.3119),
- "珠海": (22.2710, 113.5767),
- }
- # 省名 → 省会
- _PROVINCE_CAPITAL: dict[str, str] = {
- "黑龙江": "哈尔滨", "吉林": "长春", "辽宁": "沈阳",
- "内蒙古": "呼和浩特", "河北": "石家庄", "山西": "太原",
- "山东": "济南", "河南": "郑州", "陕西": "西安",
- "甘肃": "兰州", "青海": "西宁", "宁夏": "银川",
- "新疆": "乌鲁木齐", "西藏": "拉萨", "四川": "成都",
- "贵州": "贵阳", "云南": "昆明", "广西": "南宁",
- "海南": "海口", "湖北": "武汉", "湖南": "长沙",
- "江西": "南昌", "安徽": "合肥", "江苏": "南京",
- "浙江": "杭州", "福建": "福州", "广东": "广州",
- "北京": "北京", "上海": "上海", "天津": "天津", "重庆": "重庆",
- }
- # 中文城市名 → 英文(与 GeoLite2 保持一致)
- _CITY_EN: dict[str, str] = {
- "北京": "Beijing", "上海": "Shanghai", "天津": "Tianjin", "重庆": "Chongqing",
- "哈尔滨": "Harbin", "长春": "Changchun", "沈阳": "Shenyang", "呼和浩特": "Hohhot",
- "石家庄": "Shijiazhuang", "太原": "Taiyuan", "济南": "Jinan", "郑州": "Zhengzhou",
- "西安": "Xi'an", "兰州": "Lanzhou", "西宁": "Xining", "银川": "Yinchuan",
- "乌鲁木齐": "Urumqi", "拉萨": "Lhasa", "成都": "Chengdu", "贵阳": "Guiyang",
- "昆明": "Kunming", "南宁": "Nanning", "海口": "Haikou", "武汉": "Wuhan",
- "长沙": "Changsha", "南昌": "Nanchang", "合肥": "Hefei", "南京": "Nanjing",
- "杭州": "Hangzhou", "福州": "Fuzhou", "广州": "Guangzhou", "深圳": "Shenzhen",
- "苏州": "Suzhou", "宁波": "Ningbo", "青岛": "Qingdao", "大连": "Dalian",
- "厦门": "Xiamen", "东莞": "Dongguan", "佛山": "Foshan", "温州": "Wenzhou",
- "无锡": "Wuxi", "珠海": "Zhuhai",
- }
- def _lookup_coords(province: str, city: str) -> tuple[float, float] | None:
- city_clean = city.replace("市", "").replace("区", "").replace("省", "")
- province_clean = province.replace("省", "").replace("市", "").replace("自治区", "").replace("壮族", "").replace("回族", "").replace("维吾尔", "")
- if city_clean in _CITY_COORDS:
- return _CITY_COORDS[city_clean]
- if city in _CITY_COORDS:
- return _CITY_COORDS[city]
- capital = _PROVINCE_CAPITAL.get(province_clean)
- if capital and capital in _CITY_COORDS:
- return _CITY_COORDS[capital]
- return None
- class GeoResolver:
- def __init__(self, db_path: str, asn_db_path: str) -> None:
- self._db_path = db_path
- self._asn_db_path = asn_db_path
- self._geoip_reader = None
- self._asn_reader = None
- self._iputil = None
- def _get_iputil(self):
- if self._iputil is None:
- try:
- import iputil
- self._iputil = iputil
- except Exception:
- self._iputil = False
- return self._iputil if self._iputil is not False else None
- def _get_geoip_reader(self):
- if self._geoip_reader is None:
- import geoip2.database
- self._geoip_reader = geoip2.database.Reader(self._db_path)
- return self._geoip_reader
- def _get_asn_reader(self):
- if self._asn_reader is None:
- import geoip2.database
- self._asn_reader = geoip2.database.Reader(self._asn_db_path)
- return self._asn_reader
- def _resolve_cn(self, ip: str) -> GeoInfo | None:
- """用 iputil(ip2region) 解析中国 IP,返回 None 表示非中国或失败。"""
- lib = self._get_iputil()
- if not lib:
- return None
- try:
- # 返回格式: 国家|区域|省|市|ISP 例: 中国|0|四川|成都|电信
- result = lib.get_region(ip)
- parts = result.split("|")
- if len(parts) < 5:
- return None
- country_raw, _, province, city, isp = parts[0], parts[1], parts[2], parts[3], parts[4]
- if country_raw not in ("中国", "中国大陆"):
- return None
- coords = _lookup_coords(province, city)
- lat, lon = (coords[0], coords[1]) if coords else (None, None)
- city_clean = city.replace("市", "").replace("区", "") if city and city != "0" else province.replace("省", "").replace("自治区", "").replace("壮族", "").replace("回族", "").replace("维吾尔", "")
- city_display = _CITY_EN.get(city_clean, city_clean)
- return GeoInfo("China", city_display, lat, lon, isp if isp != "0" else None)
- except Exception:
- return None
- def _resolve_geoip(self, ip: str) -> GeoInfo:
- """降级用 GeoLite2 解析。"""
- try:
- resp = self._get_geoip_reader().city(ip)
- country = resp.country.name or "Unknown"
- city = resp.city.name or "Unknown"
- lat = resp.location.latitude
- lon = resp.location.longitude
- except Exception:
- country, city, lat, lon = "Unknown", "Unknown", None, None
- org = None
- try:
- asn_resp = self._get_asn_reader().asn(ip)
- org = asn_resp.autonomous_system_organization or None
- except Exception:
- pass
- return GeoInfo(country, city, lat, lon, org)
- def resolve(self, ip: str) -> GeoInfo:
- if ip in ("127.0.0.1", "::1", "localhost") or \
- ip.startswith("192.168.") or ip.startswith("10.") or ip.startswith("172."):
- return GeoInfo("Local", "Loopback", None, None, None)
- cn_info = self._resolve_cn(ip)
- if cn_info:
- return cn_info
- return self._resolve_geoip(ip)
- geo_resolver = GeoResolver(settings.geoip_db_path, settings.geoip_asn_db_path)
|