| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- """阿里云短信服务封装"""
- import base64
- import hashlib
- import hmac
- import logging
- import random
- import string
- import uuid
- from datetime import datetime, timezone
- from urllib.parse import quote
- import httpx
- from app.config import settings
- from app.redis import get_redis
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# URL that SendSms requests are POSTed to.
ALIYUN_ENDPOINT = "https://dysmsapi.aliyuncs.com/"
- def _percent_encode(s: str) -> str:
- """RFC 3986 percent encoding"""
- return quote(s, safe="").replace("+", "%20").replace("*", "%2A").replace("%7E", "~")
def _build_signature(method: str, params: dict[str, str], secret: str) -> str:
    """Compute the Aliyun POP signature for a set of request parameters.

    The parameters are ordered by key; each key and value is percent-encoded
    and joined into ``key=value`` pairs separated by ``&``. That canonical
    query string is percent-encoded once more and placed in
    ``METHOD&%2F&<query>``, which is signed with HMAC-SHA1 using
    ``secret + "&"`` as the key.

    Args:
        method: HTTP method name; it is upper-cased before signing.
        params: Request parameters to sign.
        secret: Access key secret used to derive the signing key.

    Returns:
        The Base64-encoded HMAC-SHA1 digest, as text.
    """
    encoded_pairs = [
        _percent_encode(name) + "=" + _percent_encode(params[name])
        for name in sorted(params)
    ]
    canonical_query = "&".join(encoded_pairs)

    to_sign = "&".join((method.upper(), "%2F", _percent_encode(canonical_query)))
    key_bytes = f"{secret}&".encode()

    mac = hmac.new(key_bytes, to_sign.encode(), hashlib.sha1)
    return base64.b64encode(mac.digest()).decode()
async def _send_sms(
    phone: str,
    template_code: str,
    template_params: dict,
    retry: bool = False,
) -> tuple[bool, str]:
    """Call the Aliyun ``SendSms`` API for a single phone number.

    Sends are throttled per phone number through the Redis key
    ``sms:cooldown:<phone>``, which is set with a 30-second expiry just
    before each request. Because the key is written before the HTTP call,
    a failed request also starts the cooldown.

    Args:
        phone: Destination phone number (sent as ``PhoneNumbers``).
        template_code: SMS template code (sent as ``TemplateCode``).
        template_params: Template variables; serialized to a JSON string
            and sent as ``TemplateParam``.
        retry: When the phone is in cooldown, ``True`` sleeps until the
            cooldown should have elapsed and then sends; ``False`` (the
            default) returns immediately without sending.

    Returns:
        ``(success, reason)``. ``success`` is ``True`` only when the
        response ``Code`` is ``"OK"``. Any exception raised while sending
        is logged and reported as ``(False, reason)`` instead of
        propagating.
    """
    import asyncio
    import json

    if not settings.sms_access_key_id or not settings.sms_access_key_secret:
        logger.warning("阿里云短信 AK/SK 未配置,跳过发送")
        return False, "AK/SK未配置"

    r = await get_redis()
    cooldown_key = f"sms:cooldown:{phone}"
    if await r.exists(cooldown_key):
        if not retry:
            logger.info("手机号 %s 在冷却期内,跳过立即发送,等待后台重试", phone)
            return False, "冷却期内,等待后台重试"
        ttl = await r.ttl(cooldown_key)
        # Clamp the reported TTL to at least 1 second, then add a 1-second
        # margin so the sleep outlasts the cooldown key.
        wait_sec = max(ttl, 1) + 1
        logger.info("手机号 %s 在冷却期内,等待 %d 秒后重试发送", phone, wait_sec)
        await asyncio.sleep(wait_sec)

    await r.setex(cooldown_key, 30, "1")

    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    params = {
        "AccessKeyId": settings.sms_access_key_id,
        "Action": "SendSms",
        "Format": "JSON",
        "PhoneNumbers": phone,
        "RegionId": "cn-hangzhou",
        "SignName": settings.sms_sign_name,
        "SignatureMethod": "HMAC-SHA1",
        "SignatureNonce": uuid.uuid4().hex,
        "SignatureVersion": "1.0",
        "TemplateCode": template_code,
        # Real JSON serialization: str(dict) with quote swapping breaks as
        # soon as a value contains a quote character or is not a string.
        # ensure_ascii=False keeps non-ASCII text unescaped.
        "TemplateParam": json.dumps(template_params, ensure_ascii=False),
        "Timestamp": now,
        "Version": "2017-05-25",
    }
    # The signature is computed over every other parameter and added last.
    params["Signature"] = _build_signature("POST", params, settings.sms_access_key_secret)

    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.post(ALIYUN_ENDPOINT, data=params)
        result = resp.json()
        if result.get("Code") == "OK":
            logger.info("短信发送成功: phone=%s", phone)
            return True, "发送成功"
        code = result.get("Code")
        msg = result.get("Message")
        logger.error("短信发送失败: phone=%s, code=%s, msg=%s", phone, code, msg)
        return False, f"阿里云错误: {code} - {msg}"
    except Exception as e:
        # Boundary handler: callers get a (False, reason) tuple, never an
        # exception; the traceback goes to the log.
        logger.exception("短信发送异常: phone=%s", phone)
        return False, f"异常: {e}"
