geo.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. from __future__ import annotations
  2. from dataclasses import dataclass
  3. from typing import Optional
  4. from app.config import settings
  5. @dataclass
  6. class GeoInfo:
  7. country: str
  8. city: str
  9. latitude: Optional[float]
  10. longitude: Optional[float]
  11. org: Optional[str] = None
  12. _UNKNOWN = GeoInfo("Unknown", "Unknown", None, None, None)
  13. # 主要城市/省会坐标(省名/市名 → (lat, lon))
  14. _CITY_COORDS: dict[str, tuple[float, float]] = {
  15. # 直辖市
  16. "北京": (39.9042, 116.4074),
  17. "上海": (31.2304, 121.4737),
  18. "天津": (39.0842, 117.2010),
  19. "重庆": (29.5630, 106.5516),
  20. # 省会
  21. "哈尔滨": (45.8038, 126.5349),
  22. "长春": (43.8171, 125.3235),
  23. "沈阳": (41.8057, 123.4315),
  24. "呼和浩特":(40.8426, 111.7496),
  25. "石家庄": (38.0428, 114.5149),
  26. "太原": (37.8706, 112.5489),
  27. "济南": (36.6512, 117.1201),
  28. "郑州": (34.7466, 113.6253),
  29. "西安": (34.3416, 108.9398),
  30. "兰州": (36.0611, 103.8343),
  31. "西宁": (36.6171, 101.7782),
  32. "银川": (38.4872, 106.2309),
  33. "乌鲁木齐":(43.8256, 87.6168),
  34. "拉萨": (29.6520, 91.1721),
  35. "成都": (30.5728, 104.0668),
  36. "贵阳": (26.6470, 106.6302),
  37. "昆明": (25.0453, 102.7097),
  38. "南宁": (22.8170, 108.3665),
  39. "海口": (20.0440, 110.1999),
  40. "武汉": (30.5928, 114.3055),
  41. "长沙": (28.2282, 112.9388),
  42. "南昌": (28.6820, 115.8579),
  43. "合肥": (31.8206, 117.2272),
  44. "南京": (32.0603, 118.7969),
  45. "杭州": (30.2741, 120.1551),
  46. "福州": (26.0745, 119.2965),
  47. "广州": (23.1291, 113.2644),
  48. "深圳": (22.5431, 114.0579),
  49. # 常见大城市
  50. "苏州": (31.2989, 120.5853),
  51. "宁波": (29.8683, 121.5440),
  52. "青岛": (36.0671, 120.3826),
  53. "大连": (38.9140, 121.6147),
  54. "厦门": (24.4798, 118.0894),
  55. "东莞": (23.0207, 113.7518),
  56. "佛山": (23.0219, 113.1219),
  57. "温州": (28.0000, 120.6720),
  58. "无锡": (31.4912, 120.3119),
  59. "珠海": (22.2710, 113.5767),
  60. }
  61. # 省名 → 省会
  62. _PROVINCE_CAPITAL: dict[str, str] = {
  63. "黑龙江": "哈尔滨", "吉林": "长春", "辽宁": "沈阳",
  64. "内蒙古": "呼和浩特", "河北": "石家庄", "山西": "太原",
  65. "山东": "济南", "河南": "郑州", "陕西": "西安",
  66. "甘肃": "兰州", "青海": "西宁", "宁夏": "银川",
  67. "新疆": "乌鲁木齐", "西藏": "拉萨", "四川": "成都",
  68. "贵州": "贵阳", "云南": "昆明", "广西": "南宁",
  69. "海南": "海口", "湖北": "武汉", "湖南": "长沙",
  70. "江西": "南昌", "安徽": "合肥", "江苏": "南京",
  71. "浙江": "杭州", "福建": "福州", "广东": "广州",
  72. "北京": "北京", "上海": "上海", "天津": "天津", "重庆": "重庆",
  73. }
  74. # 中文城市名 → 英文(与 GeoLite2 保持一致)
  75. _CITY_EN: dict[str, str] = {
  76. "北京": "Beijing", "上海": "Shanghai", "天津": "Tianjin", "重庆": "Chongqing",
  77. "哈尔滨": "Harbin", "长春": "Changchun", "沈阳": "Shenyang", "呼和浩特": "Hohhot",
  78. "石家庄": "Shijiazhuang", "太原": "Taiyuan", "济南": "Jinan", "郑州": "Zhengzhou",
  79. "西安": "Xi'an", "兰州": "Lanzhou", "西宁": "Xining", "银川": "Yinchuan",
  80. "乌鲁木齐": "Urumqi", "拉萨": "Lhasa", "成都": "Chengdu", "贵阳": "Guiyang",
  81. "昆明": "Kunming", "南宁": "Nanning", "海口": "Haikou", "武汉": "Wuhan",
  82. "长沙": "Changsha", "南昌": "Nanchang", "合肥": "Hefei", "南京": "Nanjing",
  83. "杭州": "Hangzhou", "福州": "Fuzhou", "广州": "Guangzhou", "深圳": "Shenzhen",
  84. "苏州": "Suzhou", "宁波": "Ningbo", "青岛": "Qingdao", "大连": "Dalian",
  85. "厦门": "Xiamen", "东莞": "Dongguan", "佛山": "Foshan", "温州": "Wenzhou",
  86. "无锡": "Wuxi", "珠海": "Zhuhai",
  87. }
  88. def _lookup_coords(province: str, city: str) -> tuple[float, float] | None:
  89. city_clean = city.replace("市", "").replace("区", "").replace("省", "")
  90. province_clean = province.replace("省", "").replace("市", "").replace("自治区", "").replace("壮族", "").replace("回族", "").replace("维吾尔", "")
  91. if city_clean in _CITY_COORDS:
  92. return _CITY_COORDS[city_clean]
  93. if city in _CITY_COORDS:
  94. return _CITY_COORDS[city]
  95. capital = _PROVINCE_CAPITAL.get(province_clean)
  96. if capital and capital in _CITY_COORDS:
  97. return _CITY_COORDS[capital]
  98. return None
  99. class GeoResolver:
  100. def __init__(self, db_path: str, asn_db_path: str) -> None:
  101. self._db_path = db_path
  102. self._asn_db_path = asn_db_path
  103. self._geoip_reader = None
  104. self._asn_reader = None
  105. self._iputil = None
  106. def _get_iputil(self):
  107. if self._iputil is None:
  108. try:
  109. import iputil
  110. self._iputil = iputil
  111. except Exception:
  112. self._iputil = False
  113. return self._iputil if self._iputil is not False else None
  114. def _get_geoip_reader(self):
  115. if self._geoip_reader is None:
  116. import geoip2.database
  117. self._geoip_reader = geoip2.database.Reader(self._db_path)
  118. return self._geoip_reader
  119. def _get_asn_reader(self):
  120. if self._asn_reader is None:
  121. import geoip2.database
  122. self._asn_reader = geoip2.database.Reader(self._asn_db_path)
  123. return self._asn_reader
  124. def _resolve_cn(self, ip: str) -> GeoInfo | None:
  125. """用 iputil(ip2region) 解析中国 IP,返回 None 表示非中国或失败。"""
  126. lib = self._get_iputil()
  127. if not lib:
  128. return None
  129. try:
  130. # 返回格式: 国家|区域|省|市|ISP 例: 中国|0|四川|成都|电信
  131. result = lib.get_region(ip)
  132. parts = result.split("|")
  133. if len(parts) < 5:
  134. return None
  135. country_raw, _, province, city, isp = parts[0], parts[1], parts[2], parts[3], parts[4]
  136. if country_raw not in ("中国", "中国大陆"):
  137. return None
  138. coords = _lookup_coords(province, city)
  139. lat, lon = (coords[0], coords[1]) if coords else (None, None)
  140. city_clean = city.replace("市", "").replace("区", "") if city and city != "0" else province.replace("省", "").replace("自治区", "").replace("壮族", "").replace("回族", "").replace("维吾尔", "")
  141. city_display = _CITY_EN.get(city_clean, city_clean)
  142. return GeoInfo("China", city_display, lat, lon, isp if isp != "0" else None)
  143. except Exception:
  144. return None
  145. def _resolve_geoip(self, ip: str) -> GeoInfo:
  146. """降级用 GeoLite2 解析。"""
  147. try:
  148. resp = self._get_geoip_reader().city(ip)
  149. country = resp.country.name or "Unknown"
  150. city = resp.city.name or "Unknown"
  151. lat = resp.location.latitude
  152. lon = resp.location.longitude
  153. except Exception:
  154. country, city, lat, lon = "Unknown", "Unknown", None, None
  155. org = None
  156. try:
  157. asn_resp = self._get_asn_reader().asn(ip)
  158. org = asn_resp.autonomous_system_organization or None
  159. except Exception:
  160. pass
  161. return GeoInfo(country, city, lat, lon, org)
  162. def resolve(self, ip: str) -> GeoInfo:
  163. if ip in ("127.0.0.1", "::1", "localhost") or \
  164. ip.startswith("192.168.") or ip.startswith("10.") or ip.startswith("172."):
  165. return GeoInfo("Local", "Loopback", None, None, None)
  166. cn_info = self._resolve_cn(ip)
  167. if cn_info:
  168. return cn_info
  169. return self._resolve_geoip(ip)
  170. geo_resolver = GeoResolver(settings.geoip_db_path, settings.geoip_asn_db_path)