from __future__ import annotations from dataclasses import dataclass from typing import Optional from app.config import settings @dataclass class GeoInfo: country: str city: str latitude: Optional[float] longitude: Optional[float] org: Optional[str] = None _UNKNOWN = GeoInfo("Unknown", "Unknown", None, None, None) # 主要城市/省会坐标(省名/市名 → (lat, lon)) _CITY_COORDS: dict[str, tuple[float, float]] = { # 直辖市 "北京": (39.9042, 116.4074), "上海": (31.2304, 121.4737), "天津": (39.0842, 117.2010), "重庆": (29.5630, 106.5516), # 省会 "哈尔滨": (45.8038, 126.5349), "长春": (43.8171, 125.3235), "沈阳": (41.8057, 123.4315), "呼和浩特":(40.8426, 111.7496), "石家庄": (38.0428, 114.5149), "太原": (37.8706, 112.5489), "济南": (36.6512, 117.1201), "郑州": (34.7466, 113.6253), "西安": (34.3416, 108.9398), "兰州": (36.0611, 103.8343), "西宁": (36.6171, 101.7782), "银川": (38.4872, 106.2309), "乌鲁木齐":(43.8256, 87.6168), "拉萨": (29.6520, 91.1721), "成都": (30.5728, 104.0668), "贵阳": (26.6470, 106.6302), "昆明": (25.0453, 102.7097), "南宁": (22.8170, 108.3665), "海口": (20.0440, 110.1999), "武汉": (30.5928, 114.3055), "长沙": (28.2282, 112.9388), "南昌": (28.6820, 115.8579), "合肥": (31.8206, 117.2272), "南京": (32.0603, 118.7969), "杭州": (30.2741, 120.1551), "福州": (26.0745, 119.2965), "广州": (23.1291, 113.2644), "深圳": (22.5431, 114.0579), # 常见大城市 "苏州": (31.2989, 120.5853), "宁波": (29.8683, 121.5440), "青岛": (36.0671, 120.3826), "大连": (38.9140, 121.6147), "厦门": (24.4798, 118.0894), "东莞": (23.0207, 113.7518), "佛山": (23.0219, 113.1219), "温州": (28.0000, 120.6720), "无锡": (31.4912, 120.3119), "珠海": (22.2710, 113.5767), } # 省名 → 省会 _PROVINCE_CAPITAL: dict[str, str] = { "黑龙江": "哈尔滨", "吉林": "长春", "辽宁": "沈阳", "内蒙古": "呼和浩特", "河北": "石家庄", "山西": "太原", "山东": "济南", "河南": "郑州", "陕西": "西安", "甘肃": "兰州", "青海": "西宁", "宁夏": "银川", "新疆": "乌鲁木齐", "西藏": "拉萨", "四川": "成都", "贵州": "贵阳", "云南": "昆明", "广西": "南宁", "海南": "海口", "湖北": "武汉", "湖南": "长沙", "江西": "南昌", "安徽": "合肥", "江苏": "南京", "浙江": "杭州", "福建": "福州", "广东": "广州", "北京": "北京", "上海": "上海", "天津": "天津", "重庆": "重庆", } # 中文城市名 → 英文(与 GeoLite2 保持一致) _CITY_EN: dict[str, str] = { "北京": "Beijing", "上海": "Shanghai", "天津": "Tianjin", "重庆": "Chongqing", "哈尔滨": "Harbin", "长春": "Changchun", "沈阳": "Shenyang", "呼和浩特": "Hohhot", "石家庄": "Shijiazhuang", "太原": "Taiyuan", "济南": "Jinan", "郑州": "Zhengzhou", "西安": "Xi'an", "兰州": "Lanzhou", "西宁": "Xining", "银川": "Yinchuan", "乌鲁木齐": "Urumqi", "拉萨": "Lhasa", "成都": "Chengdu", "贵阳": "Guiyang", "昆明": "Kunming", "南宁": "Nanning", "海口": "Haikou", "武汉": "Wuhan", "长沙": "Changsha", "南昌": "Nanchang", "合肥": "Hefei", "南京": "Nanjing", "杭州": "Hangzhou", "福州": "Fuzhou", "广州": "Guangzhou", "深圳": "Shenzhen", "苏州": "Suzhou", "宁波": "Ningbo", "青岛": "Qingdao", "大连": "Dalian", "厦门": "Xiamen", "东莞": "Dongguan", "佛山": "Foshan", "温州": "Wenzhou", "无锡": "Wuxi", "珠海": "Zhuhai", } def _lookup_coords(province: str, city: str) -> tuple[float, float] | None: city_clean = city.replace("市", "").replace("区", "").replace("省", "") province_clean = province.replace("省", "").replace("市", "").replace("自治区", "").replace("壮族", "").replace("回族", "").replace("维吾尔", "") if city_clean in _CITY_COORDS: return _CITY_COORDS[city_clean] if city in _CITY_COORDS: return _CITY_COORDS[city] capital = _PROVINCE_CAPITAL.get(province_clean) if capital and capital in _CITY_COORDS: return _CITY_COORDS[capital] return None class GeoResolver: def __init__(self, db_path: str, asn_db_path: str) -> None: self._db_path = db_path self._asn_db_path = asn_db_path self._geoip_reader = None self._asn_reader = None self._iputil = None def _get_iputil(self): if self._iputil is None: try: import iputil self._iputil = iputil except Exception: self._iputil = False return self._iputil if self._iputil is not False else None def _get_geoip_reader(self): if self._geoip_reader is None: import geoip2.database self._geoip_reader = geoip2.database.Reader(self._db_path) return self._geoip_reader def _get_asn_reader(self): if self._asn_reader is None: import geoip2.database self._asn_reader = geoip2.database.Reader(self._asn_db_path) return self._asn_reader def _resolve_cn(self, ip: str) -> GeoInfo | None: """用 iputil(ip2region) 解析中国 IP,返回 None 表示非中国或失败。""" lib = self._get_iputil() if not lib: return None try: # 返回格式: 国家|区域|省|市|ISP 例: 中国|0|四川|成都|电信 result = lib.get_region(ip) parts = result.split("|") if len(parts) < 5: return None country_raw, _, province, city, isp = parts[0], parts[1], parts[2], parts[3], parts[4] if country_raw not in ("中国", "中国大陆"): return None coords = _lookup_coords(province, city) lat, lon = (coords[0], coords[1]) if coords else (None, None) city_clean = city.replace("市", "").replace("区", "") if city and city != "0" else province.replace("省", "").replace("自治区", "").replace("壮族", "").replace("回族", "").replace("维吾尔", "") city_display = _CITY_EN.get(city_clean, city_clean) return GeoInfo("China", city_display, lat, lon, isp if isp != "0" else None) except Exception: return None def _resolve_geoip(self, ip: str) -> GeoInfo: """降级用 GeoLite2 解析。""" try: resp = self._get_geoip_reader().city(ip) country = resp.country.name or "Unknown" city = resp.city.name or "Unknown" lat = resp.location.latitude lon = resp.location.longitude except Exception: country, city, lat, lon = "Unknown", "Unknown", None, None org = None try: asn_resp = self._get_asn_reader().asn(ip) org = asn_resp.autonomous_system_organization or None except Exception: pass return GeoInfo(country, city, lat, lon, org) def resolve(self, ip: str) -> GeoInfo: if ip in ("127.0.0.1", "::1", "localhost") or \ ip.startswith("192.168.") or ip.startswith("10.") or ip.startswith("172."): return GeoInfo("Local", "Loopback", None, None, None) cn_info = self._resolve_cn(ip) if cn_info: return cn_info return self._resolve_geoip(ip) geo_resolver = GeoResolver(settings.geoip_db_path, settings.geoip_asn_db_path)