async def send_sms_code(phone: str, code: str) -> tuple[bool, str]:
    """Send a verification-code SMS.

    Uses the template configured as ``settings.sms_template_code_verify``
    and passes ``code`` as the ``code`` template variable. The ``retry``
    option of ``_send_sms`` is left at its default.

    Args:
        phone: Destination phone number.
        code: Verification code to place in the message.

    Returns:
        ``(success, reason)`` as produced by ``_send_sms``.
    """
    template = settings.sms_template_code_verify
    variables = {"code": code}
    outcome = await _send_sms(phone, template, variables)
    return outcome
async def send_license_expired(phone: str, company: str, retry: bool = False) -> tuple[bool, str]:
    """Send a "license expired" notification SMS.

    Uses the template configured as ``settings.sms_template_code_expired``
    and passes ``company`` as the ``company`` template variable.

    Args:
        phone: Destination phone number.
        company: Company name to place in the message.
        retry: Forwarded to ``_send_sms`` as its ``retry`` option.

    Returns:
        ``(success, reason)`` as produced by ``_send_sms``.
    """
    template = settings.sms_template_code_expired
    variables = {"company": company}
    outcome = await _send_sms(phone, template, variables, retry=retry)
    return outcome
async def send_license_restored(phone: str, company: str, retry: bool = False) -> tuple[bool, str]:
    """Send a "license restored" notification SMS.

    Uses the template configured as ``settings.sms_template_code_restored``
    and passes ``company`` as the ``company`` template variable.

    Args:
        phone: Destination phone number.
        company: Company name to place in the message.
        retry: Forwarded to ``_send_sms`` as its ``retry`` option.

    Returns:
        ``(success, reason)`` as produced by ``_send_sms``.
    """
    template = settings.sms_template_code_restored
    variables = {"company": company}
    outcome = await _send_sms(phone, template, variables, retry=retry)
    return outcome
async def send_license_warning(phone: str, company: str, days: int, retry: bool = False) -> tuple[bool, str]:
    """Send a "license about to expire" warning SMS.

    Uses the template configured as ``settings.sms_template_code_warning``
    with two template variables: ``company`` and ``days``. ``days`` is
    converted to a string before it is passed on.

    Args:
        phone: Destination phone number.
        company: Company name to place in the message.
        days: Day count to place in the message.
        retry: Forwarded to ``_send_sms`` as its ``retry`` option.

    Returns:
        ``(success, reason)`` as produced by ``_send_sms``.
    """
    template = settings.sms_template_code_warning
    variables = {"company": company, "days": str(days)}
    outcome = await _send_sms(phone, template, variables, retry=retry)
    return outcome
def generate_verify_code() -> str:
    """Generate a 6-digit numeric verification code.

    The digits are drawn with ``secrets.choice`` rather than the ``random``
    module: verification codes are security-sensitive, and ``random`` is a
    deterministic generator that is not meant for that purpose.

    Returns:
        A string of exactly six characters from ``string.digits``; leading
        zeros are possible.
    """
    import secrets

    return "".join(secrets.choice(string.digits) for _ in range(6))
|