# deep_routes.py - Flask blueprint with the deep-collection routes and the background crawl/summary task.

  1. from flask import Blueprint, render_template, request, jsonify
  2. from flask_login import login_required
  3. from app import db, create_app
  4. from app.models import SpiderResult, DeepCollection, AIModel, TokenUsageLog
  5. from sqlalchemy.exc import OperationalError
  6. import time
  7. import asyncio
  8. import threading
  9. import json
  10. import requests
  11. from crawl4ai import AsyncWebCrawler
  12. import sys
  13. if sys.platform == 'win32':
  14. asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
  15. bp = Blueprint('deep', __name__, url_prefix='/deep')
  16. @bp.route('/dashboard')
  17. @login_required
  18. def dashboard():
  19. return render_template('deep_management.html')
  20. @bp.route('/api/list')
  21. @login_required
  22. def list_data():
  23. page = request.args.get('page', 1, type=int)
  24. per_page = request.args.get('per_page', 10, type=int)
  25. query = request.args.get('query', '')
  26. q = DeepCollection.query
  27. if query:
  28. q = q.filter(DeepCollection.url.like(f'%{query}%') | DeepCollection.content.like(f'%{query}%'))
  29. pagination = q.order_by(DeepCollection.updated_at.desc()).paginate(page=page, per_page=per_page, error_out=False)
  30. items = []
  31. for item in pagination.items:
  32. items.append({
  33. 'id': item.id,
  34. 'title': item.title,
  35. 'url': item.url,
  36. 'summary': item.summary,
  37. 'status': item.status,
  38. 'created_at': item.created_at.strftime('%Y-%m-%d %H:%M:%S'),
  39. 'updated_at': item.updated_at.strftime('%Y-%m-%d %H:%M:%S'),
  40. 'content_length': len(item.content) if item.content else 0
  41. })
  42. return jsonify({
  43. 'items': items,
  44. 'total': pagination.total,
  45. 'pages': pagination.pages,
  46. 'current_page': page
  47. })
  48. async def run_crawl(url):
  49. async with AsyncWebCrawler(verbose=True) as crawler:
  50. result = await crawler.arun(url=url)
  51. return result.markdown
  52. def safe_commit(session, max_retries=5, delay=0.5):
  53. for i in range(max_retries):
  54. try:
  55. session.commit()
  56. return True
  57. except OperationalError as e:
  58. if "locked" in str(e):
  59. print(f"Database locked, retrying {i+1}/{max_retries}...")
  60. session.rollback()
  61. time.sleep(delay)
  62. else:
  63. raise e
  64. print("Database commit failed after max retries")
  65. return False
  66. def generate_summary(content, model):
  67. if not model or not content:
  68. return None
  69. try:
  70. url = model.api_base
  71. if not url.endswith('/chat/completions'):
  72. if not url.endswith('/'):
  73. url += '/'
  74. url += 'chat/completions'
  75. headers = {
  76. "Authorization": f"Bearer {model.api_key}",
  77. "Content-Type": "application/json"
  78. }
  79. # Truncate content to avoid token limits
  80. truncated_content = content[:10000]
  81. payload = {
  82. "model": model.model_name,
  83. "messages": [
  84. {"role": "system", "content": "You are a helpful assistant. Please summarize the following web page content in Chinese within 200 words."},
  85. {"role": "user", "content": truncated_content}
  86. ],
  87. "stream": False,
  88. "max_tokens": 500,
  89. "temperature": 0.5
  90. }
  91. response = requests.post(url, json=payload, headers=headers, timeout=30)
  92. if response.status_code == 200:
  93. res_json = response.json()
  94. # Extract and log usage
  95. if 'usage' in res_json:
  96. usage = res_json['usage']
  97. print(f"Token Usage: {usage}")
  98. prompt_tokens = usage.get('prompt_tokens', 0)
  99. completion_tokens = usage.get('completion_tokens', 0)
  100. total_tokens = usage.get('total_tokens', 0)
  101. log = TokenUsageLog(
  102. model_id=model.id,
  103. prompt_tokens=prompt_tokens,
  104. completion_tokens=completion_tokens,
  105. total_tokens=total_tokens,
  106. request_type='deep_collect'
  107. )
  108. model.total_tokens = (model.total_tokens or 0) + total_tokens
  109. db.session.add(log)
  110. safe_commit(db.session)
  111. else:
  112. print(f"Warning: No 'usage' field in AI response. Keys: {list(res_json.keys())}")
  113. return res_json['choices'][0]['message']['content']
  114. else:
  115. print(f"AI Summary Error: {response.text}")
  116. return None
  117. except Exception as e:
  118. print(f"AI Summary Exception: {e}")
  119. return None
  120. def execute_deep_task(deep_id, app_context=None):
  121. """Background task for deep collection"""
  122. app = create_app()
  123. with app.app_context():
  124. try:
  125. deep_item = DeepCollection.query.get(deep_id)
  126. if not deep_item:
  127. return
  128. deep_item.status = 'running'
  129. deep_item.progress = 10
  130. deep_item.progress_msg = 'Initializing task...'
  131. safe_commit(db.session)
  132. url = deep_item.url
  133. # 1. Crawl
  134. deep_item.progress = 20
  135. deep_item.progress_msg = f'Connecting to {url}...'
  136. safe_commit(db.session)
  137. print(f"Starting crawl for {url}")
  138. try:
  139. markdown = asyncio.run(run_crawl(url))
  140. except Exception as e:
  141. deep_item.status = 'failed'
  142. deep_item.error_msg = f"Crawl failed: {str(e)}"
  143. deep_item.progress_msg = 'Crawl failed'
  144. safe_commit(db.session)
  145. return
  146. if not markdown:
  147. deep_item.status = 'failed'
  148. deep_item.error_msg = 'Failed to crawl content (empty result)'
  149. deep_item.progress_msg = 'Crawl returned empty'
  150. safe_commit(db.session)
  151. return
  152. deep_item.content = markdown
  153. deep_item.progress = 50
  154. deep_item.progress_msg = 'Content crawled successfully. Analyzing...'
  155. safe_commit(db.session)
  156. # 2. AI Summary
  157. model = AIModel.query.filter_by(is_active=True).first()
  158. summary = None
  159. if model:
  160. deep_item.progress = 60
  161. deep_item.progress_msg = f'Generating summary with {model.name}...'
  162. safe_commit(db.session)
  163. print(f"Generating summary with model {model.name}")
  164. summary = generate_summary(markdown, model)
  165. if summary:
  166. deep_item.progress = 90
  167. deep_item.progress_msg = 'Summary generated.'
  168. else:
  169. deep_item.progress_msg = 'Summary generation skipped or failed.'
  170. else:
  171. deep_item.progress_msg = 'No active AI model, skipping summary.'
  172. # 3. Save final result
  173. deep_item.summary = summary
  174. deep_item.status = 'completed'
  175. deep_item.progress = 100
  176. deep_item.progress_msg = 'Deep collection completed successfully.'
  177. deep_item.error_msg = None
  178. safe_commit(db.session)
  179. except Exception as e:
  180. print(f"Deep task failed: {e}")
  181. # Re-query in case of session issues
  182. with app.app_context():
  183. deep_item = DeepCollection.query.get(deep_id)
  184. if deep_item:
  185. deep_item.status = 'failed'
  186. deep_item.error_msg = str(e)
  187. deep_item.progress_msg = 'Internal error occurred'
  188. safe_commit(db.session)
  189. @bp.route('/api/collect', methods=['POST'])
  190. @login_required
  191. def deep_collect():
  192. data = request.json
  193. data_id = data.get('id') # SpiderResult ID
  194. source_data = SpiderResult.query.get(data_id)
  195. if not source_data:
  196. return jsonify({'error': 'Source data not found'}), 404
  197. url = source_data.link
  198. if not url:
  199. return jsonify({'error': 'No URL in source data'}), 400
  200. # Check or create DeepCollection record
  201. deep_item = DeepCollection.query.filter_by(url=url).first()
  202. if not deep_item:
  203. deep_item = DeepCollection(
  204. url=url,
  205. title=source_data.title,
  206. status='pending',
  207. progress=0,
  208. progress_msg='Queued...'
  209. )
  210. db.session.add(deep_item)
  211. else:
  212. # Reset for re-run
  213. deep_item.status = 'pending'
  214. deep_item.title = source_data.title # Update title in case it changed
  215. deep_item.progress = 0
  216. deep_item.progress_msg = 'Queued...'
  217. deep_item.error_msg = None
  218. # Update SpiderResult to show it has deep collection
  219. source_data.has_deep_collection = True
  220. db.session.commit()
  221. # Start background thread
  222. thread = threading.Thread(target=execute_deep_task, args=(deep_item.id,))
  223. thread.start()
  224. return jsonify({
  225. 'message': 'Deep collection started',
  226. 'task_id': deep_item.id,
  227. 'status': 'started'
  228. })
  229. @bp.route('/api/status/<int:id>', methods=['GET'])
  230. @login_required
  231. def check_status(id):
  232. item = DeepCollection.query.get(id)
  233. if not item:
  234. return jsonify({'error': 'Task not found'}), 404
  235. return jsonify({
  236. 'id': item.id,
  237. 'status': item.status,
  238. 'progress': item.progress,
  239. 'progress_msg': item.progress_msg,
  240. 'error': item.error_msg,
  241. 'summary': item.summary if item.status == 'completed' else None,
  242. 'url': item.url
  243. })
  244. @bp.route('/api/get/<int:id>')
  245. @login_required
  246. def get_deep_data(id):
  247. item = DeepCollection.query.get_or_404(id)
  248. return jsonify({
  249. 'id': item.id,
  250. 'url': item.url,
  251. 'content': item.content,
  252. 'summary': item.summary,
  253. 'status': item.status,
  254. 'updated_at': item.updated_at.strftime('%Y-%m-%d %H:%M:%S')
  255. })
  256. @bp.route('/api/delete', methods=['POST'])
  257. @login_required
  258. def delete_deep_data():
  259. data = request.json
  260. ids = data.get('ids', [])
  261. if not ids:
  262. return jsonify({'error': 'No IDs provided'}), 400
  263. try:
  264. DeepCollection.query.filter(DeepCollection.id.in_(ids)).delete(synchronize_session=False)
  265. db.session.commit()
  266. return jsonify({'message': 'Deleted successfully'})
  267. except Exception as e:
  268. db.session.rollback()
  269. return jsonify({'error': str(e)}), 500