| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- 词句语法审查 — 前端测试服务器
- 提供独立 HTTP API,直接调用 GrammarCheckReviewer
- """
- import sys
- import os
- import json
- import time
- import asyncio
- from http.server import HTTPServer, SimpleHTTPRequestHandler
- from urllib.parse import urlparse
- PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- os.chdir(PROJECT_ROOT)
- from foundation.observability.logger.loggering import review_logger as logger
- from core.construction_review.component.reviewers.grammar_check_reviewer import GrammarCheckReviewer
- def run_async(coro):
- """在同步上下文中运行异步协程"""
- try:
- loop = asyncio.get_running_loop()
- import concurrent.futures
- with concurrent.futures.ThreadPoolExecutor() as executor:
- future = executor.submit(asyncio.run, coro)
- return future.result()
- except RuntimeError:
- return asyncio.run(coro)
- async def do_grammar_check(review_content: str, enable_thinking: bool = False) -> dict:
- """
- 执行词句语法审查
- 直接调用 GrammarCheckReviewer.check_grammar
- """
- trace_id = f"grammar_check_web_{int(time.time() * 1000)}"
- reviewer = GrammarCheckReviewer()
- logger.info(f"[词句语法Web测试] trace_id={trace_id}, content_length={len(review_content)}, enable_thinking={enable_thinking}")
- start = time.time()
- result = await reviewer.check_grammar(
- trace_id=trace_id,
- review_content=review_content,
- state=None,
- stage_name=None,
- enable_thinking=enable_thinking,
- )
- wall_time = time.time() - start
- return {
- "trace_id": trace_id,
- "success": result.success,
- "details": result.details,
- "error_message": result.error_message,
- "model_execution_time": result.execution_time,
- "wall_time": round(wall_time, 3),
- "content_length": len(review_content),
- }
- class GrammarCheckHandler(SimpleHTTPRequestHandler):
- """HTTP请求处理器"""
- def do_GET(self):
- parsed = urlparse(self.path)
- if parsed.path == '/api/health':
- self.send_json_response({"status": "ok"})
- elif parsed.path in ('', '/', '/index.html'):
- index_path = os.path.join(os.path.dirname(__file__), 'grammar_check_test.html')
- self.serve_file(index_path, 'text/html')
- else:
- super().do_GET()
- def do_POST(self):
- parsed = urlparse(self.path)
- if parsed.path == '/api/grammar_check':
- content_length = int(self.headers.get('Content-Length', 0))
- post_data = self.rfile.read(content_length)
- try:
- body = json.loads(post_data.decode('utf-8'))
- review_content = body.get('content', '')
- enable_thinking = body.get('enable_thinking', False)
- if not review_content:
- self.send_json_response({"error": "请提供 content 参数"}, 400)
- return
- print(f"\n[词句语法Web测试] 收到请求, content_length={len(review_content)}, enable_thinking={enable_thinking}")
- result = run_async(do_grammar_check(review_content, enable_thinking))
- print(f"[词句语法Web测试] 完成, success={result['success']}, wall_time={result['wall_time']}s")
- self.send_json_response(result)
- except json.JSONDecodeError:
- self.send_json_response({"error": "JSON解析失败"}, 400)
- except Exception as e:
- logger.error(f"[词句语法Web测试] 处理失败: {e}", exc_info=True)
- self.send_json_response({"error": str(e)}, 500)
- else:
- self.send_json_response({"error": "Not Found"}, 404)
- def do_OPTIONS(self):
- self.send_response(200)
- self.send_header('Access-Control-Allow-Origin', '*')
- self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
- self.send_header('Access-Control-Allow-Headers', 'Content-Type')
- self.end_headers()
- def send_json_response(self, data, status=200):
- self.send_response(status)
- self.send_header('Content-Type', 'application/json; charset=utf-8')
- self.send_header('Access-Control-Allow-Origin', '*')
- self.end_headers()
- self.wfile.write(json.dumps(data, ensure_ascii=False, indent=2).encode('utf-8'))
- def serve_file(self, filepath: str, content_type: str):
- if os.path.exists(filepath):
- self.send_response(200)
- self.send_header('Content-Type', f'{content_type}; charset=utf-8')
- self.end_headers()
- with open(filepath, 'rb') as f:
- self.wfile.write(f.read())
- else:
- self.send_json_response({"error": f"文件不存在: {filepath}"}, 404)
- def end_headers(self):
- self.send_header('Access-Control-Allow-Origin', '*')
- super().end_headers()
- def run_server(port=8022):
- server = HTTPServer(('0.0.0.0', port), GrammarCheckHandler)
- print(f"\n{'='*70}")
- print(f" 词句语法审查 — 前端测试服务器")
- print(f"{'='*70}")
- print(f" 访问地址: http://localhost:{port}")
- print(f" API端点: POST /api/grammar_check")
- print(f"{'='*70}\n")
- server.serve_forever()
- if __name__ == "__main__":
- import argparse
- parser = argparse.ArgumentParser(description='词句语法审查前端测试服务器')
- parser.add_argument('--port', type=int, default=8022, help='服务端口 (默认: 8022)')
- args = parser.parse_args()
- run_server(args.port)
|