sms.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. """阿里云短信服务封装"""
  2. import base64
  3. import hashlib
  4. import hmac
  5. import logging
  6. import random
  7. import string
  8. import uuid
  9. from datetime import datetime, timezone
  10. from urllib.parse import quote
  11. import httpx
  12. from app.config import settings
  13. from app.redis import get_redis
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# URL that the SendSms request in this module is POSTed to.
ALIYUN_ENDPOINT = "https://dysmsapi.aliyuncs.com/"
  16. def _percent_encode(s: str) -> str:
  17. """RFC 3986 percent encoding"""
  18. return quote(s, safe="").replace("+", "%20").replace("*", "%2A").replace("%7E", "~")
  19. def _build_signature(method: str, params: dict[str, str], secret: str) -> str:
  20. """阿里云 POP 签名"""
  21. sorted_keys = sorted(params.keys())
  22. canonicalized = "&".join(f"{_percent_encode(k)}={_percent_encode(params[k])}" for k in sorted_keys)
  23. string_to_sign = f"{method.upper()}&%2F&{_percent_encode(canonicalized)}"
  24. signing_key = secret + "&"
  25. h = hmac.new(signing_key.encode(), string_to_sign.encode(), hashlib.sha1)
  26. return base64.b64encode(h.digest()).decode()
  27. async def _send_sms(phone: str, template_code: str, template_params: dict, retry: bool = False) -> tuple[bool, str]:
  28. """调用阿里云 SendSms 接口。返回 (success, reason)
  29. retry=True 时遇到冷却期等待后重试;retry=False(默认)时遇到冷却期立即返回。
  30. """
  31. if not settings.sms_access_key_id or not settings.sms_access_key_secret:
  32. logger.warning("阿里云短信 AK/SK 未配置,跳过发送")
  33. return False, "AK/SK未配置"
  34. r = await get_redis()
  35. cooldown_key = f"sms:cooldown:{phone}"
  36. if await r.exists(cooldown_key):
  37. if retry:
  38. ttl = await r.ttl(cooldown_key)
  39. wait_sec = max(ttl, 1) + 1
  40. logger.info("手机号 %s 在冷却期内,等待 %d 秒后重试发送", phone, wait_sec)
  41. import asyncio
  42. await asyncio.sleep(wait_sec)
  43. else:
  44. logger.info("手机号 %s 在冷却期内,跳过立即发送,等待后台重试", phone)
  45. return False, "冷却期内,等待后台重试"
  46. await r.setex(cooldown_key, 30, "1")
  47. now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
  48. params = {
  49. "AccessKeyId": settings.sms_access_key_id,
  50. "Action": "SendSms",
  51. "Format": "JSON",
  52. "PhoneNumbers": phone,
  53. "RegionId": "cn-hangzhou",
  54. "SignName": settings.sms_sign_name,
  55. "SignatureMethod": "HMAC-SHA1",
  56. "SignatureNonce": uuid.uuid4().hex,
  57. "SignatureVersion": "1.0",
  58. "TemplateCode": template_code,
  59. "TemplateParam": str(template_params).replace("'", '"'),
  60. "Timestamp": now,
  61. "Version": "2017-05-25",
  62. }
  63. signature = _build_signature("POST", params, settings.sms_access_key_secret)
  64. params["Signature"] = signature
  65. try:
  66. async with httpx.AsyncClient(timeout=10) as client:
  67. resp = await client.post(ALIYUN_ENDPOINT, data=params)
  68. result = resp.json()
  69. if result.get("Code") == "OK":
  70. logger.info("短信发送成功: phone=%s", phone)
  71. return True, "发送成功"
  72. else:
  73. code = result.get("Code")
  74. msg = result.get("Message")
  75. logger.error("短信发送失败: phone=%s, code=%s, msg=%s", phone, code, msg)
  76. return False, f"阿里云错误: {code} - {msg}"
  77. except Exception as e:
  78. logger.exception("短信发送异常: phone=%s", phone)
  79. return False, f"异常: {e}"
  80. async def send_sms_code(phone: str, code: str) -> tuple[bool, str]:
  81. """发送验证码短信。返回 (success, reason)"""
  82. return await _send_sms(phone, settings.sms_template_code_verify, {"code": code})
  83. async def send_license_expired(phone: str, company: str, retry: bool = False) -> tuple[bool, str]:
  84. """发送 License 过期通知短信。返回 (success, reason)"""
  85. return await _send_sms(phone, settings.sms_template_code_expired, {"company": company}, retry=retry)
  86. async def send_license_restored(phone: str, company: str, retry: bool = False) -> tuple[bool, str]:
  87. """发送 License 恢复通知短信。返回 (success, reason)"""
  88. return await _send_sms(phone, settings.sms_template_code_restored, {"company": company}, retry=retry)
  89. async def send_license_warning(phone: str, company: str, days: int, retry: bool = False) -> tuple[bool, str]:
  90. """发送 License 即将过期预警短信。返回 (success, reason)"""
  91. return await _send_sms(phone, settings.sms_template_code_warning, {"company": company, "days": str(days)}, retry=retry)
  92. def generate_verify_code() -> str:
  93. """生成 6 位数字验证码"""
  94. return "".join(random.choices(string.digits, k=6))