123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- import uuid
- import os,sys
- current_path = os.getcwd()
- sys.path.append(current_path)
- import subprocess
- from datetime import datetime
- from agent.db.database import SessionLocal
- from agent.models.db.agent import Job
- from config.site import SiteConfig
- from agent.libs.agent import AgentBusiness
- from datetime import datetime
- import logging
- import re
# Project configuration accessor; _run_job reads 'JOB_PATH' from it.
config = SiteConfig()

# Root logger: basicConfig installs the default console handler at INFO, and
# an additional file handler mirrors every record into the executor log file.
# mode='w' means the file is truncated each time this module is loaded.
logging.basicConfig(level=logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler = logging.FileHandler('/app/logs/job-executor.log', mode='w', encoding="utf-8")
handler.setFormatter(formatter)
handler.setLevel(logging.INFO)
logger = logging.getLogger(__name__)
logging.getLogger().addHandler(handler)
def _script_entry(script, success_queue, failure_queue):
    """Build one SCRIPT_CONFIG entry.

    Every call returns fresh dict/list objects, so entries never share
    mutable state with each other.
    """
    return {
        'command': "python",  # interpreter placed first on the command line
        'script': script,     # file name resolved against the executor's script directory
        'args': [],           # extra arguments appended after the job path
        'success': {'queue_category': 'SYSTEM', 'queue_name': success_queue},
        'failed': {'queue_category': 'SYSTEM', 'queue_name': failure_queue},
        'error': {'queue_category': 'SYSTEM', 'queue_name': failure_queue},
    }


# Maps a job category to the script that handles it and to the queues the job
# is routed to afterwards.
SCRIPT_CONFIG = {
    "SYSTEM_WORD": _script_entry('standard_word_extractor.py', 'CHUNKS', 'WORD'),
    "SYSTEM_OCR": _script_entry('standard_pdf_extractor.py', 'CHUNKS', 'OCR'),
    "SYSTEM_CHUNKS": _script_entry('standard_txt_chunk.py', 'CHUNKS', 'CHUNKS'),
    "SYSTEM_KB_EXTRACT": _script_entry('standard_kb_extractor.py', 'KB_BUILD', 'KB_EXTRACT'),
    "SYSTEM_KB_BUILD": _script_entry('standard_kb_build.py', 'KB_BUILD', 'KB_BUILD'),
}

# Name this process writes into Job.executor when it claims a job.
EXECUTOR_NAME = 'AGENT_1'
class ExecutorBase:
    """Poll the job table and run each pending job's script as a subprocess.

    ``check_jobs`` and ``check_kb_extract_jobs`` are the two entry points that
    the scheduler calls periodically. Both pick up jobs whose status is READY
    or RETRYING and hand them to ``_run_job``, which launches the script
    configured for the job's category in ``SCRIPT_CONFIG`` and records the
    outcome on the job.
    """

    # Maximum wall-clock time (seconds) a job script may run before it is killed.
    SCRIPT_TIMEOUT_SECONDS = 3600
    # A RUNNING job whose ``updated`` timestamp is older than this many seconds
    # is flagged as ERROR by ``check_jobs``.
    STALE_RUNNING_SECONDS = 300

    def __init__(self, **kwargs):
        """Create an executor.

        Keyword Args:
            script_path: Directory containing the job scripts. When empty or
                missing, defaults to the ``job_script`` directory next to
                this file.
            script_args: Extra arguments, as a list or a whitespace-separated
                string. Stored on ``self.args``; no method of this class
                reads it (the command line uses ``SCRIPT_CONFIG[...]['args']``).
            log_file: Stored on ``self.log_file``; not read by this class.
        """
        self.script_path = kwargs.get("script_path", "")
        if not self.script_path:
            # Resolve against this file's absolute location so the default
            # does not depend on the current working directory.
            current_path = os.path.dirname(os.path.abspath(__file__))
            self.script_path = os.path.join(current_path, "job_script")
        logger.info("init executor: " + self.script_path)

        args = kwargs.get("script_args", "")
        if isinstance(args, str):
            args = args.split()
        self.args = args
        self.log_file = kwargs.get("log_file", "")

    def _get_current_datetime(self):
        """Return the current local time as ``YYYY-MM-DD HH:MM:SS``."""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def _get_log_id(self):
        """Return the current local time as ``YYYYMMDD_HHMMSS`` (log file suffix)."""
        return datetime.now().strftime("%Y%m%d_%H%M%S")

    def _append_log(self, existing, message):
        """Return ``existing`` with a timestamped ``message`` line appended.

        ``existing`` may be ``None`` or empty (a job that has no log text
        yet); it is then treated as an empty string instead of raising.
        """
        return f"{existing or ''}{self._get_current_datetime()}: {message}\n"

    @staticmethod
    def _is_stale(updated, now, limit_seconds=STALE_RUNNING_SECONDS):
        """Tell whether ``updated`` lies more than ``limit_seconds`` before ``now``.

        Uses ``total_seconds()`` so that whole days are counted; the
        ``timedelta.seconds`` attribute only holds the sub-day remainder.
        Returns ``False`` when ``updated`` is ``None``.
        """
        if updated is None:
            return False
        return (now - updated).total_seconds() > limit_seconds

    def _execute_script(self, command_line, log_path):
        """Run ``command_line`` with stdout/stderr written to ``log_path``.

        Returns:
            The process exit code.

        Raises:
            subprocess.TimeoutExpired: The process ran longer than
                ``SCRIPT_TIMEOUT_SECONDS``. The child is killed and reaped
                before the exception propagates, because ``Popen.wait`` does
                not terminate the child on timeout by itself.
        """
        with open(log_path, 'w', encoding="utf-8") as log:
            process = subprocess.Popen(
                command_line,
                stdout=log,
                stderr=subprocess.STDOUT,
                encoding="utf-8",
            )
            try:
                return process.wait(timeout=self.SCRIPT_TIMEOUT_SECONDS)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()
                raise

    def _move_to_queue(self, biz, job, target):
        """Put ``job`` on the queue described by ``target`` unless it is already there.

        ``target`` is a dict with ``queue_category`` and ``queue_name`` keys.
        Nothing happens when the queue cannot be found.
        """
        queue = biz.get_queue(queue_category=target["queue_category"],
                              queue_name=target["queue_name"])
        if not queue:
            return
        job_queue = biz.get_job_queue(job_id=job.id)
        if job_queue is None or job_queue.queue_id != queue.id:
            biz.put_job(queue=queue, job=job)

    def _settle_job(self, biz, job, status, target):
        """Store the final ``status`` of ``job`` and route it to ``target``.

        If the job is no longer RUNNING (something else changed its status
        while the script ran, e.g. the stale check in ``check_jobs``), only
        the log text is saved; status and queue are left alone.
        """
        if biz.get_job_status(job.id) != AgentBusiness.JOB_STATUS_RUNNING:
            logger.info("job status was changed after set it to JOB_STATUS_RUNNING, skipped")
            biz.update_job(job.id, job_logs=job.job_logs)
            return
        biz.update_job(job.id, status=status, job_logs=job.job_logs)
        self._move_to_queue(biz, job, target)

    def _run_job(self, job, db):
        """Run the script configured for ``job`` and record the result.

        Args:
            job: Job row to execute.
            db: Database session handed to ``AgentBusiness``.

        Outcome by script exit code: ``0`` -> FINISHED and moved to the
        ``success`` queue; ``1`` -> RETRYING; anything else -> ERROR and
        moved to the ``failed`` queue. A timeout or unexpected exception
        sets ERROR. Returns ``None`` in every case.
        """
        biz = AgentBusiness(db)
        job_path = config.get_config('JOB_PATH') + f"/{job.id}"
        logger.info(f"check job path: {job_path}")
        if not os.path.exists(job_path):
            logger.error(f"job path not exists: {job_path}")
            return
        # Create the log directory; scripts are expected (though not
        # guaranteed) to write their own logs here as well.
        os.makedirs(job_path + "/logs", exist_ok=True)

        job_category_name = f"{job.job_category}"
        if job_category_name == "SYSTEM_DEFAULT":
            # Jobs of the default category are never executed.
            logger.info("job category is SYSTEM_DEFAULT, skipped")
            biz.update_job(
                job.id,
                status=AgentBusiness.JOB_STATUS_SKIPPED,
                job_logs=self._append_log(
                    job.job_logs,
                    f"job was skipped because it's category is {job_category_name}"),
            )
            return

        script_config = SCRIPT_CONFIG.get(job_category_name)
        if script_config is None:
            biz.update_job(
                job.id,
                status=AgentBusiness.JOB_STATUS_ERROR,
                job_logs=self._append_log(
                    job.job_logs, f"job category not found: {job_category_name}"),
            )
            logger.info(f"job category not found: {job_category_name}")
            return

        # Make sure the script exists and is a regular file.
        logger.info(f"job script config: {script_config}")
        script_file = os.path.join(self.script_path, script_config['script'])
        if not os.path.isfile(script_file):
            logger.error(f"script file not found: {script_file}")
            biz.update_job(
                job.id,
                status=AgentBusiness.JOB_STATUS_ERROR,
                executor="",
                job_logs=self._append_log(
                    job.job_logs, f"script file not found: {script_file}"),
            )
            return

        logger.info("start run job")
        # Mark the job RUNNING and claim it with this executor's name. If the
        # stored executor is not ours afterwards, the job is skipped.
        claimed = biz.update_job(job.id, status=AgentBusiness.JOB_STATUS_RUNNING,
                                 executor=EXECUTOR_NAME)
        if claimed is None:
            # Report using the original row: ``claimed`` is None here.
            logger.error(f"update job executor error: {job.id} {job.job_name} {job.job_category} {job.status}")
            return
        job = claimed
        if job.executor != EXECUTOR_NAME:
            return

        try:
            job.job_logs = self._append_log(job.job_logs, f"start run job {job_category_name}")
            refreshed = biz.update_job(job.id, job_logs=job.job_logs)
            if refreshed is not None:
                job = refreshed

            log_path = job_path + f"/logs/{job.id}_{self._get_log_id()}.log"
            # The job directory is always the script's first argument,
            # followed by any configured extra arguments.
            command_line = [script_config['command'], script_file, job_path] + script_config["args"]
            logger.info(f"run job:{command_line}")
            return_code = self._execute_script(command_line, log_path)

            job.job_logs = self._append_log(job.job_logs, "job finished")
            if return_code == 0:
                # Success.
                job.job_logs = self._append_log(job.job_logs, "job success")
                self._settle_job(biz, job, AgentBusiness.JOB_STATUS_FINISHED,
                                 script_config["success"])
            elif return_code == 1:
                # Script failed and asks to be run again later.
                job.job_logs = self._append_log(job.job_logs, "job retry later")
                if biz.get_job_status(job.id) == AgentBusiness.JOB_STATUS_RUNNING:
                    logger.info("job status is JOB_STATUS_RUNNING, set it to JOB_STATUS_RETRYING")
                    biz.update_job(job.id, status=AgentBusiness.JOB_STATUS_RETRYING,
                                   job_logs=job.job_logs)
            else:
                # NOTE(review): routing uses the 'failed' target; the 'error'
                # key in SCRIPT_CONFIG is never read -- confirm this is intended.
                job.job_logs = self._append_log(job.job_logs, f"job error: {return_code}")
                self._settle_job(biz, job, AgentBusiness.JOB_STATUS_ERROR,
                                 script_config["failed"])
        except subprocess.TimeoutExpired:
            job.job_logs = self._append_log(job.job_logs, "job timeout")
            biz.update_job(job.id, status=AgentBusiness.JOB_STATUS_ERROR, job_logs=job.job_logs)
        except Exception as e:
            job.job_logs = self._append_log(job.job_logs, f"job error: {e}")
            logger.exception(f"run job error: {e}")
            biz.update_job(job.id, status=AgentBusiness.JOB_STATUS_ERROR, job_logs=job.job_logs)

    def _format_job_log(self, job: "Job"):
        """Return a one-line description of ``job`` for log messages."""
        return (
            f"[job_id:{job.id}, job_name:'{job.job_name}', "
            f"job_category:'{job.job_category}', executor:'{job.executor}', "
            f"status:{job.status}]"
        )

    def _run_pending_jobs(self, db, category_filter):
        """Run every READY or RETRYING job that matches ``category_filter``."""
        pending = db.query(Job).filter(
            category_filter,
            Job.status.in_([AgentBusiness.JOB_STATUS_READY,
                            AgentBusiness.JOB_STATUS_RETRYING]),
        ).all()
        for job in pending:
            logger.info(f"job is ready for launch: {self._format_job_log(job)}")
            self._run_job(job=job, db=db)

    def check_jobs(self):
        """Flag stale RUNNING jobs, then run pending jobs other than SYSTEM_KB_EXTRACT.

        The database session is always closed, even if a query or a job raises.
        """
        db = SessionLocal()
        try:
            biz = AgentBusiness(db)
            # Pass 1: RUNNING jobs not updated within STALE_RUNNING_SECONDS
            # are set to ERROR and released (executor cleared).
            logger.info("check running jobs timeout")
            running = db.query(Job).filter(
                Job.status.in_([AgentBusiness.JOB_STATUS_RUNNING])).all()
            for job in running:
                logger.info(f"check job timeout: {self._format_job_log(job)}")
                if job.executor != EXECUTOR_NAME:
                    # NOTE(review): this only logs; jobs owned by another
                    # executor still go through the stale check below.
                    logger.info("job is timeout, but executor not match")
                # Assumes Job.updated is a naive local datetime comparable
                # with datetime.now() -- TODO confirm against the model.
                if self._is_stale(job.updated, datetime.now(), self.STALE_RUNNING_SECONDS):
                    logger.info(f"job timeout: {job.id} {job.job_name} {job.job_category} {job.status}")
                    biz.update_job(job.id, status=AgentBusiness.JOB_STATUS_ERROR, executor="")

            # Pass 2: launch whatever is waiting, KB extraction excluded
            # (it has its own entry point).
            logger.info("check jobs waiting to run")
            self._run_pending_jobs(db, Job.job_category != "SYSTEM_KB_EXTRACT")
            logger.info("check jobs finished")
        finally:
            db.close()

    def check_kb_extract_jobs(self):
        """Run pending SYSTEM_KB_EXTRACT jobs.

        The database session is always closed, even if a query or a job raises.
        """
        db = SessionLocal()
        try:
            logger.info("check_kb_extract_jobs waiting to run")
            self._run_pending_jobs(db, Job.job_category == "SYSTEM_KB_EXTRACT")
            logger.info("check_kb_extract_jobs finished")
        finally:
            db.close()
-
-
if __name__ == "__main__":
    # Import BlockingScheduler from the module that defines it rather than
    # relying on apscheduler.schedulers.background happening to import it.
    from apscheduler.schedulers.blocking import BlockingScheduler

    executor = ExecutorBase(script_args=["-a", "1", "-b", "2"])

    # Both checks run every 10 seconds; start() blocks the main thread.
    scheduler = BlockingScheduler()
    scheduler.add_job(executor.check_jobs, 'interval', seconds=10)
    scheduler.add_job(executor.check_kb_extract_jobs, 'interval', seconds=10)
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        # Exit quietly on Ctrl-C / interpreter shutdown instead of dumping a traceback.
        pass
|