| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- """阿里云短信服务封装"""
import base64
import hashlib
import hmac
import json
import logging
import random
import secrets
import string
import uuid
from datetime import datetime, timezone
from urllib.parse import quote

import httpx

from app.config import settings
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Endpoint that _send_sms POSTs the signed SendSms request to.
ALIYUN_ENDPOINT = "https://dysmsapi.aliyuncs.com/"

# In-memory duplicate-send guard: maps each phone number to the UTC epoch
# timestamp recorded by _send_sms for it; a second request for the same
# number inside the cooldown window is skipped.
_send_times: dict[str, float] = {}
_SMS_COOLDOWN = 60  # cooldown window, in seconds
- def _percent_encode(s: str) -> str:
- """RFC 3986 percent encoding"""
- return quote(s, safe="").replace("+", "%20").replace("*", "%2A").replace("%7E", "~")
def _build_signature(method: str, params: dict[str, str], secret: str) -> str:
    """Compute the Aliyun POP request signature.

    Args:
        method: HTTP method of the request; upper-cased before signing.
        params: Request parameters to sign (without the signature itself).
        secret: AccessKey secret; ``"&"`` is appended to form the HMAC key.

    Returns:
        The Base64-encoded HMAC-SHA1 digest of the string-to-sign.
    """
    # Canonical query string: percent-encoded key=value pairs, sorted by key.
    encoded_pairs = [
        _percent_encode(name) + "=" + _percent_encode(params[name])
        for name in sorted(params)
    ]
    canonical_query = "&".join(encoded_pairs)

    # String-to-sign: METHOD & encoded "/" & encoded canonical query.
    to_sign = "&".join((method.upper(), "%2F", _percent_encode(canonical_query)))

    key_bytes = (secret + "&").encode()
    mac = hmac.new(key_bytes, to_sign.encode(), hashlib.sha1)
    return base64.b64encode(mac.digest()).decode()
async def _send_sms(phone: str, template_code: str, template_params: dict) -> tuple[bool, str]:
    """Send one SMS through the Aliyun ``SendSms`` API.

    Args:
        phone: Destination phone number; also the key of the cooldown guard.
        template_code: Aliyun SMS template code.
        template_params: Template variables, sent as a JSON object string.

    Returns:
        ``(success, reason)``. ``success`` is ``True`` only when the API
        response carries ``Code == "OK"``. Missing credentials, parameters
        that cannot be serialized, a request inside the cooldown window,
        an API error and any exception during the HTTP call all yield
        ``False`` with a human-readable reason.
    """
    if not settings.sms_access_key_id or not settings.sms_access_key_secret:
        logger.warning("阿里云短信配置未设置(AK/SK为空),跳过短信发送")
        return False, "AK/SK未配置"

    # Serialize with json.dumps rather than str(dict).replace("'", '"'):
    # the latter corrupts values containing quotes/apostrophes and emits
    # Python literals (None/True) that are not JSON. ensure_ascii=False
    # keeps non-ASCII text unescaped. Done before touching the cooldown so
    # that bad parameters do not consume it.
    try:
        template_param_json = json.dumps(template_params, ensure_ascii=False)
    except (TypeError, ValueError) as exc:
        logger.error("短信模板参数无法序列化为 JSON: %s", exc)
        return False, f"模板参数序列化失败: {exc}"

    # Read the clock once; it feeds both the cooldown and the Timestamp.
    now = datetime.now(timezone.utc)
    now_ts = now.timestamp()

    # Drop entries whose cooldown has already elapsed so the guard dict does
    # not grow without bound. Such entries could no longer block a send.
    expired = [
        number
        for number, sent_at in _send_times.items()
        if now_ts - sent_at >= _SMS_COOLDOWN
    ]
    for number in expired:
        del _send_times[number]

    last = _send_times.get(phone, 0)
    if now_ts - last < _SMS_COOLDOWN:
        logger.info("手机号 %s 距离上次发送不足 %d 秒,跳过防重", phone, _SMS_COOLDOWN)
        return False, "冷却期内,跳过发送"
    # Recorded before the request is made, so a failed send also starts
    # the cooldown for this number.
    _send_times[phone] = now_ts

    params = {
        "AccessKeyId": settings.sms_access_key_id,
        "Action": "SendSms",
        "Format": "JSON",
        "PhoneNumbers": phone,
        "RegionId": "cn-hangzhou",
        "SignName": settings.sms_sign_name,
        "SignatureMethod": "HMAC-SHA1",
        "SignatureNonce": uuid.uuid4().hex,
        "SignatureVersion": "1.0",
        "TemplateCode": template_code,
        "TemplateParam": template_param_json,
        "Timestamp": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "Version": "2017-05-25",
    }
    # The signature covers every parameter above and is added last.
    params["Signature"] = _build_signature("POST", params, settings.sms_access_key_secret)

    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.post(ALIYUN_ENDPOINT, data=params)
        result = resp.json()
        if result.get("Code") == "OK":
            logger.info("短信发送成功: %s", phone)
            return True, "发送成功"

        code = result.get("Code")
        msg = result.get("Message")
        logger.error("短信发送失败: phone=%s, code=%s, msg=%s", phone, code, msg)
        return False, f"阿里云错误: {code} - {msg}"
    except Exception as e:
        # Boundary handler: callers get (False, reason) instead of an
        # exception; the traceback is kept in the log.
        logger.exception("短信发送异常: %s", phone)
        return False, f"异常: {e}"
async def send_sms_code(phone: str, code: str) -> tuple[bool, str]:
    """Send a verification-code SMS to *phone*.

    Uses the template configured as ``settings.sms_template_code_verify``
    with *code* as its ``code`` variable. Returns ``(success, reason)``.
    """
    template_vars = {"code": code}
    outcome = await _send_sms(phone, settings.sms_template_code_verify, template_vars)
    return outcome
async def send_license_expired(phone: str, company: str) -> tuple[bool, str]:
    """Send a "license expired" notification SMS to *phone*.

    Uses the template configured as ``settings.sms_template_code_expired``
    with *company* as its ``company`` variable. Returns ``(success, reason)``.
    """
    template_vars = {"company": company}
    outcome = await _send_sms(phone, settings.sms_template_code_expired, template_vars)
    return outcome
async def send_license_restored(phone: str, company: str) -> tuple[bool, str]:
    """Send a "license restored" notification SMS to *phone*.

    Uses the template configured as ``settings.sms_template_code_restored``
    with *company* as its ``company`` variable. Returns ``(success, reason)``.
    """
    template_vars = {"company": company}
    outcome = await _send_sms(phone, settings.sms_template_code_restored, template_vars)
    return outcome
async def send_license_warning(phone: str, company: str, days: int) -> tuple[bool, str]:
    """Send a "license about to expire" warning SMS to *phone*.

    Uses the template configured as ``settings.sms_template_code_warning``
    with *company* and *days* (converted to a string) as its variables.
    Returns ``(success, reason)``.
    """
    template_vars = {"company": company, "days": str(days)}
    outcome = await _send_sms(phone, settings.sms_template_code_warning, template_vars)
    return outcome
def generate_verify_code() -> str:
    """Return a random 6-digit numeric verification code.

    Digits are drawn with ``secrets`` rather than ``random``: the code is
    used to verify a user, so it must not come from a predictable
    pseudo-random generator.
    """
    return "".join(secrets.choice(string.digits) for _ in range(6))
|