| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194 |
- from routers.chat import _build_conversation_preview, _rag_search, _sanitize_exam_response
- from models.chat import AIConversation, AIMessage
- from database import SessionLocal
- from fastapi.responses import StreamingResponse
- import asyncio
- import time
- import json
- import re
- from fastapi import APIRouter, Depends, Request
- from sqlalchemy.orm import Session
- from sqlalchemy.exc import OperationalError
- from pydantic import BaseModel, Field
- from typing import Optional
- from database import get_db
- from services.qwen_service import qwen_service
- from utils.config import settings
- from utils.logger import logger
- router = APIRouter()
- class QuestionTypeItem(BaseModel):
- questionType: str = ""
- name: str = ""
- count: int = 0
- questionCount: int = 0
- scorePerQuestion: int = 0
- romanNumeral: str = ""
- class BuildPromptRequest(BaseModel):
- mode: str = ""
- client: str = ""
- projectType: str = ""
- examTitle: str = ""
- totalScore: int = 0
- questionTypes: list[QuestionTypeItem] = Field(default_factory=list)
- pptContent: str = ""
- requireBasis: bool = False
- def _get_exam_section(payload: dict, question_type: str) -> Optional[dict]:
- if not isinstance(payload, dict):
- return None
- question_map = {
- "单选题": ("singleChoice", "single_choice", "single"),
- "判断题": ("judge",),
- "多选题": ("multiple", "multiple_choice"),
- "简答题": ("short", "short_answer"),
- }
- keys = question_map.get(question_type, ())
- questions_obj = payload.get("questions")
- for key in keys:
- section = payload.get(key)
- if isinstance(section, dict):
- return section
- if isinstance(questions_obj, dict):
- nested = questions_obj.get(key)
- if isinstance(nested, dict):
- return nested
- return None
- def _get_section_question_count(section: Optional[dict]) -> int:
- if not isinstance(section, dict):
- return 0
- questions = section.get("questions")
- if not isinstance(questions, list):
- return 0
- return len(questions)
- def _get_knowledge_search_api_url() -> str:
- aichat_config = getattr(settings, "aichat", None)
- aichat_base_url = getattr(aichat_config, "api_url", "").rstrip("/")
- if aichat_base_url:
- return f"{aichat_base_url}/knowledge/search"
- return "http://127.0.0.1:28002/api/v1/knowledge/search"
- def _save_exam_messages_with_fresh_session(
- conv_id: int,
- user_id: int,
- request_payload: dict,
- exam_payload: dict,
- ) -> None:
- last_error = None
- for attempt in range(2):
- save_db = SessionLocal()
- try:
- now = int(time.time())
- user_msg = AIMessage(
- ai_conversation_id=conv_id,
- user_id=user_id,
- type="user",
- content=json.dumps(request_payload, ensure_ascii=False),
- created_at=now,
- updated_at=now,
- is_deleted=0,
- )
- save_db.add(user_msg)
- save_db.flush()
- ai_msg = AIMessage(
- ai_conversation_id=conv_id,
- user_id=user_id,
- type="ai",
- content=json.dumps(exam_payload, ensure_ascii=False),
- prev_user_id=user_msg.id,
- created_at=now,
- updated_at=now,
- is_deleted=0,
- )
- save_db.add(ai_msg)
- save_db.commit()
- return
- except OperationalError as e:
- save_db.rollback()
- last_error = e
- logger.warning(
- f"[exam/generate_stream] 保存试卷时数据库连接异常,准备重试: "
- f"attempt={attempt + 1}/2, detail={repr(e)}")
- if attempt == 1:
- raise
- except Exception:
- save_db.rollback()
- raise
- finally:
- save_db.close()
- if last_error:
- raise last_error
- def _ensure_exam_conversation_with_fresh_session(
- user_id: int,
- exam_title: str,
- ai_conversation_id: Optional[int],
- ) -> int:
- last_error = None
- for attempt in range(2):
- db = SessionLocal()
- try:
- now = int(time.time())
- preview = _build_conversation_preview(
- exam_title or "智能生成试卷", limit=100)
- if not ai_conversation_id:
- conversation = AIConversation(
- user_id=user_id,
- content=preview,
- business_type=3,
- exam_name=exam_title,
- created_at=now,
- updated_at=now,
- is_deleted=0,
- )
- db.add(conversation)
- db.commit()
- db.refresh(conversation)
- return conversation.id
- db.query(AIConversation).filter(
- AIConversation.id == ai_conversation_id,
- AIConversation.user_id == user_id,
- ).update({
- "content": preview,
- "business_type": 3,
- "exam_name": exam_title,
- "updated_at": now,
- })
- db.commit()
- return ai_conversation_id
- except OperationalError as e:
- db.rollback()
- last_error = e
- logger.warning(
- f"[exam/generate_stream] 初始化试卷会话时数据库连接异常,准备重试: "
- f"attempt={attempt + 1}/2, detail={repr(e)}")
- if attempt == 1:
- raise
- except Exception:
- db.rollback()
- raise
- finally:
- db.close()
- if last_error:
- raise last_error
- raise RuntimeError("初始化试卷会话失败")
- def _fallback_exam_title(source_text: str) -> str:
- text = (source_text or "").strip()
- if not text:
- return "智能生成试卷"
- text = re.sub(r"用户指定的主题/关键词[::]\s*", "", text)
- text = re.sub(r"以下是从知识库中检索到的相关原文片段,请严格基于这些原文片段出题[::]?\s*", "", text)
- text = re.sub(r"\s+", " ", text).strip()
- text = text.strip("`\"'“”‘’[]【】()(){}<>")
- for sep in ("。", ";", ";", ",", ",", "\n", ":", ":"):
- if sep in text:
- text = text.split(sep, 1)[0].strip()
- break
- text = re.sub(r"[^\u4e00-\u9fa5A-Za-z0-9]", "", text)
- if not text:
- return "智能生成试卷"
- return _refine_exam_title_candidate(text) or "智能生成试卷"
- def _refine_exam_title_candidate(candidate: str) -> str:
- text = re.sub(r"\s+", "", (candidate or "").strip())
- text = text.strip("`\"'“”‘’[]【】()(){}<>")
- text = re.sub(r"[^\u4e00-\u9fa5A-Za-z0-9]", "", text)
- if not text:
- return ""
- for marker in (
- "仅供内部交流学习",
- "仅供内部交流",
- "请勿外传",
- "讲授人",
- "授课人",
- "主讲人",
- "时间",
- "日期",
- "联系电话",
- "联系方式",
- ):
- idx = text.find(marker)
- if idx > 0:
- text = text[:idx]
- break
- for prefix in ("关于", "有关", "针对", "围绕", "基于", "依据", "结合", "开展", "组织"):
- if text.startswith(prefix) and len(text) - len(prefix) >= 4:
- text = text[len(prefix):]
- break
- suffixes = (
- "相关知识要点", "相关管理要求", "相关技术要求", "相关施工要求", "相关安全要求",
- "培训考试题库", "培训考试", "考试题库", "试题题库", "培训题库",
- "的培训考核", "的培训考试", "的考试题库", "的考试", "的考核", "的考查", "的测验", "的测试", "的练习",
- "相关内容", "主要内容", "核心内容", "培训内容", "培训要点", "知识要点", "基础知识",
- "管理要求", "技术要求", "施工要求", "安全要求", "作业要求", "实施要求",
- "工作要点", "控制要点", "操作要点", "注意事项", "质量要求", "验收要求",
- "考试内容", "试题内容", "考试试题", "考试重点", "培训重点",
- "管理规定", "技术规定", "施工规定", "安全规定",
- "工作方案", "专项方案", "管理办法", "技术措施",
- "相关知识", "基本要求", "有关要求", "总体要求",
- "考核", "考查", "测验", "测试", "练习", "试卷", "考试", "题库", "试题", "内容", "要求",
- )
- changed = True
- while changed and text:
- changed = False
- for suffix in suffixes:
- if text.endswith(suffix) and len(text) - len(suffix) >= 4:
- text = text[:-len(suffix)]
- changed = True
- break
- # 标题尽量收敛成名词短语,去掉“的”这类连接词
- text = re.sub(
- r"(?<=[\u4e00-\u9fa5A-Za-z0-9])的(?=[\u4e00-\u9fa5A-Za-z0-9])",
- "",
- text,
- )
- text = text.strip("的及与和等")
- if len(text) > 15:
- text = text[:15]
- return text if len(text) >= 2 else ""
- def _split_basis_sources(source_text: str) -> tuple[str, str]:
- text = _normalize_exam_text((source_text or "").strip())
- if not text:
- return "", ""
- user_match = re.search(
- r"用户输入依据[::]\s*([\s\S]*?)(?=\n\s*PPT提取内容[::]|\Z)",
- text,
- )
- ppt_match = re.search(r"PPT提取内容[::]\s*([\s\S]*)\Z", text)
- user_text = (user_match.group(1) if user_match else "").strip()
- ppt_text = (ppt_match.group(1) if ppt_match else "").strip()
- if user_text or ppt_text:
- return user_text, ppt_text
- return text, ""
- def _extract_basis_candidate_lines(source_text: str, max_lines: int = 8) -> list[str]:
- text = _normalize_exam_text((source_text or "").strip())
- if not text:
- return []
- candidates = []
- seen = set()
- metadata_patterns = (
- r"^PPT文件信息",
- r"^PPT第一页内容提取结果",
- r"^提取的文本内容",
- r"^WPS",
- r".*全屏显示.*",
- r".*幻灯片放映.*",
- r"^DocumentProperties$",
- r"^DocumentSecurity$",
- r"^DocumentProtection$",
- r"^文件名[::]",
- r"^文件大小[::]",
- r"^文件类型[::]",
- r"^修改时间[::]",
- r"^PPT正文提取失败",
- r"^请手动补充",
- r"^请手动描述",
- r"^您可以描述",
- r"^仅供内部交流",
- r"^请勿外传",
- r"^讲授人[::]",
- r"^授课人[::]",
- r"^主讲人[::]",
- r"^时间[::]",
- r"^日期[::]",
- r"^联系电话[::]",
- r"^联系方式[::]",
- r"^[0-9]+[.、]",
- )
- for raw_line in text.splitlines():
- line = raw_line.strip()
- if not line:
- continue
- line = re.sub(
- r"^(用户输入依据|PPT提取内容|出题依据内容|出题依据|培训主题|主题)[::]\s*", "", line)
- line = re.sub(r"^第\s*\d+\s*(页|张|部分|章|节)[::]?\s*", "", line)
- line = re.sub(r"\.(ppt|pptx)$", "", line, flags=re.IGNORECASE)
- if any(re.search(pattern, line, re.IGNORECASE) for pattern in metadata_patterns):
- continue
- line = re.sub(r"\s+", "", line)
- if len(line) < 2:
- continue
- if re.fullmatch(r"[0-9A-Za-z_-]+", line):
- continue
- if "文件名" in line or "PPT文件信息" in line or "WPS" in line:
- continue
- if line not in seen:
- candidates.append(line)
- seen.add(line)
- if len(candidates) >= max_lines:
- break
- return candidates
- def _build_knowledge_search_query(source_text: str, project_type: str = "") -> str:
- text = _normalize_exam_text((source_text or "").strip())
- if not text:
- return ""
- user_text, ppt_text = _split_basis_sources(text)
- candidates = []
- seen = set()
- title_candidate = _extract_exam_title_from_source(text, project_type)
- if title_candidate and title_candidate != "智能生成试卷":
- candidates.append(title_candidate)
- seen.add(title_candidate)
- for block in filter(None, [user_text, ppt_text, text]):
- for line in _extract_basis_candidate_lines(block, max_lines=6):
- refined = _refine_exam_title_candidate(line)
- if len(refined) < 2:
- continue
- if refined not in seen:
- candidates.append(refined)
- seen.add(refined)
- if len(candidates) >= 5:
- break
- if len(candidates) >= 5:
- break
- return " ".join(candidates[:5]).strip()
- def _extract_exam_title_from_source(source_text: str, project_type: str = "") -> str:
- text = (source_text or "").strip()
- if not text:
- return "智能生成试卷"
- text = _normalize_exam_text(text)
- text = re.sub(r"用户指定的主题/关键词[::]\s*", "", text)
- text = re.sub(r"^(出题依据内容|出题依据|培训主题|主题)[::]\s*", "", text)
- text = re.sub(r"以下是从知识库中检索到的相关原文片段,请严格基于这些原文片段出题[::]?\s*", "", text)
- text = re.sub(r"[ \t]+", " ", text).strip()
- candidate_pool = []
- for line in _extract_basis_candidate_lines(text, max_lines=8):
- candidate_pool.append(line)
- for marker in ("\n\n", "\n", "。", ";", ";"):
- if marker in text:
- head = text.split(marker, 1)[0].strip()
- if head:
- candidate_pool.append(head)
- break
- stop_phrases = (
- "出题依据", "正文", "章节条款", "文件名", "答案解析", "要求", "规定",
- "内容", "相关", "进行", "采用", "包括", "本项目", "本工程", "本次",
- "施工", "安全", "管理", "技术", "规范", "标准", "方案", "办法", "措施",
- "PPT文件信息", "PPT第一页内容提取结果", "提取的文本内容",
- "WPS演示", "全屏显示", "幻灯片放映",
- "仅供内部交流学习", "仅供内部交流", "请勿外传", "讲授人", "授课人", "主讲人",
- "时间", "日期", "联系电话", "联系方式",
- )
- candidates = []
- for piece in candidate_pool + re.split(r"[,,、/()()\-::]", text):
- cleaned = re.sub(r"\s+", "", piece)
- cleaned = re.sub(r"^[0-9A-Za-z一二三四五六七八九十.]+$", "", cleaned)
- cleaned = re.sub(r"[^\u4e00-\u9fa5A-Za-z0-9]", "", cleaned)
- if len(cleaned) < 2:
- continue
- if cleaned in stop_phrases:
- continue
- candidates.append(cleaned)
- if candidates:
- def score(candidate: str) -> tuple[int, int]:
- keyword_bonus = sum(
- 2 for token in ("桥梁", "隧道", "桩基", "桩基础", "钢筋", "混凝土", "施工", "安全", "验收", "培训", "作业", "起重", "便桥")
- if token in candidate
- )
- return (keyword_bonus, min(len(candidate), 15))
- best = max(candidates, key=score)
- best = _refine_exam_title_candidate(best)
- if len(best) >= 2:
- return best
- prefix = _fallback_exam_title(text)
- if prefix != "智能生成试卷":
- return prefix
- project_prefix = re.sub(
- r"[^\u4e00-\u9fa5A-Za-z0-9]", "", (project_type or "").strip())
- if project_prefix:
- return f"{project_prefix[:8]}试卷"
- return "智能生成试卷"
- def _build_exam_section_example(
- question_type: str,
- count: int,
- score: int,
- basis_enabled: bool,
- ) -> str:
- basis_field = ', "basis": "文件名:...;章节条款:...;正文:..."' if basis_enabled else ""
- total_score = count * score
- if question_type == "单选题":
- return (
- f'{{"singleChoice": {{"scorePerQuestion": {score}, "totalScore": {total_score}, '
- f'"count": {count}, "questions": [{{"text": "题干", "options": [{{"key": "A", "text": "内容A"}}], '
- f'"selectedAnswer": "A", "analysis": "解析"{basis_field}}}]}}}}'
- )
- if question_type == "多选题":
- return (
- f'{{"multiple": {{"scorePerQuestion": {score}, "totalScore": {total_score}, '
- f'"count": {count}, "questions": [{{"text": "题干", "options": [{{"key": "A", "text": "内容A"}}], '
- f'"selectedAnswers": ["A", "B"], "analysis": "解析"{basis_field}}}]}}}}'
- )
- if question_type == "判断题":
- return (
- f'{{"judge": {{"scorePerQuestion": {score}, "totalScore": {total_score}, '
- f'"count": {count}, "questions": [{{"text": "题干", "selectedAnswer": "正确", '
- f'"analysis": "解析"{basis_field}}}]}}}}'
- )
- return (
- f'{{"short": {{"scorePerQuestion": {score}, "totalScore": {total_score}, '
- f'"count": {count}, "questions": [{{"text": "题干", "outline": {{"keyFactors": "答题要点"}}, '
- f'"analysis": "解析"{basis_field}}}]}}}}'
- )
- def _normalize_exam_text(value: str) -> str:
- text = (value or "").strip()
- if not text:
- return text
- text = text.replace("章节名/条款名", "章节条款")
- text = text.replace("\\sim", "~")
- text = text.replace("$", "")
- text = re.sub(r"\\mathrm\s*\{\s*([^{}]+?)\s*\}",
- lambda m: re.sub(r"\s+", "", m.group(1)), text)
- text = re.sub(r"([A-Za-z])\s*\^\s*\{\s*2\s*\}", r"\1²", text)
- text = re.sub(r"([A-Za-z])\s*\^\s*\{\s*3\s*\}", r"\1³", text)
- text = re.sub(r"(?<=\d)\s+(?=\d)", "", text)
- text = re.sub(r"(?<=\d)\s*\.\s*(?=\d)", ".", text)
- text = re.sub(r"(?<=\d)\s*~\s*(?=\d)", "~", text)
- text = re.sub(r"(?<=\d)\s*([a-zA-Zµμ%℃°²³]+)\b", r" \1", text)
- text = re.sub(r"\b([A-Za-z])\s+([²³])\b", r"\1\2", text)
- text = re.sub(r"\s+", " ", text)
- return text.strip()
- def _normalize_exam_payload_texts(value):
- if isinstance(value, dict):
- return {key: _normalize_exam_payload_texts(val) for key, val in value.items()}
- if isinstance(value, list):
- return [_normalize_exam_payload_texts(item) for item in value]
- if isinstance(value, str):
- return _normalize_exam_text(value)
- return value
- def _parse_exam_section_payload(raw_response: str, question_type: str) -> tuple[dict, int]:
- cleaned = _sanitize_exam_response(raw_response)
- parsed = _normalize_exam_payload_texts(json.loads(cleaned))
- section = _get_exam_section(parsed, question_type)
- actual_count = _get_section_question_count(section)
- return parsed, actual_count
- async def _repair_exam_section_payload(
- raw_response: str,
- question_type: str,
- count: int,
- score: int,
- basis_enabled: bool,
- ) -> Optional[tuple[dict, int]]:
- source_text = (raw_response or "").strip()
- if not source_text:
- return None
- repair_prompt = (
- f"下面是一段为【{question_type}】生成的原始内容,其中可能夹杂 Thinking Process、说明文字或不合法 JSON。\n"
- "任务:不要重新出题,只对已有内容做结构化整理,输出一个可被 json.loads 直接解析的 JSON 对象。\n"
- "要求:\n"
- f"1. 顶层只保留当前题型对应字段,count 应写为 {count}。\n"
- "2. 只整理原有题目内容,不要新增解释、前言、后记、markdown 代码块或推理过程。\n"
- "3. 如果原文中已经有 analysis、basis、options、selectedAnswer/selectedAnswers 等字段,尽量原样保留。\n"
- "4. 如果存在 basis 字段,必须保留其原始语言和原始含义;知识库原文是中文时,basis 中的文件名、章节条款、正文都必须保持中文,不得翻译成英文,不得改写为英文摘要。\n"
- "5. 如果原文内容本身不足指定题量,不要臆造新题,按已有内容整理即可。\n"
- "6. 最终回复必须以 { 开头、以 } 结尾。\n"
- f"目标 JSON 结构示例:{_build_exam_section_example(question_type, count, score, basis_enabled)}\n"
- f"原始内容如下:\n{source_text[:6000]}"
- )
- try:
- repaired_response = await qwen_service.chat(
- [{"role": "user", "content": repair_prompt}],
- disable_reasoning=True,
- )
- parsed, actual_count = _parse_exam_section_payload(
- repaired_response, question_type)
- logger.info(
- f"[exam/generate_stream] {question_type}轻量修复成功: repaired_count={actual_count}"
- )
- return parsed, actual_count
- except Exception as e:
- logger.warning(
- f"[exam/generate_stream] {question_type}轻量修复失败: {repr(e)}"
- )
- return None
- async def _resolve_exam_title(
- user_title: str,
- title_source: str,
- project_type: str = "",
- ) -> str:
- manual_title = (user_title or "").strip()
- if manual_title:
- return manual_title
- source_text = (title_source or "").strip()
- if not source_text:
- return "智能生成试卷"
- resolved = _extract_exam_title_from_source(source_text, project_type)
- logger.info(
- f"[exam/title] 基于用户输入出题依据提取试卷标题: source_len={len(source_text)}, title={resolved}"
- )
- return resolved
- async def _fetch_knowledge_docs(query_str: str, log_prefix: str) -> Optional[list[str]]:
- import httpx
- search_api_url = _get_knowledge_search_api_url()
- timeout = httpx.Timeout(20.0, connect=5.0)
- last_error = None
- for attempt in range(2):
- try:
- async with httpx.AsyncClient(timeout=timeout) as client:
- resp = await client.post(
- search_api_url,
- json={"query_str": query_str, "n": 20}
- )
- if resp.status_code != 200:
- logger.error(
- f"[{log_prefix}] 知识库检索API响应错误: url={search_api_url}, "
- f"status={resp.status_code}, body={resp.text}")
- return None
- search_data = resp.json()
- results = search_data.get("results")
- if results is None:
- results = search_data.get("data", [])
- if isinstance(results, dict):
- results = results.get("items", [])
- if not isinstance(results, list):
- results = []
- retrieved_docs = []
- for res in results:
- if not isinstance(res, dict):
- continue
- doc_text = res.get("document") or res.get(
- "content") or res.get("text")
- if doc_text:
- retrieved_docs.append(doc_text)
- return retrieved_docs
- except httpx.ReadTimeout as e:
- last_error = e
- if attempt == 0:
- logger.warning(
- f"[{log_prefix}] 知识库检索读取超时,准备重试: "
- f"url={search_api_url}, timeout=20s, attempt={attempt + 1}/2")
- continue
- logger.exception(
- f"[{log_prefix}] 知识库检索失败: url={search_api_url}, "
- f"exc_type={type(e).__name__}, detail={repr(e)}")
- return None
- except Exception as e:
- last_error = e
- logger.exception(
- f"[{log_prefix}] 知识库检索失败: url={search_api_url}, "
- f"exc_type={type(e).__name__}, detail={repr(e)}")
- return None
- if last_error:
- logger.exception(
- f"[{log_prefix}] 知识库检索失败: url={search_api_url}, "
- f"exc_type={type(last_error).__name__}, detail={repr(last_error)}")
- return None
- @router.post("/exam/build_prompt")
- async def build_exam_prompt(
- request: Request,
- data: BuildPromptRequest,
- db: Session = Depends(get_db)
- ):
- """根据前端考试工坊参数生成提示词"""
- user = request.state.user
- if not user:
- return {"statusCode": 401, "msg": "未授权"}
- question_desc = []
- total_count = 0
- for item in data.questionTypes:
- count = item.count or item.questionCount or 0
- score = item.scorePerQuestion or 0
- qtype = item.questionType or item.name or "未命名题型"
- total_count += count
- question_desc.append(f"{qtype}{count}道,每道{score}分")
- question_text = ";".join(question_desc) if question_desc else "题型未提供"
- question_schema_lines = []
- for item in data.questionTypes:
- count = item.count or item.questionCount or 0
- score = item.scorePerQuestion or 0
- qtype = item.questionType or item.name or "未命名题型"
- if count <= 0:
- continue
- question_schema_lines.append(f"- {qtype}: {count}道,每道{score}分")
- question_schema = "\n".join(
- question_schema_lines) if question_schema_lines else "- 未提供有效题型"
- ppt_content = (data.pptContent or "").strip()
- retrieval_query = _build_knowledge_search_query(
- ppt_content, data.projectType)
- combined_source_mode = "用户输入依据:" in ppt_content and "PPT提取内容:" in ppt_content
- # === 新增:如果出题依据不是真正的长文本,而是关键词,则调用远端知识库检索服务 ===
- # 启发式判断:如果文本长度小于 50 字,且不包含明显的段落换行,认为它是关键词,去查知识库
- if ppt_content and (
- (len(ppt_content) < 50 and "\n" not in ppt_content)
- or (combined_source_mode and retrieval_query)
- ):
- query_str = retrieval_query or ppt_content
- logger.info(
- f"[exam/build_prompt] 检测到可检索出题依据,尝试调用知识库检索API: query='{query_str}'")
- retrieved_docs = await _fetch_knowledge_docs(query_str, "exam/build_prompt")
- if retrieved_docs:
- retrieved_text = "\n\n---\n\n".join(retrieved_docs)
- logger.info(
- f"[exam/build_prompt] 知识库检索成功,拼接了 {len(retrieved_docs)} 个文档块,总长度 {len(retrieved_text)}")
- # 覆盖原来的 ppt_content,改为:用户关键词 + 检索到的真实知识库内容
- ppt_content = (
- f"用户指定的主题/关键词:{query_str}\n\n"
- f"原始出题依据:\n{text[:2000] if (text := (data.pptContent or '').strip()) else '无'}\n\n"
- "以下是从知识库中检索到的相关原文片段,请严格基于这些原文片段出题:\n\n"
- f"{retrieved_text}"
- )
- elif retrieved_docs == []:
- logger.warning(
- f"[exam/build_prompt] 知识库中未检索到与 '{query_str}' 相关的文档块")
- ppt_content = f"(注:未能在知识库中检索到相关文档,请仅根据以下关键词及原始依据出题:{query_str}\n\n{data.pptContent or ''})"
- if ppt_content:
- max_chars = 12000
- if len(ppt_content) > max_chars:
- head_len = max_chars // 2
- tail_len = max_chars - head_len
- ppt_content = (
- ppt_content[:head_len]
- + "\n\n(出题依据内容过长,已截断,以下为结尾片段)\n\n"
- + ppt_content[-tail_len:]
- )
- logger.info(
- f"[exam/build_prompt] pptContent truncated: original_len={len(data.pptContent)} kept_len={len(ppt_content)}"
- )
- basis_field = ', "basis": "<文件名:...;章节条款:...;正文:...>"' if data.requireBasis else ''
- basis_instruction = (
- "【出题依据要求】:每道题必须附带一个 'basis' 字段。\n"
- "basis 必须严格按以下顺序组织:先写相关文件名,再写章节条款,最后写与题目直接相关的正文原文内容。\n"
- "推荐格式为:“文件名:xxx;章节条款:xxx;正文:xxx”。\n"
- "basis 只能填写知识库中的原文依据,不得改写成题目,不得出现题干句式,不得包含选项内容,不得直接写出正确答案、错误答案、解析结论或“应选A/应选B/正确/错误”等判断结果。\n"
- "basis 应尽量保持知识库原文原貌,不得做摘要、润色、优化、同义替换、翻译或重组;知识库原文如果是中文,basis 也必须保持中文原文,不得改写成英文或中英混杂表述;如果缺少文件名或章节条款,也必须保留固定标识位,分别写为“文件名:未标注”“章节条款:未标注”。\n"
- "答案解析请放在独立的 analysis 字段中,不要混入 basis。\n"
- ) if data.requireBasis else ""
- prompt = (
- "请根据以下要求直接生成一份完整试卷,并严格返回纯 JSON,不要输出 markdown 代码块、解释说明或额外文字。\n"
- f"生成模式:{data.mode or '未指定'}\n"
- f"客户端:{data.client or '未指定'}\n"
- f"项目类型:{data.projectType or '未指定'}\n"
- f"考试标题:{data.examTitle if data.examTitle else '未提供。请你仔细阅读出题依据内容,高度凝练其核心主题(不要生硬拼凑前缀),生成一个不超过15个字的贴切的试卷名称。特别注意:如果试卷名称中包含公司或组织名称,要么完全省略不写,要么必须使用完整的全称(例如:如果原内容是“蜀道矿业集团”,必须写“蜀道矿业集团”,绝不能擅自简写为“蜀道矿业”)'}\n"
- f"总分:{data.totalScore or 0}\n"
- f"总题量:{total_count}\n"
- f"题型要求:{question_text}\n"
- f"出题依据内容:{ppt_content or '无'}\n"
- "出题依据内容是本次试卷的核心来源,所有题目必须围绕该内容中的知识点、术语、流程、规范要求和场景展开。\n"
- "如果出题依据内容中出现了章节、条款、培训主题或专业术语,题目必须优先考查这些内容,不能偏离到无关知识。\n"
- "单选题、多选题、判断题和简答题的题干、选项、答案解析都要与出题依据内容直接相关,不能泛泛而谈。\n"
- "请结合出题依据内容、工程类型和题型要求,生成有具体内容、具体选项、具体答案、具体解析的试卷。\n"
- "凡是题型配置中 count 大于 0 的题型,必须返回对应数量的非空题目,不能返回空数组,不能少题。\n"
- "即使出题依据内容较短,也要优先围绕已有内容中的关键词、术语、场景和要求组织出题,不能因为信息少而返回空题目。\n"
- "如果某题型要求生成 3 道题,就必须生成 3 道完整可作答的题目,少于要求数量视为不合格。\n"
- "禁止输出“选项A”“题目1”“桥梁工程相关单选题1”“题目内容”“解析内容”这类占位内容,所有题目必须是可直接展示和作答的真实内容。\n"
- "【极度重要的多选题防作弊要求】:\n"
- "近期发现你生成的多选题中,正确答案总是偷懒按顺序排列(比如全都包含A、全都连号如AB、ABC、ABCD)!这在真实考试中是绝对不允许的。\n"
- "你必须强制打乱正确答案的字母组合,严格遵守以下分布规则:\n"
- " - 必须有至少 30% 的题目正确答案【完全不包含A】(如 BC, CD, BD, BCD)!\n"
- " - 必须有至少 30% 的题目正确答案【跳跃分布】(如 AC, AD, BD, ABD, ACD)!\n"
- " - 包含2个正确选项的题目占比应达到 40%\n"
- " - 包含3个正确选项的题目占比应达到 40%\n"
- " - 包含4个正确选项的题目(ABCD)绝对不能超过 20%!\n"
- "【答案随机性要求】:\n"
- "1. 单选题:提供4个选项(A/B/C/D),正确答案只能是其中1个,且正确答案必须在A、B、C、D中随机分布,绝不能所有题目的正确答案都相同。\n"
- "2. 多选题:提供4个选项(A/B/C/D),正确答案的个数在2~4个之间随机,且答案组合必须随机(例如:可以是AB、AC、AD、BC、BD、CD、ABC、ABD、BCD、ABCD等),绝不能都从A开始或全都是ABCD。\n"
- "3. 判断题:正确答案必须在“正确”和“错误”之间随机分布,绝不能所有判断题的答案全都是“正确”或全都是“错误”。\n"
- f"{basis_instruction}"
- "下面的 JSON 结构示例只用于说明字段格式,示例中的字符串不能原样照抄到最终结果中,最终返回的每个字符串都必须替换成结合出题依据生成的具体内容。\n"
- "JSON 输出结构必须符合以下格式:\n"
- "{\n"
- ' "title": "试卷标题",\n'
- ' "totalScore": 100,\n'
- ' "totalQuestions": 10,\n'
- f' "singleChoice": {{"scorePerQuestion": 2, "totalScore": 20, "count": 10, "questions": [{{"text": "<单选题题干>", "options": [{{"key": "A", "text": "<选项A具体内容>"}}, {{"key": "B", "text": "<选项B具体内容>"}}, {{"key": "C", "text": "<选项C具体内容>"}}, {{"key": "D", "text": "<选项D具体内容>"}}], "answer": "A", "analysis": "<解析内容>"{basis_field}}}]}},\n'
- f' "judge": {{"scorePerQuestion": 2, "totalScore": 0, "count": 0, "questions": [{{"text": "<判断题题干>", "answer": "正确", "analysis": "<解析内容>"{basis_field}}}]}},\n'
- f' "multiple": {{"scorePerQuestion": 3, "totalScore": 0, "count": 0, "questions": [{{"text": "<多选题题干>", "options": [{{"key": "A", "text": "<选项A具体内容>"}}, {{"key": "B", "text": "<选项B具体内容>"}}, {{"key": "C", "text": "<选项C具体内容>"}}, {{"key": "D", "text": "<选项D具体内容>"}}], "answers": ["A", "C"], "analysis": "<解析内容>"{basis_field}}}]}},\n'
- f' "short": {{"scorePerQuestion": 10, "totalScore": 0, "count": 0, "questions": [{{"text": "<简答题题干>", "outline": {{"keyFactors": "<答题要点>", "measures": "<参考措施>"}}{basis_field}}}]}}\n'
- "}\n"
- "请按下面的题型配置生成对应数量的题目,没有的题型 count 返回 0、questions 返回空数组:\n"
- f"{question_schema}"
- )
- return {
- "statusCode": 200,
- "msg": "success",
- "data": {"prompt": prompt}
- }
- class BuildSinglePromptRequest(BaseModel):
- question_type: str
- topic: str
- difficulty: str
- class GenerateStreamRequest(BaseModel):
- mode: str = ""
- client: str = ""
- projectType: str = ""
- examTitle: str = ""
- totalScore: int = 0
- questionTypes: list[QuestionTypeItem] = Field(default_factory=list)
- pptContent: str = ""
- requireBasis: bool = False
- ai_conversation_id: Optional[int] = 0
- @router.post("/exam/generate_stream")
- async def generate_exam_stream(
- request: Request,
- data: GenerateStreamRequest,
- ):
- """
- 流式生成试卷(按题型分批输出)
- """
- user = request.state.user
- if not user:
- return {"statusCode": 401, "msg": "未授权"}
- async def event_generator():
- db = None
- try:
- yield f"data: {json.dumps({'type': 'progress', 'message': '正在检索知识库...', 'percent': 5}, ensure_ascii=False)}\n\n"
- # 2. 获取上下文
- raw_basis_content = (data.pptContent or "").strip()
- ppt_content = raw_basis_content
- retrieval_query = _build_knowledge_search_query(
- raw_basis_content, data.projectType)
- combined_source_mode = "用户输入依据:" in raw_basis_content and "PPT提取内容:" in raw_basis_content
- keyword_search_mode = bool(
- ppt_content and len(
- ppt_content) < 50 and "\n" not in ppt_content
- )
- retrieval_mode = bool(
- keyword_search_mode or (
- combined_source_mode and retrieval_query)
- )
- retrieval_succeeded = False
- if retrieval_mode:
- query_str = retrieval_query or ppt_content
- retrieved_docs = await _fetch_knowledge_docs(query_str, "exam/generate_stream")
- if retrieved_docs:
- logger.info(
- f"[exam/generate_stream] 知识库检索成功,拼接了 {len(retrieved_docs)} 个文档块")
- retrieval_succeeded = True
- original_basis = raw_basis_content[:
- 4000] if raw_basis_content else "无"
- ppt_content = f"用户指定的主题/关键词:{query_str}\n\n原始出题依据:\n{original_basis}\n\n以下是从知识库中检索到的相关原文片段,请严格基于这些原文片段出题:\n\n" + "\n\n---\n\n".join(
- retrieved_docs)
- elif retrieved_docs == []:
- logger.warning(
- f"[exam/generate_stream] 知识库中未检索到与 '{query_str}' 相关的文档块")
- resolved_exam_title = await _resolve_exam_title(
- user_title=data.examTitle,
- title_source=raw_basis_content,
- project_type=data.projectType,
- )
- # 1. 创建或获取对话
- conv_id = _ensure_exam_conversation_with_fresh_session(
- user_id=user.user_id,
- exam_title=resolved_exam_title,
- ai_conversation_id=data.ai_conversation_id,
- )
- yield f"data: {json.dumps({'type': 'initial', 'ai_conversation_id': conv_id, 'title': resolved_exam_title}, ensure_ascii=False)}\n\n"
- if ppt_content and len(ppt_content) > 12000:
- head_len = 6000
- tail_len = 6000
- ppt_content = ppt_content[:head_len] + \
- "\n\n(已截断)\n\n" + ppt_content[-tail_len:]
- basis_enabled = bool(
- data.requireBasis and (
- not retrieval_mode or retrieval_succeeded)
- )
- if data.requireBasis and retrieval_mode and not retrieval_succeeded:
- logger.warning(
- "[exam/generate_stream] 联合关键词未成功检索到知识库原文,已禁用 basis 字段以避免模型虚构依据")
- basis_instruction = (
- "【出题依据要求】:本次未成功检索到可核验的知识库原文,因此禁止输出 'basis' 字段。\n"
- "禁止虚构规范名称、标准编号、章节条款、出处或依据内容。\n"
- )
- elif basis_enabled:
- basis_instruction = (
- "【出题依据要求】:每道题必须附带一个 'basis' 字段。\n"
- "basis 必须严格按以下顺序组织:先写相关文件名,再写章节条款,最后写与本题直接相关的正文原文内容。\n"
- "推荐格式为:“文件名:xxx;章节条款:xxx;正文:xxx”。\n"
- "basis 必须尽量保持知识库原文原貌,模型不得做摘要、润色、优化、改写、同义替换、翻译或重组,不得省略关键表述;知识库原文如果是中文,basis 也必须保持中文原文,不得改写成英文或中英混杂表述。\n"
- "如果检索结果中存在文件名或章节标题,必须显式写出;如果同一题涉及多处原文,也必须在每段原文前先写文件名,再写章节条款,最后再写正文。\n"
- "如果缺少文件名或章节名,也必须保留固定标识位,分别写为“文件名:未标注”“章节条款:未标注”。\n"
- "analysis 字段保留为独立解析字段,负责输出基于原文的答案解析;basis 本身只能是原文依据,不能混入模型总结。\n"
- )
- else:
- basis_instruction = ""
- # 过滤出需要生成的题型
- valid_types = []
- for item in data.questionTypes:
- count = item.count or item.questionCount or 0
- if count > 0:
- valid_types.append(item)
- total_types = len(valid_types)
- if total_types == 0:
- yield f"data: {json.dumps({'type': 'progress', 'message': '未配置任何题型', 'percent': 100}, ensure_ascii=False)}\n\n"
- yield "data: [DONE]\n\n"
- return
- full_exam_data = {
- "title": resolved_exam_title,
- "totalScore": data.totalScore
- }
- for index, qtype_item in enumerate(valid_types):
- count = qtype_item.count or qtype_item.questionCount
- score = qtype_item.scorePerQuestion
- name = qtype_item.questionType or qtype_item.name
- # 开始生成当前题型,进度区间起始点
- start_percent = 10 + int(80 * (index / total_types))
- yield f"data: {json.dumps({'type': 'progress', 'message': f'正在生成{name}({index+1}/{total_types})...', 'percent': start_percent}, ensure_ascii=False)}\n\n"
- prompt = (
- f"请根据以下要求直接生成【{name}】题目,并严格返回纯 JSON,不要输出 markdown 代码块、解释说明或额外文字。\n"
- f"生成模式:{data.mode or '未指定'}\n"
- f"客户端:{data.client or '未指定'}\n"
- f"项目类型:{data.projectType or '未指定'}\n"
- f"试卷标题:{resolved_exam_title}\n"
- f"出题依据内容:{ppt_content or '无'}\n"
- "出题依据内容是本次试题的核心来源,必须围绕该内容中的知识点、术语、流程、规范要求和场景展开。\n"
- f"你需要生成:{count}道【{name}】,每道{score}分。\n"
- f"{basis_instruction}\n"
- f"返回 JSON 中的 count 必须等于 {count},questions 数组必须恰好包含 {count} 个题目对象,不能只返回 1 个示例对象。\n"
- "下面的 JSON 仅用于展示字段结构,questions 内的对象格式按此扩展到要求数量。\n"
- "JSON 输出结构必须符合以下格式(根据题型返回单个字段):\n"
- )
- basis_field = ', "basis": "文件名:...;章节条款:...;正文:..."' if basis_enabled else ""
- total_score = count * score
- if name == "单选题":
- prompt += f'{{"singleChoice": {{"scorePerQuestion": {score}, "totalScore": {total_score}, "count": {count}, "questions": [{{"text": "题干", "options": [{{"key": "A", "text": "内容A"}}], "selectedAnswer": "A", "analysis": "解析"{basis_field}}}]}}}}\n'
- prompt += "注意:选项必须且只能是4个,固定为A、B、C、D,禁止出现E或更多选项。正确答案在A、B、C、D中随机。"
- elif name == "多选题":
- prompt += f'{{"multiple": {{"scorePerQuestion": {score}, "totalScore": {total_score}, "count": {count}, "questions": [{{"text": "题干", "options": [{{"key": "A", "text": "内容A"}}], "selectedAnswers": ["A", "B"], "analysis": "解析"{basis_field}}}]}}}}\n'
- prompt += (
- "注意:选项必须且只能是4个,固定为A、B、C、D,禁止出现E或更多选项。"
- "正确答案的个数必须在2~4个之间随机分布,且不能全部都为同一种数量。\n"
- "你必须严格遵守以下多选题正确答案分布规则:\n"
- f" - 在本次生成的 {count} 道多选题中,包含2个正确选项的题目占比应接近40%。\n"
- f" - 在本次生成的 {count} 道多选题中,包含3个正确选项的题目占比应接近40%。\n"
- f" - 在本次生成的 {count} 道多选题中,包含4个正确选项(ABCD)的题目占比不得超过20%。\n"
- "你必须强制打乱正确答案的字母组合,严格遵守以下规则:\n"
- " - 必须有至少30%的题目正确答案完全不包含A(如 BC、CD、BD、BCD)。\n"
- " - 必须有至少30%的题目正确答案采用跳跃分布(如 AC、AD、BD、ABD、ACD)。\n"
- " - 绝不能所有题目都从A开始,绝不能大量重复 ABC、ABD、ACD、BCD、ABCD 这类固定模式。\n"
- " - 正确答案组合必须在 AB、AC、AD、BC、BD、CD、ABC、ABD、ACD、BCD、ABCD 等形式之间充分打散。\n"
- "如果数量分布无法完全整除,也必须尽量逼近上述比例,绝不能出现全部都是3个正确选项或全部都是同一组合模式。"
- )
- elif name == "判断题":
- prompt += f'{{"judge": {{"scorePerQuestion": {score}, "totalScore": {total_score}, "count": {count}, "questions": [{{"text": "题干", "selectedAnswer": "正确", "analysis": "解析"{basis_field}}}]}}}}\n'
- prompt += (
- "注意:正确答案必须在“正确”和“错误”之间随机分布。\n"
- f"判断题必须一次性返回 {count} 道完整题目,questions 数组中必须实际展开为 {count} 个不同的题目对象,"
- "不能只给 1 个示例对象,不能让前端或调用方自行复制。"
- )
- elif name == "简答题":
- prompt += f'{{"short": {{"scorePerQuestion": {score}, "totalScore": {total_score}, "count": {count}, "questions": [{{"text": "题干", "outline": {{"keyFactors": "答题要点"}}, "analysis": "解析"{basis_field}}}]}}}}\n'
- try:
- qwen_response = ""
- parsed = None
- last_error = None
- for attempt in range(2):
- current_prompt = prompt
- if attempt == 1:
- current_prompt += (
- f"\n这是第2次重试,上一次生成的【{name}】结果不可用。\n"
- f"本次必须一次性完整返回 {count} 道【{name}】,"
- "不得少题、不得只返回示例题、不得返回 1 道占位题。\n"
- "严禁输出 Thinking Process、Reasoning、思考过程、解释说明、前言、后记、markdown 代码块或任何 JSON 之外的内容。\n"
- "你的最终回复必须以 { 开头、以 } 结尾,且整个回复只能是一个可被 json.loads 直接解析的 JSON 对象。"
- )
- if name == "判断题":
- current_prompt += (
- f"\n特别强调:你现在生成的是【判断题】。"
- f"questions 数组里必须真实返回 {count} 个判断题对象,"
- "每个对象都要有独立题干和答案,绝不能只返回 1 个对象作为模板。"
- )
- logger.warning(
- f"[exam/generate_stream] {name}首次生成结果不可用,开始第{attempt + 1}次重试")
- qwen_response = await qwen_service.chat(
- [{"role": "user", "content": current_prompt}],
- disable_reasoning=True,
- )
- try:
- parsed, actual_count = _parse_exam_section_payload(
- qwen_response, name)
- if actual_count == count:
- last_error = None
- break
- last_error = ValueError(
- f"{name}返回题量不完整,期望{count}道,实际{actual_count}道")
- logger.warning(
- f"[exam/generate_stream] {last_error}; attempt={attempt + 1}/2")
- except Exception as inner_error:
- last_error = inner_error
- repaired_payload = await _repair_exam_section_payload(
- raw_response=qwen_response,
- question_type=name,
- count=count,
- score=score,
- basis_enabled=basis_enabled,
- )
- if repaired_payload is not None:
- parsed, actual_count = repaired_payload
- if actual_count == count:
- last_error = None
- break
- last_error = ValueError(
- f"{name}轻量修复后题量仍不完整,期望{count}道,实际{actual_count}道")
- logger.warning(
- f"[exam/generate_stream] {last_error}; attempt={attempt + 1}/2")
- continue
- logger.warning(
- f"[exam/generate_stream] {name}结果解析失败,准备重试: "
- f"attempt={attempt + 1}/2, detail={inner_error!r}")
- if last_error is not None:
- raise last_error
- if resolved_exam_title and not parsed.get("title"):
- parsed["title"] = resolved_exam_title
- # 合并到完整试卷
- full_exam_data.update(parsed)
- # 当前题型生成完成,进度推到当前区间的终点
- end_percent = 10 + int(80 * ((index + 1) / total_types))
- yield f"data: {json.dumps({'type': 'progress', 'message': f'{name}生成完成...', 'percent': end_percent}, ensure_ascii=False)}\n\n"
- yield f"data: {json.dumps({'type': 'batch_data', 'questionType': name, 'data': parsed}, ensure_ascii=False)}\n\n"
- except Exception as e:
- logger.error(
- f"生成{name}失败: {e}; raw_snippet={(qwen_response or '')[:300]}")
- yield f"data: {json.dumps({'type': 'error', 'message': f'{name}生成失败,未保存残缺试卷,请重试。'}, ensure_ascii=False)}\n\n"
- return
- # 保存完整试卷到数据库
- yield f"data: {json.dumps({'type': 'progress', 'message': '正在保存试卷...', 'percent': 98}, ensure_ascii=False)}\n\n"
- request_payload = (
- data.model_dump()
- if hasattr(data, "model_dump")
- else data.dict()
- )
- _save_exam_messages_with_fresh_session(
- conv_id=conv_id,
- user_id=user.user_id,
- request_payload=request_payload,
- exam_payload=full_exam_data,
- )
- yield f"data: {json.dumps({'type': 'progress', 'message': '试卷生成完成', 'percent': 100}, ensure_ascii=False)}\n\n"
- except Exception as e:
- logger.error(f"[exam/generate_stream] 异常: {e}")
- yield f"data: {json.dumps({'type': 'error', 'message': str(e)}, ensure_ascii=False)}\n\n"
- finally:
- if db is not None:
- db.close()
- yield "data: [DONE]\n\n"
- return StreamingResponse(event_generator(), media_type="text/event-stream")
- @router.post("/exam/build_single_prompt")
- async def build_single_question_prompt(
- request: Request,
- data: BuildSinglePromptRequest,
- db: Session = Depends(get_db)
- ):
- """生成单题提示词 - 对齐Go版本函数名"""
- user = request.state.user
- if not user:
- return {"statusCode": 401, "msg": "未授权"}
- prompt = f"""请生成1道关于{data.topic}的{data.question_type},难度为{data.difficulty}。"""
- return {
- "statusCode": 200,
- "msg": "success",
- "data": {"prompt": prompt}
- }
- class ModifyQuestionRequest(BaseModel):
- ai_conversation_id: int
- content: str
- @router.post("/re_modify_question")
- async def re_modify_question(
- request: Request,
- data: ModifyQuestionRequest,
- db: Session = Depends(get_db)
- ):
- """修改考试题目 - 实际修改ai_message表"""
- user = request.state.user
- if not user:
- return {"statusCode": 401, "msg": "未授权"}
- # 修改ai_message表中type='ai'的content
- result = db.query(AIMessage).filter(
- AIMessage.ai_conversation_id == data.ai_conversation_id,
- AIMessage.type == 'ai'
- ).update({"content": data.content})
- if result == 0:
- return {"statusCode": 404, "msg": "消息不存在"}
- db.commit()
- return {"statusCode": 200, "msg": "success"}
- class ReproduceSingleQuestionRequest(BaseModel):
- message: str = ""
- ai_conversation_id: Optional[int] = None
- regenerate_reason: str = ""
- @router.post("/re_produce_single_question")
- async def re_produce_single_question(
- request: Request,
- data: ReproduceSingleQuestionRequest,
- db: Session = Depends(get_db)
- ):
- """重新生成单题"""
- user = request.state.user
- if not user:
- return {"statusCode": 401, "msg": "未授权"}
- prompt = (data.message or "").strip()
- # 兼容旧版调用:未传 message 时,尝试根据会话和重生成原因构造提示词。
- if not prompt and data.ai_conversation_id:
- message = db.query(AIMessage).filter(
- AIMessage.ai_conversation_id == data.ai_conversation_id,
- AIMessage.type == 'ai'
- ).first()
- if not message:
- return {"statusCode": 404, "msg": "消息不存在"}
- prompt = (message.content or "").strip()
- if data.regenerate_reason:
- prompt = f"{prompt}\n\n请根据以下要求重新生成:{data.regenerate_reason}"
- if not prompt:
- return {"statusCode": 400, "msg": "缺少生成内容"}
- try:
- new_question = await qwen_service.chat([
- {"role": "user", "content": prompt}
- ])
- except Exception as e:
- return {"statusCode": 500, "msg": f"AI生成失败: {str(e)}"}
- return {
- "statusCode": 200,
- "msg": "success",
- "data": {
- "ai_conversation_id": data.ai_conversation_id,
- "new_question": new_question,
- "reply": new_question,
- "content": new_question,
- "message": new_question
- }
- }
|