#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Sensitive-word review - front-end test server.

Exposes a standalone HTTP API that calls ``check_sensitive`` directly
(AC automaton + LLM second-pass review).
"""

import sys
import os
import json
import time
import asyncio
from http.server import HTTPServer, SimpleHTTPRequestHandler
from urllib.parse import urlparse

# Resolve paths to absolute form BEFORE os.chdir(): a relative __file__ would
# otherwise be interpreted against the new working directory later on.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
_INDEX_HTML_PATH = os.path.join(_SCRIPT_DIR, 'sensitive_check_test.html')

PROJECT_ROOT = os.path.dirname(os.path.dirname(_SCRIPT_DIR))
os.chdir(PROJECT_ROOT)

from foundation.observability.logger.loggering import review_logger as logger
from core.construction_review.component.reviewers.utils import (
    check_sensitive_words_async,
    SensitiveWordChecker,
)
from core.construction_review.component.ai_review_engine import AIReviewEngine
from core.base.task_models import TaskFileInfo


def run_async(coro):
    """Run an awaitable to completion from synchronous code.

    When no event loop is running in the current thread the coroutine is
    executed with ``asyncio.run``. When a loop IS already running,
    ``asyncio.run`` cannot be used in this thread, so the coroutine is run on
    a fresh loop inside a worker thread and its result is waited for.

    Args:
        coro: The coroutine object to execute.

    Returns:
        Whatever the coroutine returns.

    Raises:
        Any exception raised by the coroutine, unchanged.
    """
    # Only the probe is guarded: a RuntimeError raised by the coroutine itself
    # must propagate instead of triggering a second run of a spent coroutine.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(coro)

    import concurrent.futures

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        return executor.submit(asyncio.run, coro).result()


# Lazily created, process-wide review engine (see _get_ai_engine).
_ai_engine = None


def _get_ai_engine():
    """Return the shared ``AIReviewEngine``, creating it on first use.

    The engine is built from a fixed test ``TaskFileInfo`` whose review
    configuration contains only ``"sensitive_check"``.
    """
    global _ai_engine
    if _ai_engine is None:
        task_info = TaskFileInfo(
            file_info={
                "file_id": "test_sensitive",
                "callback_task_id": "test_task",
                "user_id": "test_user",
                "review_config": ["sensitive_check"],
            }
        )
        _ai_engine = AIReviewEngine(task_file_info=task_info)
    return _ai_engine


async def do_sensitive_check(review_content: str) -> dict:
    """Run the sensitive-word check (AC automaton + LLM second-pass review).

    Args:
        review_content: The text to be reviewed.

    Returns:
        A dict with the keys ``trace_id``, ``success``, ``details``,
        ``error_message``, ``model_execution_time``, ``wall_time`` (seconds,
        rounded to 3 decimals) and ``content_length``.
    """
    trace_id = f"sensitive_check_web_{int(time.time() * 1000)}"
    # NOTE(review): the engine is cached across requests while every request
    # runs on a fresh event loop - confirm it holds no loop-bound resources.
    engine = _get_ai_engine()

    logger.info(f"[敏感词Web测试] trace_id={trace_id}, content_length={len(review_content)}")

    started_at = time.time()
    # Monotonic clock for the duration so system clock adjustments cannot
    # distort (or make negative) the reported wall time.
    timer_start = time.perf_counter()
    result = await engine.check_sensitive(
        trace_id_idx=f"_web_{int(started_at * 1000)}",
        review_content=review_content,
        state=None,
        stage_name=None,
    )
    wall_time = time.perf_counter() - timer_start

    return {
        "trace_id": trace_id,
        "success": result.success,
        "details": result.details,
        "error_message": result.error_message,
        "model_execution_time": result.execution_time,
        "wall_time": round(wall_time, 3),
        "content_length": len(review_content),
    }


class SensitiveCheckHandler(SimpleHTTPRequestHandler):
    """HTTP request handler for the sensitive-word test page and its JSON API.

    Routes:
        GET  /api/health            -> ``{"status": "ok"}``
        GET  /, /index.html         -> the bundled test page
        GET  anything else          -> default static file handling
        POST /api/sensitive_check   -> runs the check on the ``content`` field
        OPTIONS *                   -> CORS pre-flight response
    """

    def do_GET(self):
        """Serve the health endpoint, the test page, or a static file."""
        parsed = urlparse(self.path)

        if parsed.path == '/api/health':
            self.send_json_response({"status": "ok"})
        elif parsed.path in ('', '/', '/index.html'):
            self.serve_file(_INDEX_HTML_PATH, 'text/html')
        else:
            # NOTE(review): this falls back to SimpleHTTPRequestHandler, which
            # serves files relative to the working directory (PROJECT_ROOT
            # after the chdir above). Combined with binding to 0.0.0.0 this
            # exposes the project tree - acceptable only on a trusted network.
            super().do_GET()

    def do_POST(self):
        """Handle ``POST /api/sensitive_check``.

        Expects a UTF-8 JSON object with a non-empty string field ``content``.
        Malformed requests get a 400 response, failures while running the
        check get a 500 response, unknown paths get a 404 response.
        """
        parsed = urlparse(self.path)
        if parsed.path != '/api/sensitive_check':
            self.send_json_response({"error": "Not Found"}, 404)
            return

        # A missing header means "no body"; a malformed or negative one must
        # be rejected (a negative size would make rfile.read() wait for EOF).
        try:
            content_length = int(self.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            content_length = -1
        if content_length < 0:
            self.send_json_response({"error": "无效的 Content-Length"}, 400)
            return

        post_data = self.rfile.read(content_length)
        try:
            body = json.loads(post_data.decode('utf-8'))
        except (json.JSONDecodeError, UnicodeDecodeError):
            self.send_json_response({"error": "JSON解析失败"}, 400)
            return

        # Valid JSON is not necessarily an object, and "content" is not
        # necessarily a string - both are client errors, not server errors.
        review_content = body.get('content', '') if isinstance(body, dict) else ''
        if not isinstance(review_content, str) or not review_content:
            self.send_json_response({"error": "请提供 content 参数"}, 400)
            return

        print(f"\n[敏感词Web测试] 收到请求, content_length={len(review_content)}")
        try:
            result = run_async(do_sensitive_check(review_content))
            print(f"[敏感词Web测试] 完成, success={result['success']}, wall_time={result['wall_time']}s")
            self.send_json_response(result)
        except Exception as e:
            logger.error(f"[敏感词Web测试] 处理失败: {e}", exc_info=True)
            self.send_json_response({"error": str(e)}, 500)

    def do_OPTIONS(self):
        """Answer a CORS pre-flight request."""
        self.send_response(200)
        # Access-Control-Allow-Origin is appended by end_headers().
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
        self.send_header('Content-Length', '0')
        self.end_headers()

    def send_json_response(self, data, status=200):
        """Send ``data`` as a pretty-printed UTF-8 JSON response.

        Args:
            data: A JSON-serializable object.
            status: HTTP status code (default 200).

        Raises:
            TypeError / ValueError: If ``data`` cannot be serialized. This is
                raised before anything is written to the client.
        """
        # Serialize first so a failure cannot leave a half-sent response.
        payload = json.dumps(data, ensure_ascii=False, indent=2).encode('utf-8')

        self.send_response(status)
        self.send_header('Content-Type', 'application/json; charset=utf-8')
        self.send_header('Content-Length', str(len(payload)))
        # Access-Control-Allow-Origin is appended by end_headers().
        self.end_headers()
        self.wfile.write(payload)

    def serve_file(self, filepath: str, content_type: str):
        """Send the file at ``filepath`` with the given content type.

        Responds with a 404 JSON error when the path cannot be opened as a
        file.

        Args:
            filepath: Path of the file to send.
            content_type: MIME type; ``; charset=utf-8`` is appended.
        """
        # Read before sending any header so a failure still yields a clean 404.
        try:
            with open(filepath, 'rb') as f:
                content = f.read()
        except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
            self.send_json_response({"error": f"文件不存在: {filepath}"}, 404)
            return

        self.send_response(200)
        self.send_header('Content-Type', f'{content_type}; charset=utf-8')
        self.send_header('Content-Length', str(len(content)))
        self.end_headers()
        self.wfile.write(content)

    def end_headers(self):
        """Append the CORS origin header to every response, then finish.

        This is the only place the header is emitted: browsers reject a
        response that carries ``Access-Control-Allow-Origin`` more than once.
        """
        self.send_header('Access-Control-Allow-Origin', '*')
        super().end_headers()


def run_server(port=8021):
    """Initialize the sensitive-word checker and serve HTTP until stopped.

    Args:
        port: TCP port to listen on (all interfaces). Defaults to 8021.
    """
    # Initialize the sensitive-word checker. Failure is reported but is
    # deliberately not fatal, so the server still starts.
    try:
        stats = SensitiveWordChecker.initialize()
        print(f"敏感词检测器初始化完成: {stats}")
    except Exception as e:
        print(f"敏感词检测器初始化失败: {e}")

    # The context manager closes the listening socket on the way out,
    # including when serve_forever() is interrupted.
    with HTTPServer(('0.0.0.0', port), SensitiveCheckHandler) as server:
        print(f"\n{'='*70}")
        print(" 敏感词审查 — 前端测试服务器")
        print(f"{'='*70}")
        print(f" 访问地址: http://localhost:{port}")
        print(" API端点: POST /api/sensitive_check")
        print(f"{'='*70}\n")
        server.serve_forever()


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description='敏感词审查前端测试服务器')
    parser.add_argument('--port', type=int, default=8021, help='服务端口 (默认: 8021)')
    args = parser.parse_args()
    run_server(args.port)