knowledge_poller.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. import json
  2. import logging
  3. import threading
  4. from datetime import datetime, timedelta
  5. from app.sample_center_client import SampleCenterClient, SampleCenterError
  6. logger = logging.getLogger(__name__)
  7. MAX_POLL_COUNT = 20
  8. POLL_INTERVAL_INIT = 2
  9. POLL_INTERVAL_MAX = 30
  10. POLL_MULTIPLIER = 1.5
  11. class KnowledgePoller:
  12. """后台轮询线程,定期检查 pending/processing 的入库任务状态。"""
  13. def __init__(self, app):
  14. self.app = app
  15. self._thread = None
  16. self._stop_event = threading.Event()
  17. def start(self):
  18. if self._thread and self._thread.is_alive():
  19. return
  20. self._stop_event.clear()
  21. self._thread = threading.Thread(target=self._run, name="knowledge-poller", daemon=True)
  22. self._thread.start()
  23. logger.info("Knowledge poller started")
  24. def stop(self):
  25. self._stop_event.set()
  26. if self._thread:
  27. self._thread.join(timeout=10)
  28. logger.info("Knowledge poller stopped")
  29. def _run(self):
  30. while not self._stop_event.is_set():
  31. try:
  32. self._poll_due_tasks()
  33. except Exception:
  34. logger.exception("Poller error")
  35. self._stop_event.wait(5)
  36. def _poll_due_tasks(self):
  37. from app.models import KnowledgeImportTask
  38. with self.app.app_context():
  39. now = datetime.utcnow()
  40. tasks = KnowledgeImportTask.query.filter(
  41. KnowledgeImportTask.status.in_(['pending', 'processing']),
  42. KnowledgeImportTask.next_poll_at <= now,
  43. KnowledgeImportTask.poll_count < MAX_POLL_COUNT,
  44. ).all()
  45. for task in tasks:
  46. self._poll_single_task(task)
  47. def _poll_single_task(self, task):
  48. from app import db
  49. cfg = self.app.config
  50. client = SampleCenterClient(
  51. base_url=cfg['SAMPLE_CENTER_BASE_URL'],
  52. app_id=cfg['SAMPLE_CENTER_APP_ID'],
  53. app_secret=cfg['SAMPLE_CENTER_APP_SECRET'],
  54. )
  55. task.poll_count += 1
  56. task.last_poll_at = datetime.utcnow()
  57. try:
  58. result = client.get_import_task(task.sample_task_id)
  59. sc_data = result.get('data', {})
  60. sc_status = sc_data.get('status', '')
  61. task.status_detail = json.dumps(sc_data, ensure_ascii=False)
  62. progress = sc_data.get('progress')
  63. if progress:
  64. task.progress = json.dumps(progress, ensure_ascii=False)
  65. if sc_status in ('completed',):
  66. task.status = 'success'
  67. task.next_poll_at = None
  68. elif sc_status == 'failed':
  69. task.status = 'failed'
  70. task.error_message = sc_data.get('error', '')
  71. task.next_poll_at = None
  72. else:
  73. task.status = 'processing'
  74. interval = min(
  75. POLL_INTERVAL_INIT * (POLL_MULTIPLIER ** (task.poll_count - 1)),
  76. POLL_INTERVAL_MAX,
  77. )
  78. task.next_poll_at = datetime.utcnow() + timedelta(seconds=interval)
  79. db.session.commit()
  80. logger.info(f"Polled task {task.task_no}: status={task.status}")
  81. except SampleCenterError as e:
  82. task.error_message = str(e)
  83. interval = min(
  84. POLL_INTERVAL_INIT * (POLL_MULTIPLIER ** (task.poll_count - 1)),
  85. POLL_INTERVAL_MAX,
  86. )
  87. task.next_poll_at = datetime.utcnow() + timedelta(seconds=interval)
  88. db.session.commit()
  89. logger.warning(f"Poll error for {task.task_no}: {e}")
  90. except Exception:
  91. db.session.rollback()
  92. logger.exception(f"Unexpected poll error for {task.task_no}")