Переглянути джерело

fix(dev分支合并):分支合并

lingmin_package@163.com 1 тиждень тому
батько
коміт
9c71165d2d
1 змінених файлів з 542 додано та 0 видалено
  1. 542 0
      utils_test/API_key/api_key_generator.py

+ 542 - 0
utils_test/API_key/api_key_generator.py

@@ -0,0 +1,542 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+大模型部署 API Key 生成器
+
+特性:
+- 生成加密安全的随机 API Key
+- 支持多种格式(UUID、随机字符串、JWT 风格)
+- 包含前缀标识(便于区分用途和环境)
+- 支持校验和验证(防止篡改)
+- 可配置 Key 长度和复杂度
+
+使用示例:
+    # 生成标准 API Key
+    key = generate_api_key(prefix="sk-prod")
+
+    # 生成带有项目标识的 Key
+    key = generate_api_key(prefix="sk-dev", project_id="proj_123")
+
+    # 验证 API Key 格式
+    is_valid = validate_api_key(key, prefix="sk-prod")
+"""
+
+import secrets
+import hashlib
+import base64
+import re
+import json
+import hmac
+from datetime import datetime, timedelta
+from typing import Optional, Tuple, Dict, Any
+from dataclasses import dataclass
+from enum import Enum
+
+
+class KeyFormat(Enum):
+    """API Key 格式类型"""
+    STANDARD = "standard"      # 标准格式: prefix_xxxxxxxxx
+    UUID = "uuid"              # UUID 格式
+    JWT_STYLE = "jwt_style"    # JWT 风格: xxxxx.yyyyy.zzzzz
+    BASE64 = "base64"          # Base64 编码
+    HEX = "hex"                # 十六进制
+
+
+@dataclass
+class APIKeyConfig:
+    """API Key 配置"""
+    prefix: str = "sk"                 # 前缀,如 sk, pk, dk(secret key, public key, dev key)
+    environment: str = "prod"          # 环境:prod, dev, test
+    length: int = 32                   # 随机部分长度
+    include_checksum: bool = True      # 是否包含校验和
+    include_timestamp: bool = False    # 是否包含时间戳
+    project_id: Optional[str] = None   # 项目标识
+    secret_key: Optional[str] = None   # 用于签名(HMAC)
+
+
+class APIKeyGenerator:
+    """API Key 生成器"""
+
+    # 推荐的密钥长度(字节)
+    RECOMMENDED_LENGTHS = {
+        "low": 16,      # 128 bit
+        "medium": 24,   # 192 bit
+        "high": 32,     # 256 bit
+        "maximum": 48   # 384 bit
+    }
+
+    def __init__(self, config: Optional[APIKeyConfig] = None):
+        self.config = config or APIKeyConfig()
+
+    def generate(self, format_type: KeyFormat = KeyFormat.STANDARD) -> str:
+        """
+        生成 API Key
+
+        Args:
+            format_type: Key 格式类型
+
+        Returns:
+            str: 生成的 API Key
+        """
+        generators = {
+            KeyFormat.STANDARD: self._generate_standard,
+            KeyFormat.UUID: self._generate_uuid,
+            KeyFormat.JWT_STYLE: self._generate_jwt_style,
+            KeyFormat.BASE64: self._generate_base64,
+            KeyFormat.HEX: self._generate_hex,
+        }
+
+        generator = generators.get(format_type, self._generate_standard)
+        return generator()
+
+    def _generate_standard(self) -> str:
+        """生成标准格式 API Key: prefix_env_xxxxxxxxx_checksum"""
+        # 构建前缀
+        prefix_parts = [self.config.prefix]
+        if self.config.environment:
+            prefix_parts.append(self.config.environment)
+        if self.config.project_id:
+            prefix_parts.append(self.config.project_id)
+
+        prefix = "_".join(prefix_parts)
+
+        # 生成随机部分
+        random_bytes = secrets.token_bytes(self.config.length)
+        random_part = base64.urlsafe_b64encode(random_bytes).decode('ascii').rstrip('=')
+
+        # 可选:添加时间戳
+        if self.config.include_timestamp:
+            timestamp = hex(int(datetime.utcnow().timestamp()))[2:]
+            random_part = f"{timestamp}_{random_part[:24]}"
+
+        # 构建基础 key
+        base_key = f"{prefix}_{random_part[:self.config.length]}"
+
+        # 可选:添加校验和
+        if self.config.include_checksum:
+            checksum = self._calculate_checksum(base_key)
+            return f"{base_key}_{checksum}"
+
+        return base_key
+
+    def _generate_uuid(self) -> str:
+        """生成 UUID 风格 API Key"""
+        # 生成 16 字节随机数
+        random_bytes = secrets.token_bytes(16)
+
+        # 转换为 UUID 格式
+        hex_str = random_bytes.hex()
+        uuid_format = f"{hex_str[:8]}-{hex_str[8:12]}-{hex_str[12:16]}-{hex_str[16:20]}-{hex_str[20:32]}"
+
+        prefix = f"{self.config.prefix}-{self.config.environment}" if self.config.environment else self.config.prefix
+        return f"{prefix}-{uuid_format}"
+
+    def _generate_jwt_style(self) -> str:
+        """生成 JWT 风格 API Key: header.payload.signature"""
+        # Header
+        header = {
+            "alg": "HS256" if self.config.secret_key else "none",
+            "typ": "API-KEY",
+            "env": self.config.environment,
+            "prefix": self.config.prefix
+        }
+        header_b64 = base64.urlsafe_b64encode(
+            json.dumps(header, separators=(',', ':')).encode()
+        ).decode().rstrip('=')
+
+        # Payload(随机数据 + 时间戳)
+        payload_data = secrets.token_hex(16)
+        payload = {
+            "rnd": payload_data,
+            "iat": int(datetime.utcnow().timestamp()),
+            "project": self.config.project_id or "default"
+        }
+        payload_b64 = base64.urlsafe_b64encode(
+            json.dumps(payload, separators=(',', ':')).encode()
+        ).decode().rstrip('=')
+
+        # Signature
+        if self.config.secret_key:
+            message = f"{header_b64}.{payload_b64}"
+            signature = hmac.new(
+                self.config.secret_key.encode(),
+                message.encode(),
+                hashlib.sha256
+            ).digest()
+            signature_b64 = base64.urlsafe_b64encode(signature).decode().rstrip('=')[:16]
+        else:
+            signature_b64 = secrets.token_urlsafe(16).rstrip('=')
+
+        return f"{header_b64}.{payload_b64}.{signature_b64}"
+
+    def _generate_base64(self) -> str:
+        """生成 Base64 编码 API Key"""
+        random_bytes = secrets.token_bytes(self.config.length)
+        encoded = base64.urlsafe_b64encode(random_bytes).decode('ascii').rstrip('=')
+        prefix = f"{self.config.prefix}_{self.config.environment}_" if self.config.environment else f"{self.config.prefix}_"
+        return f"{prefix}{encoded}"
+
+    def _generate_hex(self) -> str:
+        """生成十六进制 API Key"""
+        random_bytes = secrets.token_bytes(self.config.length)
+        hex_str = random_bytes.hex()
+        prefix = f"{self.config.prefix}_{self.config.environment}_" if self.config.environment else f"{self.prefix}_"
+        return f"{prefix}{hex_str}"
+
+    def _calculate_checksum(self, data: str) -> str:
+        """计算校验和(前 8 位)"""
+        hash_obj = hashlib.sha256(data.encode())
+        return hash_obj.hexdigest()[:8]
+
+    @staticmethod
+    def validate(api_key: str, prefix: Optional[str] = None,
+                 secret_key: Optional[str] = None) -> Tuple[bool, Optional[Dict[str, Any]]]:
+        """
+        验证 API Key 格式
+
+        Args:
+            api_key: 要验证的 API Key
+            prefix: 期望的前缀(可选)
+            secret_key: 用于验证签名的密钥(可选)
+
+        Returns:
+            Tuple[bool, Optional[Dict]]: (是否有效, 解析后的信息)
+        """
+        if not api_key or len(api_key) < 16:
+            return False, None
+
+        info = {"raw": api_key}
+
+        # 检查前缀
+        if prefix and not api_key.startswith(prefix):
+            return False, info
+
+        # 尝试解析 JWT 风格
+        if '.' in api_key and api_key.count('.') == 2:
+            return APIKeyGenerator._validate_jwt_style(api_key, secret_key, info)
+
+        # 尝试解析标准格式(检查校验和)
+        if '_' in api_key:
+            return APIKeyGenerator._validate_standard(api_key, info)
+
+        return True, info
+
+    @staticmethod
+    def _validate_jwt_style(api_key: str, secret_key: Optional[str],
+                            info: Dict) -> Tuple[bool, Dict]:
+        """验证 JWT 风格 Key"""
+        parts = api_key.split('.')
+        if len(parts) != 3:
+            return False, info
+
+        try:
+            # 解码 header
+            header_padding = '=' * (4 - len(parts[0]) % 4)
+            header_json = base64.urlsafe_b64decode(parts[0] + header_padding)
+            header = json.loads(header_json)
+            info["header"] = header
+
+            # 解码 payload
+            payload_padding = '=' * (4 - len(parts[1]) % 4)
+            payload_json = base64.urlsafe_b64decode(parts[1] + payload_padding)
+            payload = json.loads(payload_json)
+            info["payload"] = payload
+
+            # 验证签名(如果提供了 secret_key)
+            if secret_key and header.get("alg") == "HS256":
+                message = f"{parts[0]}.{parts[1]}"
+                expected_sig = hmac.new(
+                    secret_key.encode(),
+                    message.encode(),
+                    hashlib.sha256
+                ).digest()
+                expected_sig_b64 = base64.urlsafe_b64encode(expected_sig).decode().rstrip('=')[:16]
+
+                if not hmac.compare_digest(parts[2], expected_sig_b64):
+                    return False, info
+
+            return True, info
+
+        except Exception:
+            return False, info
+
+    @staticmethod
+    def _validate_standard(api_key: str, info: Dict) -> Tuple[bool, Dict]:
+        """验证标准格式 Key(带校验和)"""
+        parts = api_key.split('_')
+
+        # 如果有校验和部分(最后一部分是 8 位十六进制)
+        if len(parts) >= 2 and len(parts[-1]) == 8 and all(c in '0123456789abcdef' for c in parts[-1]):
+            # 提取基础 key 和校验和
+            checksum = parts[-1]
+            base_key = '_'.join(parts[:-1])
+
+            # 验证校验和
+            expected_checksum = hashlib.sha256(base_key.encode()).hexdigest()[:8]
+
+            if checksum != expected_checksum:
+                return False, info
+
+            info["checksum_valid"] = True
+            info["parts"] = parts[:-1]  # 排除校验和部分
+        else:
+            info["parts"] = parts
+
+        return True, info
+
+
+# ========== 便捷函数 ==========
+
+def generate_api_key(
+    prefix: str = "sk",
+    environment: str = "prod",
+    length: int = 32,
+    format_type: KeyFormat = KeyFormat.STANDARD,
+    include_checksum: bool = True,
+    project_id: Optional[str] = None,
+    secret_key: Optional[str] = None
+) -> str:
+    """
+    便捷函数:生成 API Key
+
+    Args:
+        prefix: 前缀(如 sk, pk, dk)
+        environment: 环境(prod, dev, test)
+        length: 随机部分长度
+        format_type: 格式类型
+        include_checksum: 是否包含校验和
+        project_id: 项目标识
+        secret_key: 用于签名的密钥
+
+    Returns:
+        str: 生成的 API Key
+
+    Examples:
+        >>> generate_api_key()
+        'sk_prod_xxxxxxxxxxxxxxxx_xxxxxxxx'
+
+        >>> generate_api_key(prefix="sk-dev", environment="test")
+        'sk-dev_test_xxxxxxxxxxxxxxxx_xxxxxxxx'
+
+        >>> generate_api_key(format_type=KeyFormat.UUID)
+        'sk-prod-xxxx-xxxx-xxxx-xxxxxxxxxxxx'
+    """
+    config = APIKeyConfig(
+        prefix=prefix,
+        environment=environment,
+        length=length,
+        include_checksum=include_checksum,
+        project_id=project_id,
+        secret_key=secret_key
+    )
+    generator = APIKeyGenerator(config)
+    return generator.generate(format_type)
+
+
+def validate_api_key(
+    api_key: str,
+    prefix: Optional[str] = None,
+    secret_key: Optional[str] = None
+) -> bool:
+    """
+    便捷函数:验证 API Key 格式
+
+    Args:
+        api_key: 要验证的 Key
+        prefix: 期望的前缀(可选)
+        secret_key: 用于验证签名的密钥(可选)
+
+    Returns:
+        bool: 是否有效
+    """
+    is_valid, _ = APIKeyGenerator.validate(api_key, prefix, secret_key)
+    return is_valid
+
+
+def parse_api_key(api_key: str) -> Optional[Dict[str, Any]]:
+    """
+    解析 API Key,提取其中的信息
+
+    Args:
+        api_key: API Key
+
+    Returns:
+        Optional[Dict]: 解析出的信息,无效返回 None
+    """
+    is_valid, info = APIKeyGenerator.validate(api_key)
+    return info if is_valid else None
+
+
+# ========== 批量生成工具 ==========
+
+def generate_api_keys_batch(
+    count: int = 10,
+    prefix: str = "sk",
+    environment: str = "prod",
+    format_type: KeyFormat = KeyFormat.STANDARD,
+    save_to_file: Optional[str] = None
+) -> list:
+    """
+    批量生成 API Key
+
+    Args:
+        count: 生成数量
+        prefix: 前缀
+        environment: 环境
+        format_type: 格式
+        save_to_file: 保存到文件路径(可选)
+
+    Returns:
+        list: API Key 列表
+    """
+    config = APIKeyConfig(prefix=prefix, environment=environment)
+    generator = APIKeyGenerator(config)
+
+    keys = [generator.generate(format_type) for _ in range(count)]
+
+    if save_to_file:
+        with open(save_to_file, 'w', encoding='utf-8') as f:
+            f.write("# Generated API Keys\n")
+            f.write(f"# Count: {count}\n")
+            f.write(f"# Format: {format_type.value}\n")
+            f.write(f"# Environment: {environment}\n")
+            f.write("-" * 50 + "\n")
+            for i, key in enumerate(keys, 1):
+                f.write(f"{i}. {key}\n")
+        print(f"Keys saved to: {save_to_file}")
+
+    return keys
+
+
+# ========== 主程序 ==========
+
+if __name__ == "__main__":
+    print("=" * 60)
+    print("大模型部署 API Key 生成器")
+    print("=" * 60)
+
+    # 演示各种格式的 API Key
+    formats = [
+        ("标准格式", KeyFormat.STANDARD),
+        ("UUID 格式", KeyFormat.UUID),
+        ("JWT 风格", KeyFormat.JWT_STYLE),
+        ("Base64 格式", KeyFormat.BASE64),
+        ("十六进制格式", KeyFormat.HEX),
+    ]
+
+    print("\n1. 不同格式的 API Key 示例:")
+    print("-" * 60)
+
+    for name, fmt in formats:
+        key = generate_api_key(format_type=fmt)
+        print(f"\n{name}:")
+        print(f"  {key}")
+
+    # 不同环境的 Key
+    print("\n\n2. 不同环境的 API Key:")
+    print("-" * 60)
+
+    for env in ["prod", "dev", "test"]:
+        key = generate_api_key(environment=env)
+        print(f"\n{env.upper()}:")
+        print(f"  {key}")
+
+    # 带校验和的 Key
+    print("\n\n3. 带校验和的 Key(可验证完整性):")
+    print("-" * 60)
+
+    key_with_checksum = generate_api_key(include_checksum=True)
+    print(f"\n生成: {key_with_checksum}")
+
+    is_valid = validate_api_key(key_with_checksum)
+    print(f"验证: {'✓ 有效' if is_valid else '✗ 无效'}")
+
+    # 验证被篡改的 Key
+    tampered_key = key_with_checksum[:-8] + "12345678"
+    print(f"\n篡改: {tampered_key}")
+    is_valid = validate_api_key(tampered_key)
+    print(f"验证: {'✓ 有效' if is_valid else '✗ 无效(校验和不匹配)'}")
+
+    # 带项目标识的 Key
+    print("\n\n4. 带项目标识的 Key:")
+    print("-" * 60)
+
+    key_with_project = generate_api_key(
+        prefix="sk",
+        environment="prod",
+        project_id="proj_chatbot"
+    )
+    print(f"\n{key_with_project}")
+
+    # 批量生成
+    print("\n\n5. 批量生成(10 个测试 Key):")
+    print("-" * 60)
+
+    test_keys = generate_api_keys_batch(
+        count=5,
+        prefix="sk-test",
+        environment="dev",
+        format_type=KeyFormat.STANDARD
+    )
+    for i, key in enumerate(test_keys, 1):
+        print(f"  {i}. {key}")
+
+    # 带签名的 JWT 风格 Key
+    print("\n\n6. 带 HMAC 签名的 JWT 风格 Key:")
+    print("-" * 60)
+
+    secret = "your-secret-key-here-keep-it-safe"
+    jwt_key = generate_api_key(
+        format_type=KeyFormat.JWT_STYLE,
+        secret_key=secret,
+        environment="prod",
+        project_id="llm-api"
+    )
+    print(f"\n生成: {jwt_key}")
+
+    # 验证签名
+    is_valid, info = APIKeyGenerator.validate(jwt_key, secret_key=secret)
+    print(f"签名验证: {'✓ 有效' if is_valid else '✗ 无效'}")
+    if info and "payload" in info:
+        print(f"解析信息:")
+        for k, v in info["payload"].items():
+            print(f"  {k}: {v}")
+
+    print("\n" + "=" * 60)
+    print("使用说明:")
+    print("=" * 60)
+    print("""
+建议的生产环境使用方式:
+
+1. 生成高安全级别的 Key:
+   key = generate_api_key(
+       prefix="sk",
+       environment="prod",
+       length=32,
+       include_checksum=True
+   )
+
+2. 使用带项目标识的 Key 便于管理:
+   key = generate_api_key(
+       prefix="sk",
+       project_id="your_project_id"
+   )
+
+3. 使用 HMAC 签名防止篡改:
+   key = generate_api_key(
+       format_type=KeyFormat.JWT_STYLE,
+       secret_key=your_secret_key
+   )
+
+4. 批量生成并保存到文件:
+   keys = generate_api_keys_batch(
+       count=100,
+       save_to_file="api_keys.txt"
+   )
+
+安全建议:
+- 在生产环境使用 32 字节(256 bit)以上的密钥长度
+- 妥善保管 secret_key,不要硬编码在代码中
+- 定期轮换 API Key
+- 使用环境变量或密钥管理系统存储敏感信息
+""")