| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432 |
"""
Alibaba Cloud Bailian EduTutor client.

Provides API calls for photo-based question solving and question cutting.
"""
import asyncio
import json
import logging
import os
from typing import Optional, AsyncGenerator

from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_credentials.models import Config as CredConfig
from alibabacloud_edututor20250707 import models as edututor_models
from alibabacloud_edututor20250707.client import Client as EduTutorClient
from alibabacloud_tea_openapi import models as open_api_models
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class BailianEduTutorClient:
    """Client for the Alibaba Cloud Bailian EduTutor service.

    Wraps the EduTutor SDK client to offer photo-based question answering
    (streaming and blocking variants) and exam-paper question cutting.
    """

    # Fallback workspace used when neither the constructor argument nor the
    # BAILIAN_WORKSPACE_ID environment variable supplies one.
    # NOTE(review): a hard-coded workspace ID in source is easy to misuse;
    # consider requiring explicit configuration.
    DEFAULT_WORKSPACE_ID = 'llm-uflun9q7q59osmbb'
    DEFAULT_ENDPOINT = 'edututor.cn-hangzhou.aliyuncs.com'

    # Read timeout handed to the SDK runtime options.
    # Presumably milliseconds (i.e. 100 s) -- TODO confirm against the SDK docs.
    _READ_TIMEOUT = 1000 * 100

    # Attributes copied from a response body that is neither a dict nor an
    # object exposing ``to_map()``.
    _BODY_ATTRS = (
        'code',
        'data',
        'message',
        'request_id',
        'finish_reason',
        'input_tokens',
        'output_tokens',
    )

    # Sentinel returned by ``next()`` when the SSE iterator is exhausted, so
    # that StopIteration never has to cross an executor future.
    _STREAM_END = object()

    def __init__(self, api_key: str, workspace_id: Optional[str] = None):
        """Create the underlying SDK client.

        Credentials are read from the ALIBABA_CLOUD_ACCESS_KEY_ID and
        ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables.

        Args:
            api_key: The user's DASHSCOPE API key (reserved; stored but not
                used for authentication here).
            workspace_id: Bailian workspace ID. Falls back to the
                BAILIAN_WORKSPACE_ID environment variable, then to
                ``DEFAULT_WORKSPACE_ID``.

        Raises:
            ValueError: If either AccessKey environment variable is missing
                or empty.
        """
        self.api_key = api_key
        self.workspace_id = (
            workspace_id
            or os.getenv('BAILIAN_WORKSPACE_ID')
            or self.DEFAULT_WORKSPACE_ID
        )

        access_key_id = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID')
        access_key_secret = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
        if not access_key_id or not access_key_secret:
            raise ValueError("环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID 或 ALIBABA_CLOUD_ACCESS_KEY_SECRET 未配置")

        # Authenticate with the AccessKey pair taken from the environment.
        cred = CredClient(
            CredConfig(
                type='access_key',
                access_key_id=access_key_id,
                access_key_secret=access_key_secret,
            )
        )
        config = open_api_models.Config(
            credential=cred,
            endpoint=self.DEFAULT_ENDPOINT,
        )
        self.client = EduTutorClient(config)

        logger.info("EduTutor client initialized with workspace: %s", self.workspace_id)

    # ------------------------------------------------------------------
    # Pure helpers (no I/O, no logging)
    # ------------------------------------------------------------------

    @classmethod
    def _body_to_dict(cls, body) -> dict:
        """Normalize an SDK response body into a plain dict."""
        # NOTE(review): keys produced by ``to_map()`` follow whatever naming
        # the SDK model uses; the callers read snake_case keys such as
        # 'finish_reason' -- confirm these match the SDK output.
        if hasattr(body, 'to_map'):
            return body.to_map() or {}
        if isinstance(body, dict):
            return body
        return {
            attr: getattr(body, attr)
            for attr in cls._BODY_ATTRS
            if hasattr(body, attr)
        }

    @staticmethod
    def _extract_text(data) -> str:
        """Return the first content text carried by an event's ``data`` field.

        ``data`` is normally a JSON string; an already-decoded dict is
        accepted too. Returns '' when there is no text. Malformed JSON raises
        ``json.JSONDecodeError`` so the caller can decide how to react.
        """
        if not data:
            return ''
        if isinstance(data, (str, bytes, bytearray)):
            data = json.loads(data)
        if not isinstance(data, dict):
            return ''
        message = data.get('message')
        if not isinstance(message, dict):
            return ''
        content = message.get('content')
        if not isinstance(content, (list, tuple)) or not content:
            return ''
        first = content[0]
        if not isinstance(first, dict):
            return ''
        text = first.get('text')
        return text if isinstance(text, str) else ''

    @staticmethod
    def _finish_event(body_dict: dict) -> Optional[dict]:
        """Build the 'finish' event for a body, or None if it is not final."""
        reason = body_dict.get('finish_reason')
        # The literal string 'null' is treated the same as a missing reason.
        if not reason or reason == 'null':
            return None
        return {
            'type': 'finish',
            'finish_reason': reason,
            'tokens': {
                'input': body_dict.get('input_tokens') or 0,
                'output': body_dict.get('output_tokens') or 0,
            },
        }

    @staticmethod
    def _parse_cut_questions(data) -> list:
        """Extract the raw question list from a CutQuestions ``data`` field.

        Accepts a JSON string or an already-decoded dict. Anything without a
        list under 'questions' yields an empty list. Malformed JSON raises
        ``json.JSONDecodeError``.
        """
        if isinstance(data, (str, bytes, bytearray)):
            data = json.loads(data) if data else {}
        if not isinstance(data, dict):
            return []
        questions = data.get('questions')
        return questions if isinstance(questions, list) else []

    @staticmethod
    def _format_questions(questions) -> list:
        """Convert raw question dicts into the shape the frontend expects.

        Each result has 'question_id' (1-based position in the input, as a
        string), 'image_url' and 'text'. Null or missing values become ''.
        Entries that are not dicts are skipped but still consume a position.
        """
        formatted = []
        for position, question in enumerate(questions, start=1):
            if not isinstance(question, dict):
                continue
            info = question.get('info')
            stem = info.get('stem') if isinstance(info, dict) else None
            text = stem.get('text') if isinstance(stem, dict) else None
            formatted.append({
                'question_id': str(position),
                'image_url': question.get('merged_image') or '',
                'text': text or '',
            })
        return formatted

    # ------------------------------------------------------------------
    # SDK plumbing shared by the two answer methods
    # ------------------------------------------------------------------

    def _build_answer_request(self, image_url: str, grade: int, stage: str, subject: str):
        """Assemble the AnswerSSE request for a single question image."""
        # The message content is a list of dicts; here a single image entry.
        message = edututor_models.AnswerSSERequestMessages(
            role='user',
            content=[{'image': image_url}],
        )
        parameters = edututor_models.AnswerSSERequestParameters(
            grade=grade,
            stage=stage,
            subject=subject,
        )
        return edututor_models.AnswerSSERequest(
            workspace_id=self.workspace_id,
            messages=[message],
            parameters=parameters,
        )

    def _open_answer_stream(self, request):
        """Call the SDK's synchronous SSE endpoint and return its iterable."""
        # Imported lazily, as the original code did, so that importing this
        # module does not require the package until an answer is requested.
        from alibabacloud_tea_util import models as util_models

        runtime = util_models.RuntimeOptions(read_timeout=self._READ_TIMEOUT)
        return self.client.answer_ssewith_sse(request, {}, runtime)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def answer_sse_async(
        self,
        image_url: str,
        grade: int = 0,
        stage: str = 'other',
        subject: str = 'other'
    ) -> AsyncGenerator[dict, None]:
        """Stream the answer to a question image as SSE-style events.

        The SDK call used here is synchronous, so every blocking step runs in
        the default executor to keep the event loop responsive.

        Args:
            image_url: URL of the question image.
            grade: Grade level (0-17, 99 = other).
            stage: School stage.
            subject: Subject.

        Yields:
            dict: One of
                - ``{'type': 'chunk', 'content': str}`` for each piece of text;
                - ``{'type': 'finish', 'finish_reason': ..., 'tokens':
                  {'input': ..., 'output': ...}}`` as the last event. If the
                  SDK fails to decode the stream after at least one event was
                  received, a finish event with reason 'error' is emitted.

        Raises:
            Exception: With a "解题失败: ..." message when the call cannot be
                made, when the stream fails before any event arrives, or when
                the API reports an error and no content was produced.
        """
        try:
            request = self._build_answer_request(image_url, grade, stage, subject)
            logger.info(
                "Calling EduTutor Answer API: image_url=%s, grade=%s, stage=%s, subject=%s",
                image_url, grade, stage, subject,
            )

            loop = asyncio.get_running_loop()
            try:
                sse_receiver = await loop.run_in_executor(
                    None, self._open_answer_stream, request
                )
            except asyncio.CancelledError:
                raise
            except Exception as e:
                logger.error(
                    "Failed to call answer_ssewith_sse: %s: %s",
                    type(e).__name__, e, exc_info=True,
                )
                raise Exception(f"调用百炼 API 失败: {e}") from e

            logger.info("Starting to parse SSE stream...")
            events = iter(sse_receiver)
            event_count = 0
            chunk_count = 0
            last_error = None

            while True:
                try:
                    # next() blocks on network I/O; run it off the loop thread.
                    response = await loop.run_in_executor(
                        None, next, events, self._STREAM_END
                    )
                except json.JSONDecodeError as e:
                    # Raised by the SDK itself while decoding the stream.
                    logger.error("SDK JSON decode error after %d events: %s", event_count, e)
                    if event_count == 0:
                        # Nothing was received; let the outer handler wrap it.
                        raise
                    logger.info(
                        "Ending stream early due to JSON error, processed %d events",
                        event_count,
                    )
                    yield {
                        'type': 'finish',
                        'finish_reason': 'error',
                        'tokens': {'input': 0, 'output': 0},
                    }
                    return

                if response is self._STREAM_END:
                    break
                event_count += 1

                try:
                    body_dict = self._body_to_dict(response.body)
                    if body_dict.get('code') != 'SUCCESS':
                        logger.error("API error: %s", body_dict)
                        last_error = body_dict
                        continue
                    text = self._extract_text(body_dict.get('data'))
                    finish = self._finish_event(body_dict)
                except json.JSONDecodeError as e:
                    # A single undecodable event is skipped, not fatal.
                    logger.warning("Event %d: Failed to parse data field: %s", event_count, e)
                    continue
                except Exception as e:
                    logger.warning("Event %d: Error processing SSE response: %s", event_count, e)
                    continue

                if text:
                    chunk_count += 1
                    yield {'type': 'chunk', 'content': text}

                if finish is not None:
                    logger.info(
                        "Stream finished: %s, processed %d events",
                        finish['finish_reason'], event_count,
                    )
                    yield finish
                    return

            logger.info("SSE stream ended, processed %d events total", event_count)

            # An API-level error with no content must not look like an empty
            # but successful stream.
            if chunk_count == 0 and last_error is not None:
                raise Exception(
                    f"百炼 API 返回错误: code={last_error.get('code')}, "
                    f"message={last_error.get('message')}"
                )

            logger.info("EduTutor Answer API call completed")

        except asyncio.CancelledError:
            # Never convert task cancellation into a generic failure.
            raise
        except Exception as e:
            logger.error("EduTutor Answer API error: %s", e)
            raise Exception(f"解题失败: {e}") from e

    async def cut_questions_async(
        self,
        image_url: str,
        struct: bool = True,
        extract_images: bool = True
    ) -> dict:
        """Split an exam-paper image into individual questions.

        Args:
            image_url: URL of the exam-paper image.
            struct: Whether to request structured (OCR) question info.
            extract_images: Whether to request per-question image links.

        Returns:
            dict: ``{'questions': [...], 'count': int}``. For the normal
            response shape each question is
            ``{'question_id', 'image_url', 'text'}``. If the body carries a
            'questions' field directly, it is returned unformatted. An
            unrecognized response yields an empty result.

        Raises:
            Exception: With a "切题失败: ..." message when the API call or the
                response parsing fails.
        """
        try:
            parameters = edututor_models.CutQuestionsRequestParameters(
                struct=struct,
                extract_images=extract_images,
            )
            request = edututor_models.CutQuestionsRequest(
                image=image_url,
                parameters=parameters,
                workspace_id=self.workspace_id,
            )

            logger.info("Calling EduTutor CutQuestions API: image_url=%s", image_url)
            response = await self.client.cut_questions_async(request)
            logger.debug("CutQuestions API raw response type: %s", type(response))

            if hasattr(response, 'body'):
                body_dict = self._body_to_dict(response.body)
                logger.debug("CutQuestions body keys: %s", list(body_dict))

                if body_dict.get('code') == 'SUCCESS' and 'data' in body_dict:
                    # 'data' is normally a JSON string holding the questions.
                    raw_questions = self._parse_cut_questions(body_dict['data'])
                    logger.info("Found %d questions in parsed data", len(raw_questions))

                    formatted = self._format_questions(raw_questions)
                    for item in formatted:
                        logger.debug(
                            "Formatted question %s: has_image=%s, text_length=%d",
                            item['question_id'], bool(item['image_url']), len(item['text']),
                        )

                    logger.info("Returning result with %d questions", len(formatted))
                    return {'questions': formatted, 'count': len(formatted)}

                if 'questions' in body_dict:
                    # Body already exposes the questions; pass them through.
                    logger.info("Found questions field directly in body")
                    direct = body_dict.get('questions') or []
                    return {'questions': direct, 'count': len(direct)}

                logger.warning(
                    "Unexpected response format, available keys: %s (code=%s, message=%s)",
                    list(body_dict), body_dict.get('code'), body_dict.get('message'),
                )

            logger.warning("No valid response body found, returning empty result")
            return {'questions': [], 'count': 0}

        except Exception as e:
            logger.error("EduTutor CutQuestions API error: %s", e, exc_info=True)
            raise Exception(f"切题失败: {e}") from e

    def answer_sync(
        self,
        image_url: str,
        grade: int = 0,
        stage: str = 'other',
        subject: str = 'other'
    ) -> dict:
        """Answer a question image and return the complete text (blocking).

        Consumes the same SSE stream as ``answer_sse_async`` and concatenates
        every text chunk until a finish event arrives or the stream ends.

        Args:
            image_url: URL of the question image.
            grade: Grade level.
            stage: School stage.
            subject: Subject.

        Returns:
            dict: ``{'answer': str, 'input_tokens': int, 'output_tokens': int}``.
            Token counts stay 0 if the stream ends without a finish event.

        Raises:
            Exception: With a "解题失败: ..." message when the call fails or
                when the API reports an error and no answer text was produced.
        """
        try:
            request = self._build_answer_request(image_url, grade, stage, subject)
            logger.info("Calling EduTutor Answer API (sync): image_url=%s", image_url)

            sse_receiver = self._open_answer_stream(request)

            parts = []
            input_tokens = 0
            output_tokens = 0
            last_error = None

            for event_index, response in enumerate(sse_receiver, start=1):
                try:
                    body_dict = self._body_to_dict(response.body)
                    if body_dict.get('code') != 'SUCCESS':
                        logger.warning("API error: %s", body_dict)
                        last_error = body_dict
                        continue
                    text = self._extract_text(body_dict.get('data'))
                    finish = self._finish_event(body_dict)
                except Exception as e:
                    # Best effort: a bad event is skipped, but no longer silently.
                    logger.warning(
                        "Event %d: Error processing SSE response: %s", event_index, e
                    )
                    continue

                parts.append(text)
                if finish is not None:
                    input_tokens = finish['tokens']['input']
                    output_tokens = finish['tokens']['output']
                    break

            answer = ''.join(parts)

            # Do not report an API failure as a successful empty answer.
            if not answer and last_error is not None:
                raise Exception(
                    f"百炼 API 返回错误: code={last_error.get('code')}, "
                    f"message={last_error.get('message')}"
                )

            return {
                'answer': answer,
                'input_tokens': input_tokens,
                'output_tokens': output_tokens,
            }

        except Exception as e:
            logger.error("EduTutor Answer API error: %s", e)
            raise Exception(f"解题失败: {e}") from e
|