# main.py
  1. import uuid
  2. import os,sys
  3. current_path = os.getcwd()
  4. sys.path.append(current_path)
  5. import subprocess
  6. from datetime import datetime
  7. from agent.db.database import SessionLocal
  8. from agent.models.db.agent import Job
  9. from config.site import SiteConfig
  10. from agent.libs.agent import AgentBusiness
  11. from datetime import datetime
  12. import logging
  13. import re
  14. config = SiteConfig()
  15. logging.basicConfig(level=logging.INFO)
  16. handler = logging.FileHandler('/app/logs/job-executor.log', mode='w',encoding="utf-8")
  17. handler.setLevel(logging.INFO)
  18. formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  19. handler.setFormatter(formatter)
  20. logging.getLogger().addHandler(handler)
  21. logger = logging.getLogger(__name__)
  22. SCRIPT_CONFIG = {
  23. "SYSTEM_WORD": {
  24. 'command': "python", # 脚本路径
  25. 'script':'standard_word_extractor.py',
  26. 'args': [], # 脚本参数
  27. 'success': { 'queue_category': 'SYSTEM', 'queue_name':'CHUNKS'},
  28. 'failed': { 'queue_category': 'SYSTEM', 'queue_name': 'WORD'},
  29. 'error': { 'queue_category': 'SYSTEM', 'queue_name': 'WORD'}
  30. },
  31. "SYSTEM_OCR": {
  32. 'command': "python", # 脚本路径
  33. 'script':'standard_pdf_extractor.py',
  34. 'args': [], # 脚本参数
  35. 'success': { 'queue_category': 'SYSTEM', 'queue_name':'CHUNKS'},
  36. 'failed': { 'queue_category': 'SYSTEM', 'queue_name': 'OCR'},
  37. 'error': { 'queue_category': 'SYSTEM', 'queue_name': 'OCR'}
  38. },
  39. "SYSTEM_CHUNKS": {
  40. 'command': "python", # 脚本路径
  41. 'script':'standard_txt_chunk.py',
  42. 'args': [], # 脚本参数
  43. 'success': { 'queue_category': 'SYSTEM', 'queue_name':'CHUNKS'},
  44. 'failed': { 'queue_category': 'SYSTEM', 'queue_name': 'CHUNKS'},
  45. 'error': { 'queue_category': 'SYSTEM', 'queue_name': 'CHUNKS'}
  46. },
  47. "SYSTEM_KB_EXTRACT": {
  48. 'command': "python", # 脚本路径
  49. 'script':'standard_kb_extractor.py',
  50. 'args': [], # 脚本参数
  51. 'success': { 'queue_category': 'SYSTEM', 'queue_name':'KB_BUILD'},
  52. 'failed': { 'queue_category': 'SYSTEM', 'queue_name': 'KB_EXTRACT'},
  53. 'error': { 'queue_category': 'SYSTEM', 'queue_name': 'KB_EXTRACT'}
  54. },
  55. "SYSTEM_KB_BUILD": {
  56. 'command': "python", # 脚本路径
  57. 'script':'standard_kb_build.py',
  58. 'args': [], # 脚本参数
  59. 'success': { 'queue_category': 'SYSTEM', 'queue_name':'KB_BUILD'},
  60. 'failed': { 'queue_category': 'SYSTEM', 'queue_name': 'KB_BUILD'},
  61. 'error': { 'queue_category': 'SYSTEM', 'queue_name': 'KB_BUILD'}
  62. }
  63. }
  64. EXECUTOR_NAME='AGENT_1'
  65. class ExecutorBase:
  66. def __init__(self, **kwargs):
  67. self.script_path = kwargs.get("script_path", "")
  68. if len(self.script_path) == 0:
  69. current_path = os.path.join("/".join(re.split(r"[\\/]",__file__)[:-1]))
  70. self.script_path = os.path.join(current_path, "job_script")
  71. logger.info("init executor: "+self.script_path)
  72. args = kwargs.get("script_args", "")
  73. log_file = kwargs.get("log_file", "")
  74. if isinstance(args, str):
  75. args = args.split()
  76. self.args = args
  77. self.log_file = log_file
  78. def _get_current_datetime(self):
  79. return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  80. def _get_log_id(self):
  81. return datetime.now().strftime("%Y%m%d_%H%M%S")
  82. def _run_job(self, job, db):
  83. biz = AgentBusiness(db)
  84. job_path = config.get_config('JOB_PATH')+f"/{job.id}"
  85. logger.info(f"check job path: {job_path}")
  86. if os.path.exists(job_path) == False:
  87. logger.error(f"job path not exists: {job_path}")
  88. return
  89. # 创建日志目录,希望脚本运行的时候在这里输出日志,虽然也不是很一定
  90. os.makedirs(job_path+"/logs", exist_ok=True)
  91. job_category_name = f"{job.job_category}"
  92. if job_category_name == "SYSTEM_DEFAULT":
  93. logger.info(f"job category is SYSTEM_DEFAULT, skipped")
  94. biz.update_job(job.id,
  95. status=AgentBusiness.JOB_STATUS_SKIPPED,
  96. job_logs=job.job_logs+f"{self._get_current_datetime()}: job was skipped because it's category is {job_category_name}\n")
  97. return # 不处理默认的任务
  98. #检查脚本是否存在
  99. if job_category_name in SCRIPT_CONFIG:
  100. script_config = SCRIPT_CONFIG[job_category_name]
  101. logger.info(f"job script config: {script_config}")
  102. script_file = os.path.join(self.script_path, script_config['script'])
  103. if not (os.path.exists(script_file) and os.path.isdir(script_file) == False):
  104. #脚本不存在,结束
  105. logger.error(f"script file not found: {script_file}")
  106. dt = self._get_current_datetime()
  107. biz.update_job(job.id, status=AgentBusiness.JOB_STATUS_ERROR, executor="",job_logs=job.job_logs+f"{dt}: script file not found: {script_file}\n")
  108. return
  109. # 现在可以开始执行了
  110. logger.info(f"start run job")
  111. # 将job状态设置为运行中,并且将executor设置为当前执行器的名称,后续检查executor是否是当前执行器
  112. # 如果不是当前执行器,说明当前执行器已经被其他执行器接管了,需要跳过当前任务
  113. job = biz.update_job(job.id, status=AgentBusiness.JOB_STATUS_RUNNING, executor=EXECUTOR_NAME)
  114. if job is None:
  115. logger.error(f"update job executor error: {job.id} {job.job_name} {job.job_category} {job.status}")
  116. return
  117. if job.executor == EXECUTOR_NAME:
  118. try:
  119. #更新工作日志
  120. job.job_logs = job.job_logs + f"{self._get_current_datetime()}: start run job {job_category_name}\n" if job.job_logs else f"{self._get_current_datetime()}: start run job\n"
  121. job = biz.update_job(job.id, job_logs=job.job_logs)
  122. with open(job_path+f"/logs/{job.id}_{self._get_log_id()}.log", 'w', encoding="utf-8") as log:
  123. # 添加参数支持
  124. command_line = [script_config['command'], script_file] + [job_path] + script_config["args"]
  125. logger.info(f"run job:{command_line}")
  126. process = subprocess.Popen(
  127. command_line,
  128. stdout=log,
  129. stderr=subprocess.STDOUT,
  130. encoding="utf-8"
  131. )
  132. return_code = process.wait(timeout=3600)
  133. #工作执行完毕,要根据返回值来判断是否成功
  134. #返回值为0,成功,返回值为1,失败且需要重试
  135. job.job_logs = job.job_logs + f"{self._get_current_datetime()}: job finished\n" if job.job_logs else f"{self._get_current_datetime()}: job finished\n"
  136. if return_code == 0: #SUCCESS
  137. job.job_logs = job.job_logs + f"{self._get_current_datetime()}: job success\n" if job.job_logs else f"{self._get_current_datetime()}: job success\n"
  138. current_job_status = biz.get_job_status(job.id)
  139. if current_job_status != AgentBusiness.JOB_STATUS_RUNNING:
  140. #如果当前任务状态不是运行中,说明当前任务已经状态被其他执行器修改了,需要跳过更新状态的过程
  141. logger.info(f"job status was changed after set it to JOB_STATUS_RUNNING, skipped")
  142. biz.update_job(job.id, job_logs=job.job_logs)
  143. return
  144. biz.update_job(job.id, status=AgentBusiness.JOB_STATUS_FINISHED, job_logs=job.job_logs)
  145. queue = biz.get_queue(queue_category=script_config["success"]["queue_category"],
  146. queue_name=script_config["success"]["queue_name"])
  147. if queue:
  148. job_queue = biz.get_job_queue(job_id=job.id)
  149. if job_queue is not None and job_queue.queue_id == queue.id:
  150. pass
  151. else:
  152. biz.put_job(queue=queue, job=job)
  153. elif return_code == 1: #FAILED: 脚本执行失败,需要重新执行
  154. job.job_logs = job.job_logs + f"{self._get_current_datetime()}: job retry later\n" if job.job_logs else f"{self._get_current_datetime()}: job failed\n"
  155. current_job_status = biz.get_job_status(job.id)
  156. if current_job_status == AgentBusiness.JOB_STATUS_RUNNING:
  157. logger.info(f"job status is JOB_STATUS_RUNNING, set it to JOB_STATUS_RETRYING")
  158. biz.update_job(job.id, status=AgentBusiness.JOB_STATUS_RETRYING, job_logs=job.job_logs)
  159. return
  160. else:
  161. job.job_logs = job.job_logs + f"{self._get_current_datetime()}: job error: {return_code}\n" if job.job_logs else f"{self._get_current_datetime()}: job error: {return_code}\n"
  162. current_job_status = biz.get_job_status(job.id)
  163. if current_job_status != AgentBusiness.JOB_STATUS_RUNNING:
  164. #如果当前任务状态不是运行中,说明当前任务已经状态被其他执行器修改了,需要跳过更新状态的过程
  165. #也需要跳过转变队列的过程
  166. logger.info(f"job status was changed after set it to JOB_STATUS_RUNNING, skipped")
  167. biz.update_job(job.id, job_logs=job.job_logs)
  168. return
  169. biz.update_job(job.id, status=AgentBusiness.JOB_STATUS_ERROR, job_logs=job.job_logs)
  170. queue = biz.get_queue(queue_category=script_config["failed"]["queue_category"],
  171. queue_name=script_config["failed"]["queue_name"])
  172. if queue:
  173. job_queue = biz.get_job_queue(job_id=job.id)
  174. if job_queue is not None and job_queue.queue_id == queue.id:
  175. pass
  176. else:
  177. biz.put_job(queue=queue, job=job)
  178. except subprocess.TimeoutExpired:
  179. job.job_logs = job.job_logs + f"{self._get_current_datetime()}: job timeout\n" if job.job_logs else f"{self._get_current_datetime()}: job timeout\n"
  180. biz.update_job(job.id, status=AgentBusiness.JOB_STATUS_ERROR, job_logs=job.job_logs)
  181. except Exception as e:
  182. job.job_logs = job.job_logs + f"{self._get_current_datetime()}: job error: {e}\n" if job.job_logs else f"{self._get_current_datetime()}: job error: {e}\n"
  183. logger.error(f"run job error: {e}")
  184. biz.update_job(job.id, status=AgentBusiness.JOB_STATUS_ERROR, job_logs=job.job_logs)
  185. else:
  186. biz.update_job(job.id, status=AgentBusiness.JOB_STATUS_ERROR, job_logs=job.job_logs+f"{self._get_current_datetime()}: job category not found: {job_category_name}\n")
  187. logger.info(f"job category not found: {job_category_name}")
  188. def _format_job_log(self, job:Job):
  189. content = f"[job_id:{job.id}, job_name:'{job.job_name}', job_category:'{job.job_category}', executor:'{job.executor}', status:{job.status}]"
  190. return content
  191. def check_jobs(self):
  192. db = SessionLocal()
  193. biz = AgentBusiness(db)
  194. #这里是对数据库的检查,检查是否有任务执行超时
  195. logger.info("check running jobs timeout")
  196. jobs = db.query(Job).filter(Job.status.in_([AgentBusiness.JOB_STATUS_RUNNING])).all()
  197. for job in jobs:
  198. logger.info(f"check job timeout: {self._format_job_log(job)}")
  199. #超时的时长是5分钟
  200. if job.executor != EXECUTOR_NAME:
  201. logger.info(f"job is timeout, but executor not match")
  202. if (datetime.now() - job.updated).seconds > 300:
  203. logger.info(f"job timeout: {job.id} {job.job_name} {job.job_category} {job.status}")
  204. biz.update_job(job.id, status=AgentBusiness.JOB_STATUS_ERROR, executor="")
  205. #这里是对数据库的检查,检查是否有任务需要执行
  206. #这里没有检查任务的类型
  207. logger.info("check jobs waiting to run")
  208. jobs = db.query(Job).filter(Job.job_category!="SYSTEM_KB_EXTRACT",
  209. Job.status.in_([AgentBusiness.JOB_STATUS_READY, AgentBusiness.JOB_STATUS_RETRYING])).all()
  210. for job in jobs:
  211. logger.info(f"job is ready for launch: {self._format_job_log(job)}")
  212. self._run_job(job=job, db = db)
  213. logger.info("check jobs finished")
  214. db.close()
  215. def check_kb_extract_jobs(self):
  216. #这里是对数据库的检查,检查是否有任务执行超时
  217. db = SessionLocal()
  218. logger.info("check_kb_extract_jobs waiting to run")
  219. jobs = db.query(Job).filter(Job.job_category=="SYSTEM_KB_EXTRACT",
  220. Job.status.in_([AgentBusiness.JOB_STATUS_READY, AgentBusiness.JOB_STATUS_RETRYING])).all()
  221. for job in jobs:
  222. logger.info(f"job is ready for launch: {self._format_job_log(job)}")
  223. self._run_job(job=job, db=db)
  224. logger.info("check_kb_extract_jobs finished")
  225. db.close()
  226. if __name__ == "__main__":
  227. executor = ExecutorBase(script_args=["-a", "1", "-b", "2"])
  228. from apscheduler.schedulers.background import BlockingScheduler
  229. scheduler = BlockingScheduler()
  230. scheduler.add_job(executor.check_jobs, 'interval', seconds=10)
  231. scheduler.add_job(executor.check_kb_extract_jobs, 'interval', seconds=10)
  232. scheduler.start()