"""
Alibaba Cloud SMS service.

Provides verification-code delivery and tenant balance-warning SMS delivery.
"""

import asyncio
import hmac
import json
import logging
import os
import random
import secrets
from typing import Optional

logger = logging.getLogger(__name__)

SMS_ACCESS_KEY_ID = os.getenv("SMS_ACCESS_KEY_ID", os.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID", ""))
SMS_ACCESS_KEY_SECRET = os.getenv("SMS_ACCESS_KEY_SECRET", os.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET", ""))
SMS_SIGN_NAME = os.getenv("SMS_SIGN_NAME", "智创空间")
SMS_TEMPLATE_VERIFY = os.getenv("SMS_TEMPLATE_CODE_VERIFY", "")  # verification-code template
SMS_TEMPLATE_WARN = os.getenv("SMS_TEMPLATE_CODE_WARN", "")  # balance-warning template

# Verification-code lifetime in seconds (5 minutes).
CODE_TTL = 300
# Minimum interval in seconds between two sends to the same phone number.
SEND_INTERVAL = 60


def _get_client():
    """Build a Dysmsapi client from the module-level access-key settings."""
    # Imported lazily so the module can be loaded without the SDK installed.
    from alibabacloud_dysmsapi20170525.client import Client
    from alibabacloud_tea_openapi import models as open_api_models

    config = open_api_models.Config(
        access_key_id=SMS_ACCESS_KEY_ID,
        access_key_secret=SMS_ACCESS_KEY_SECRET,
    )
    config.endpoint = "dysmsapi.aliyuncs.com"
    return Client(config)


def send_sms(phone: str, template_code: str, params: dict) -> bool:
    """Send one SMS and report whether the provider accepted it.

    This call is synchronous (blocking); async callers should run it in a
    worker thread.

    Args:
        phone: Destination phone number.
        template_code: SMS template code to use.
        params: Template variables; serialized to JSON for the request.

    Returns:
        True when the response body code is "OK", False otherwise. Any
        exception raised while building or sending the request is logged
        and reported as False; this function does not raise.
    """
    try:
        from alibabacloud_dysmsapi20170525 import models as sms_models

        client = _get_client()
        request = sms_models.SendSmsRequest(
            phone_numbers=phone,
            sign_name=SMS_SIGN_NAME,
            template_code=template_code,
            template_param=json.dumps(params, ensure_ascii=False),
        )
        resp = client.send_sms(request)
        if resp.body.code == "OK":
            logger.info(f"短信发送成功: phone={phone}, template={template_code}")
            return True
        logger.warning(f"短信发送失败: phone={phone}, code={resp.body.code}, msg={resp.body.message}")
        return False
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"短信发送异常: phone={phone}, error={e}")
        return False


class SmsCodeService:
    """Verification-code service backed by Redis."""

    REDIS_PREFIX_CODE = "sms:code:"
    REDIS_PREFIX_INTERVAL = "sms:interval:"

    def __init__(self):
        from app.core.redis import redis_manager

        self._redis_manager = redis_manager

    @property
    def redis(self):
        """Return the current Redis client from the manager (falsy if unavailable)."""
        return self._redis_manager.get_client()

    async def send_code(self, phone: str) -> tuple[bool, str]:
        """Generate, send and store a 6-digit verification code.

        Args:
            phone: Destination phone number.

        Returns:
            A ``(success, message)`` tuple; ``message`` is user-facing text.
        """
        redis = self.redis
        if not redis:
            return False, "短信服务暂不可用"

        # Enforce the per-phone send interval.
        # NOTE(review): exists-then-setex is not atomic, so two concurrent
        # requests can both pass this check; consider an atomic SET NX EX.
        interval_key = f"{self.REDIS_PREFIX_INTERVAL}{phone}"
        if await redis.exists(interval_key):
            ttl = await redis.ttl(interval_key)
            # The key can expire between the two calls (TTL -2) or carry no
            # expiry (-1); never show the user a negative wait time.
            wait_seconds = ttl if isinstance(ttl, int) and ttl > 0 else 1
            return False, f"请 {wait_seconds} 秒后再试"

        # 6-digit code from a CSPRNG; `random` is predictable and must not be
        # used for authentication secrets.
        code = str(secrets.randbelow(900000) + 100000)

        dev_mode = not SMS_TEMPLATE_VERIFY
        if dev_mode:
            # Development mode: nothing is sent, the code is only stored.
            logger.warning("未配置验证码短信模板 SMS_TEMPLATE_CODE_VERIFY")
        else:
            # send_sms blocks on network I/O; keep it off the event loop.
            ok = await asyncio.to_thread(send_sms, phone, SMS_TEMPLATE_VERIFY, {"code": code})
            if not ok:
                return False, "短信发送失败,请稍后重试"

        # Store the code, then start the resend-interval window.
        code_key = f"{self.REDIS_PREFIX_CODE}{phone}"
        await redis.setex(code_key, CODE_TTL, code)
        await redis.setex(interval_key, SEND_INTERVAL, "1")

        if dev_mode:
            # The log is the only way to obtain the code when no SMS is sent.
            logger.info(f"验证码已发送: phone={phone}, code={code}")
        else:
            # Never write a live verification code to production logs.
            logger.info(f"验证码已发送: phone={phone}")
        return True, "验证码已发送"

    async def verify_code(self, phone: str, code: str, delete_after: bool = True) -> bool:
        """Check a submitted code against the stored one.

        Args:
            phone: Phone number the code was sent to.
            code: Code submitted by the user.
            delete_after: When True (default), delete the stored code after a
                successful match so it cannot be reused.

        Returns:
            True if a stored code exists and equals ``code``; False otherwise,
            including when Redis is unavailable.
        """
        redis = self.redis
        if not redis:
            logger.warning(f"Redis 不可用,验证码验证失败: phone={phone}")
            return False

        code_key = f"{self.REDIS_PREFIX_CODE}{phone}"
        stored = await redis.get(code_key)
        # Clients created without decode_responses return bytes.
        if isinstance(stored, bytes):
            stored = stored.decode("utf-8", errors="replace")

        matched = (
            bool(stored)
            and isinstance(stored, str)
            and isinstance(code, str)
            # Constant-time comparison; operands are encoded because
            # compare_digest rejects non-ASCII str arguments.
            and hmac.compare_digest(stored.encode("utf-8"), code.encode("utf-8"))
        )
        # Log only the outcome, never the submitted or stored code.
        logger.info(f"验证码校验: phone={phone}, matched={matched}")

        if not matched:
            return False
        if delete_after:
            await redis.delete(code_key)
        return True


# Module-level singleton.
sms_code_service = SmsCodeService()


class TenantWarnSmsService:
    """Tenant balance-warning SMS service."""

    # Redis key prefix for "warning already sent" markers, keyed by
    # tenant_id:level, used to avoid duplicate sends.
    REDIS_PREFIX_WARN = "sms:warn:"
    # Minimum interval in seconds between warnings of the same level (24 hours).
    WARN_INTERVAL = 86400
    # (inclusive upper bound, level) pairs, checked in ascending order.
    BALANCE_LEVELS = (
        (0, "critical"),
        (100, "low"),
        (200, "medium"),
        (500, "warning"),
    )

    def __init__(self):
        from app.core.redis import redis_manager

        self._redis_manager = redis_manager

    @property
    def redis(self):
        """Return the current Redis client from the manager (falsy if unavailable)."""
        return self._redis_manager.get_client()

    @classmethod
    def _level_for_balance(cls, balance: float) -> Optional[str]:
        """Return the warning level for ``balance``, or None above all thresholds."""
        for threshold, level in cls.BALANCE_LEVELS:
            if balance <= threshold:
                return level
        return None

    async def send_warn(self, tenant_id: int, phone: str, company_name: str,
                        balance: float, level: str) -> bool:
        """Send a balance-warning SMS, at most once per level per 24 hours.

        Args:
            tenant_id: Tenant identifier, used in the dedup key.
            phone: Destination phone number.
            company_name: Value for the ``companyName`` template variable.
            balance: Remaining balance, formatted with two decimals.
            level: Warning level, used in the dedup key.

        Returns:
            True if the SMS was sent; False if the template is not configured,
            the level was already warned within the interval, or sending failed.
        """
        if not SMS_TEMPLATE_WARN:
            logger.warning("未配置预警短信模板 SMS_TEMPLATE_CODE_WARN")
            return False

        redis = self.redis
        warn_key = f"{self.REDIS_PREFIX_WARN}{tenant_id}:{level}"

        # Skip if this level was already sent. Without Redis there is no
        # dedup and the SMS is sent on every call.
        if redis and await redis.exists(warn_key):
            return False

        # send_sms blocks on network I/O; keep it off the event loop.
        ok = await asyncio.to_thread(
            send_sms,
            phone,
            SMS_TEMPLATE_WARN,
            {
                "companyName": company_name,
                "remainAmount": f"{balance:.2f}",
            },
        )

        if ok and redis:
            await redis.setex(warn_key, self.WARN_INTERVAL, "1")
        return ok

    async def check_and_warn(self, db) -> list[dict]:
        """Check every active tenant's balance and send due warning SMS.

        Args:
            db: Database session exposing ``query(...)``.
                NOTE(review): the queries below are synchronous calls inside
                a coroutine — confirm that is acceptable for the caller.

        Returns:
            One dict per SMS actually sent, with keys ``tenant_id``,
            ``company_name``, ``phone``, ``balance`` and ``level``.
        """
        from app.models.tenant import Tenant, TenantApplication

        sent = []
        tenants = db.query(Tenant).filter(Tenant.status == "active").all()
        for tenant in tenants:
            balance = float(tenant.balance or 0)
            level = self._level_for_balance(balance)
            if not level:
                continue

            # The contact phone comes from the tenant's latest application.
            # NOTE(review): this is one query per tenant (N+1).
            application = (
                db.query(TenantApplication)
                .filter(TenantApplication.tenant_id == tenant.id)
                .order_by(TenantApplication.id.desc())
                .first()
            )
            phone = application.contact_phone if application else None
            if not phone:
                continue

            ok = await self.send_warn(
                tenant_id=tenant.id,
                phone=phone,
                company_name=tenant.company_name,
                balance=balance,
                level=level,
            )
            if ok:
                sent.append({
                    "tenant_id": tenant.id,
                    "company_name": tenant.company_name,
                    "phone": phone,
                    "balance": balance,
                    "level": level,
                })
        return sent


tenant_warn_sms = TenantWarnSmsService()