""" 阿里云百炼 EduTutor 客户端 提供拍照解题功能的API调用 """ import os import logging from typing import Optional, AsyncGenerator from alibabacloud_edututor20250707.client import Client as EduTutorClient from alibabacloud_edututor20250707 import models as edututor_models from alibabacloud_tea_openapi import models as open_api_models from alibabacloud_credentials.models import Config as CredConfig from alibabacloud_credentials.client import Client as CredClient logger = logging.getLogger(__name__) class BailianEduTutorClient: """百炼 EduTutor 客户端""" def __init__(self, api_key: str, workspace_id: Optional[str] = None): """ 初始化客户端 Args: api_key: 用户的 DASHSCOPE API Key(预留参数,当前未使用) workspace_id: 百炼工作空间ID,默认从环境变量读取 """ self.api_key = api_key self.workspace_id = workspace_id or os.getenv('BAILIAN_WORKSPACE_ID', 'llm-uflun9q7q59osmbb') # 从环境变量获取 AccessKey ID 和 Secret access_key_id = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID') access_key_secret = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET') if not access_key_id or not access_key_secret: raise ValueError("环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID 或 ALIBABA_CLOUD_ACCESS_KEY_SECRET 未配置") # 使用环境变量的 AccessKey 进行认证 cred_config = CredConfig( type='access_key', access_key_id=access_key_id, access_key_secret=access_key_secret ) cred = CredClient(cred_config) # 创建配置(使用 hangzhou endpoint) config = open_api_models.Config( credential=cred, endpoint='edututor.cn-hangzhou.aliyuncs.com' ) # 创建客户端 self.client = EduTutorClient(config) logger.info(f"EduTutor client initialized with workspace: {self.workspace_id}") async def answer_sse_async( self, image_url: str, grade: int = 0, stage: str = 'other', subject: str = 'other' ) -> AsyncGenerator[dict, None]: """ 流式解答题目(SSE) Args: image_url: 题目图片URL grade: 年级(0-17, 99=其他) stage: 学段 subject: 学科 Yields: dict: SSE事件数据 - type: 'start' | 'chunk' | 'finish' - content: 内容文本(仅chunk类型) - finish_reason: 完成原因(仅finish类型) - tokens: Token统计(仅finish类型) """ try: # 构建消息对象(content 是字典列表) message = edututor_models.AnswerSSERequestMessages( role='user', content=[{'image': image_url}] ) # 构建参数对象 parameters = edututor_models.AnswerSSERequestParameters( grade=grade, stage=stage, subject=subject ) # 构建请求 request = edututor_models.AnswerSSERequest( workspace_id=self.workspace_id, messages=[message], parameters=parameters ) logger.info(f"Calling EduTutor Answer API: image_url={image_url}, grade={grade}, stage={stage}, subject={subject}") try: # 设置运行时选项 from alibabacloud_tea_util import models as util_models runtime = util_models.RuntimeOptions(read_timeout=1000 * 100) headers = {} # 调用流式API(使用 answer_ssewith_sse 方法) logger.info("About to call self.client.answer_ssewith_sse...") sse_receiver = self.client.answer_ssewith_sse(request, headers, runtime) logger.info(f"Got SSE receiver: {type(sse_receiver)}") except Exception as e: logger.error(f"Failed to call answer_ssewith_sse: {type(e).__name__}: {e}", exc_info=True) raise Exception(f"调用百炼 API 失败: {str(e)}") logger.info("Starting to parse SSE stream...") # 解析SSE流 import json event_count = 0 try: for response in sse_receiver: event_count += 1 try: body = response.body # 将响应体转换为字典 if hasattr(body, 'to_map'): body_dict = body.to_map() elif isinstance(body, dict): body_dict = body else: # 尝试通过属性访问 body_dict = {} for attr in ['code', 'data', 'message', 'request_id', 'finish_reason', 'input_tokens', 'output_tokens']: if hasattr(body, attr): body_dict[attr] = getattr(body, attr) # 检查是否成功 if body_dict.get('code') != 'SUCCESS': logger.error(f"API error: {body_dict}") continue # 解析 data 字段(JSON 字符串) if 'data' in body_dict and body_dict['data']: try: inner_data = json.loads(body_dict['data']) except json.JSONDecodeError as e: logger.warning(f"Event {event_count}: Failed to parse data field: {e}") # 跳过这个事件,继续处理下一个 continue # 提取message内容 if 'message' in inner_data: message = inner_data['message'] if 'content' in message and len(message['content']) > 0: text = message['content'][0].get('text', '') if text: yield { 'type': 'chunk', 'content': text } # 检查是否完成 if 'finish_reason' in body_dict and body_dict['finish_reason'] and body_dict['finish_reason'] != 'null': logger.info(f"Stream finished: {body_dict['finish_reason']}, processed {event_count} events") yield { 'type': 'finish', 'finish_reason': body_dict['finish_reason'], 'tokens': { 'input': body_dict.get('input_tokens', 0), 'output': body_dict.get('output_tokens', 0) } } return except Exception as e: logger.warning(f"Event {event_count}: Error processing SSE response: {e}") # 继续处理下一个事件 continue except json.JSONDecodeError as e: # SDK 内部的 JSON 解析错误 logger.error(f"SDK JSON decode error after {event_count} events: {e}") # 如果已经处理了一些事件,发送 finish 事件 if event_count > 0: logger.info(f"Ending stream early due to JSON error, processed {event_count} events") yield { 'type': 'finish', 'finish_reason': 'error', 'tokens': {'input': 0, 'output': 0} } return else: # 如果一个事件都没处理,抛出异常 raise Exception(f"解题失败: {str(e)}") logger.info(f"SSE stream ended, processed {event_count} events total") logger.info("EduTutor Answer API call completed") except Exception as e: logger.error(f"EduTutor Answer API error: {e}") raise Exception(f"解题失败: {str(e)}") async def cut_questions_async( self, image_url: str, struct: bool = True, extract_images: bool = True ) -> dict: """ 异步切题接口 Args: image_url: 试卷图片URL struct: 是否输出题目结构化(OCR)信息 extract_images: 是否返回题目图片链接 Returns: dict: 切题结果 - questions: 题目列表 - count: 题目数量 """ try: # 构建参数对象 parameters = edututor_models.CutQuestionsRequestParameters( struct=struct, extract_images=extract_images ) # 构建请求 request = edututor_models.CutQuestionsRequest( image=image_url, parameters=parameters, workspace_id=self.workspace_id ) logger.info(f"Calling EduTutor CutQuestions API: image_url={image_url}") # 调用API response = await self.client.cut_questions_async(request) logger.info(f"CutQuestions API raw response type: {type(response)}") # 解析响应 if hasattr(response, 'body'): body = response.body logger.info(f"CutQuestions API response body type: {type(body)}") # 将响应体转换为字典 import json if hasattr(body, 'to_map'): body_dict = body.to_map() logger.info(f"Converted body to dict, keys: {body_dict.keys()}") elif isinstance(body, dict): body_dict = body logger.info(f"Body is already dict, keys: {body_dict.keys()}") else: # 尝试通过属性访问 logger.info(f"Body attributes: {dir(body)}") body_dict = {} for attr in ['code', 'data', 'message', 'request_id']: if hasattr(body, attr): body_dict[attr] = getattr(body, attr) logger.info(f"Extracted attributes: {body_dict.keys()}") # 检查是否有 code 和 data 字段 if body_dict.get('code') == 'SUCCESS' and 'data' in body_dict: logger.info(f"Found SUCCESS code, parsing data field...") # data 是 JSON 字符串,需要解析 data_str = body_dict['data'] logger.info(f"Data field type: {type(data_str)}") data = json.loads(data_str) logger.info(f"Parsed data keys: {data.keys() if isinstance(data, dict) else 'not a dict'}") questions = data.get('questions', []) logger.info(f"Found {len(questions)} questions in parsed data") # 转换为前端需要的格式 formatted_questions = [] for idx, q in enumerate(questions): merged_image = q.get('merged_image', '') stem_text = '' # 安全地提取 stem text if 'info' in q and isinstance(q['info'], dict): if 'stem' in q['info'] and isinstance(q['info']['stem'], dict): stem_text = q['info']['stem'].get('text', '') formatted_q = { 'question_id': str(idx + 1), 'image_url': merged_image, 'text': stem_text } logger.info(f"Formatted question {idx + 1}: id={formatted_q['question_id']}, has_image={bool(merged_image)}, text_length={len(stem_text)}") formatted_questions.append(formatted_q) result = { 'questions': formatted_questions, 'count': len(formatted_questions) } logger.info(f"Returning result with {result['count']} questions") return result elif 'questions' in body_dict: # 直接包含 questions 字段 logger.info(f"Found questions field directly in body") return { 'questions': body_dict.get('questions', []), 'count': len(body_dict.get('questions', [])) } else: logger.warning(f"Unexpected response format, available keys: {body_dict.keys()}") logger.warning("No valid response body found, returning empty result") return {'questions': [], 'count': 0} except Exception as e: logger.error(f"EduTutor CutQuestions API error: {e}", exc_info=True) raise Exception(f"切题失败: {str(e)}") def answer_sync( self, image_url: str, grade: int = 0, stage: str = 'other', subject: str = 'other' ) -> dict: """ 同步解答题目(非流式) Args: image_url: 题目图片URL grade: 年级 stage: 学段 subject: 学科 Returns: dict: 解答结果 - answer: 解答内容 - input_tokens: 输入Token数 - output_tokens: 输出Token数 """ try: # 构建消息对象(content 是字典列表) message = edututor_models.AnswerSSERequestMessages( role='user', content=[{'image': image_url}] ) # 构建参数对象 parameters = edututor_models.AnswerSSERequestParameters( grade=grade, stage=stage, subject=subject ) # 构建请求 request = edututor_models.AnswerSSERequest( workspace_id=self.workspace_id, messages=[message], parameters=parameters ) logger.info(f"Calling EduTutor Answer API (sync): image_url={image_url}") # 设置运行时选项 from alibabacloud_tea_util import models as util_models runtime = util_models.RuntimeOptions(read_timeout=1000 * 100) headers = {} # 调用API sse_receiver = self.client.answer_ssewith_sse(request, headers, runtime) # 收集所有内容 full_answer = '' input_tokens = 0 output_tokens = 0 import json for response in sse_receiver: try: body = response.body # 将响应体转换为字典 if hasattr(body, 'to_map'): body_dict = body.to_map() elif isinstance(body, dict): body_dict = body else: body_dict = {} for attr in ['code', 'data', 'finish_reason', 'input_tokens', 'output_tokens']: if hasattr(body, attr): body_dict[attr] = getattr(body, attr) if body_dict.get('code') != 'SUCCESS': continue # 解析 data 字段 if 'data' in body_dict and body_dict['data']: inner_data = json.loads(body_dict['data']) if 'message' in inner_data: message = inner_data['message'] if 'content' in message and len(message['content']) > 0: text = message['content'][0].get('text', '') full_answer += text if 'finish_reason' in body_dict and body_dict['finish_reason'] and body_dict['finish_reason'] != 'null': input_tokens = body_dict.get('input_tokens', 0) output_tokens = body_dict.get('output_tokens', 0) break except json.JSONDecodeError: continue except Exception: continue return { 'answer': full_answer, 'input_tokens': input_tokens, 'output_tokens': output_tokens } except Exception as e: logger.error(f"EduTutor Answer API error: {e}") raise Exception(f"解题失败: {str(e)}")