import uuid
import os
import subprocess
import logging

from datetime import datetime

from sqlalchemy.orm import load_only

from agent.models.db.agent import Job, JobQueue, QueueJob

logger = logging.getLogger(__name__)

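# Task wraps one script execution: it validates the script path, runs the script
# through subprocess with its arguments, and records status and start/end times.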
class Task:
    def __init__(self, script_path, log_file, args):
        self.task_id = str(uuid.uuid4())
        self.script_path = script_path
        if isinstance(args, str):
            args = args.split()
        self.args = args
        self.status = "pending"
        self.start_time = datetime.now()
        self.end_time = None
        self.log_file = log_file
        self.thread = None

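    # check() validates the script path: it must resolve inside the local "scripts"
    # directory and point at an existing regular file.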
    def check(self):
        safe_dir = os.path.abspath("scripts")
        requested_path = os.path.abspath(self.script_path)
        if not requested_path.startswith(safe_dir):
            logger.error(f"Script path is not allowed: {requested_path}")
            return False
        if not os.path.exists(requested_path):
            logger.error(f"Script file does not exist: {requested_path}")
            return False
        if not os.path.isfile(requested_path):
            logger.error(f"Script path is not a file: {requested_path}")
            return False
        return True

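    # execute() launches the script with the `python` interpreter found on PATH,
    # redirects stdout/stderr into self.log_file, and waits up to one hour before
    # marking the task as timed out.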
    def execute(self):
        self.status = "running"
        try:
            with open(self.log_file, 'w', encoding="utf-8") as log:
                # Forward the task arguments to the script
                process = subprocess.Popen(
                    ['python', self.script_path] + self.args,
                    stdout=log,
                    stderr=subprocess.STDOUT
                )
                return_code = process.wait(timeout=3600)
                self.status = f"success:{return_code}" if return_code == 0 else f"error:{return_code}"
        except subprocess.TimeoutExpired:
            self.status = "error:timeout"
        except Exception as e:
            self.status = f"error:{str(e)}"
        finally:
            self.end_time = datetime.now()


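# AgentBusiness wraps a SQLAlchemy session and provides job/queue bookkeeping:
# creating jobs and queues, attaching jobs to queues, updating status, and
# paginated listing. Writes are committed immediately and rolled back on error.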
class AgentBusiness:
    JOB_STATUS_READY = 0
    JOB_STATUS_RUNNING = 1
    JOB_STATUS_FINISHED = 2
    JOB_STATUS_ERROR = 3
    JOB_STATUS_CANCELED = 4
    JOB_STATUS_WAITING = 5
    JOB_STATUS_PAUSED = 6
    JOB_STATUS_RESUMED = 7
    JOB_STATUS_RESTARTED = 8
    JOB_STATUS_SKIPPED = 9
    JOB_STATUS_RETRYING = 10

    def __init__(self, db):
        self.db = db

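    # Creation helpers: only kwargs that match an existing attribute on the model
    # are applied; unknown keys are silently ignored.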
    def create_job(self, **kwargs):
        logger.info(f"create job: {kwargs}")
        job = Job()
        job.job_category = "DEFAULT"
        job.created = datetime.now()
        job.updated = datetime.now()
        job.status = self.JOB_STATUS_READY
        self.append_job_logs(job, "job created")
        try:
            for key, value in kwargs.items():
                if hasattr(job, key):
                    setattr(job, key, value)
            self.db.add(job)
            self.db.commit()
            self.db.refresh(job)
            return job
        except Exception as e:
            self.db.rollback()
            logger.error(f"create job error: {e}")
            return None

    def create_queue(self, **kwargs):
        logger.info(f"create queue: {kwargs}")
        queue = JobQueue()
        queue.queue_name = "DEFAULT"
        queue.queue_category = "DEFAULT"
        try:
            for key, value in kwargs.items():
                if hasattr(queue, key):
                    setattr(queue, key, value)
            self.db.add(queue)
            self.db.commit()
            self.db.refresh(queue)
            return queue
        except Exception as e:
            self.db.rollback()
            logger.error(f"create queue error: {e}")
            return None

    def get_queue(self, queue_category, queue_name, create_if_not_exist=False):
        if not queue_category or not queue_name:
            return None
        logger.info(f"get queue: {queue_name}")
        queue = self.db.query(JobQueue).filter(JobQueue.queue_name == queue_name,
                                               JobQueue.queue_category == queue_category).first()
        if queue:
            return queue
        if create_if_not_exist:
            return self.create_queue(queue_name=queue_name, queue_category=queue_category)
        return None

    def get_queues_summary(self):
        logger.info("get queues summary")
        queue_data = []
        queues = self.db.query(JobQueue).filter(JobQueue.status == 0).all()
        for queue in queues:
            job_count = self.db.query(QueueJob).filter(QueueJob.queue_id == queue.id).count()
            job_finished_count = self.db.query(QueueJob)\
                .join(Job, QueueJob.job_id == Job.id)\
                .filter(QueueJob.queue_id == queue.id, Job.status == self.JOB_STATUS_FINISHED)\
                .count()
            queue_data.append({"queue_name": queue.queue_name,
                               "queue_category": queue.queue_category,
                               "job_count": job_count,
                               "job_finished_count": job_finished_count})
        return queue_data

    def get_job(self, job_id):
        logger.info(f"get job: {job_id}")
        job = self.db.query(Job).filter(Job.id == job_id).first()
        if job:
            return job
        return None

    def get_job_status(self, job_id):
        logger.info(f"get job status: {job_id}")
        job = self.db.query(Job).filter(Job.id == job_id).options(load_only(Job.status)).first()
        if job:
            return job.status
        return -1

    def append_job_logs(self, job: Job, logs: str):
        dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        if job:
            entry = f"{dt}: {logs}\n"
            job.job_logs = job.job_logs + entry if job.job_logs else entry
        return job

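    # put_job attaches a job to a queue. If a QueueJob row already exists for this
    # job it is reused (the job keeps its current association); otherwise a new
    # association row is created. The job is reset to READY either way.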
    def put_job(self, queue: JobQueue, job: Job):
        logger.info(f"put job: {queue.queue_name} {job.id}")
        queue_job = self.db.query(QueueJob).filter(QueueJob.job_id == job.id).first()
        if not queue_job:
            queue_job = QueueJob()
            queue_job.queue_id = queue.id
            queue_job.job_id = job.id
        job.job_category = queue.queue_category + "_" + queue.queue_name
        job = self.append_job_logs(job, f"put job to queue: {queue.queue_category}_{queue.queue_name}")
        job.status = self.JOB_STATUS_READY
        try:
            self.db.add(queue_job)
            self.db.add(job)
            self.db.commit()
            self.db.refresh(queue_job)
            return queue_job
        except Exception as e:
            self.db.rollback()
            logger.error(f"put job error: {e}")
            return None

    def delete_job_in_any_queue(self, job: Job):
        logger.info(f"delete job in any queue: {job.id}")
        queue_job = self.db.query(QueueJob).filter(QueueJob.job_id == job.id).first()
        if queue_job:
            try:
                self.db.delete(queue_job)
                self.db.commit()
                return True
            except Exception as e:
                self.db.rollback()
                logger.error(f"delete job in any queue error: {e}")
        return False

    def delete_queue_job(self, queue, job):
        logger.info(f"delete queue job: {queue.queue_name} {job.id}")
        queue_job = self.db.query(QueueJob).filter(QueueJob.queue_id == queue.id, QueueJob.job_id == job.id).first()
        if queue_job:
            try:
                self.db.delete(queue_job)
                self.db.commit()
                return True
            except Exception as e:
                self.db.rollback()
                logger.error(f"delete queue job error: {e}")
        return False

    # def append_job_logs(self, job_id, logs: str):
    #     logger.info(f"append job logs: {job_id} {logs}")
    #     job = self.db.query(Job).filter(Job.id == job_id).first()
    #     if job:
    #         job.job_logs = job.job_logs + logs + "\n" if job.job_logs else logs + "\n"
    #         job.updated = datetime.now()
    #         try:
    #             self.db.commit()
    #             self.db.refresh(job)
    #             return job
    #         except Exception as e:
    #             self.db.rollback()
    #             logger.error(f"append job logs error: {e}")
    #     return None

    def update_job(self, job_id, **kwargs):
        logger.info(f"update job: {job_id}")
        job = self.db.query(Job).filter(Job.id == job_id).first()
        if job:
            for key, value in kwargs.items():
                if hasattr(job, key):
                    setattr(job, key, value)
            job.updated = datetime.now()
            try:
                self.db.commit()
                self.db.refresh(job)
                return job
            except Exception as e:
                self.db.rollback()
                logger.error(f"update job error: {e}")
        return None

    def delete_job(self, job_id):
        logger.info(f"delete job: {job_id}")
        job = self.db.query(Job).filter(Job.id == job_id).first()
        if job:
            try:
                self.db.delete(job)
                self.db.commit()
                return True
            except Exception as e:
                self.db.rollback()
                logger.error(f"delete job error: {e}")
        return False

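    # Listing helpers: the special category 'SYSTEM_DEFAULT' returns jobs across all
    # categories and creators; any other category is filtered by category and creator.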
    def get_jobs(self, job_category, job_creator, limit=50, offset=0):
        logger.info(f"get jobs: {job_category} {job_creator} {limit} {offset}")
        if job_category == 'SYSTEM_DEFAULT':
            jobs = self.db.query(Job)\
                .options(load_only(Job.id, Job.job_name, Job.job_category, Job.job_creator, Job.executor, Job.status, Job.created, Job.updated))\
                .limit(limit).offset(offset).all()
            if jobs:
                return jobs
            return None
        jobs = self.db.query(Job)\
            .options(load_only(Job.id, Job.job_name, Job.job_category, Job.job_creator, Job.executor, Job.status, Job.created, Job.updated))\
            .filter(Job.job_category == job_category, Job.job_creator == job_creator)\
            .limit(limit).offset(offset).all()
        if jobs:
            return jobs
        return None

    def get_job_queue(self, job_id):
        logger.info(f"get job queue: {job_id}")
        queue_job = self.db.query(QueueJob).filter(QueueJob.job_id == job_id).first()
        if queue_job:
            return queue_job
        return None

    def get_queue_jobs_count(self, queue):
        logger.info(f"get queue jobs count: {queue.queue_name}")
        count = self.db.query(QueueJob).filter(QueueJob.queue_id == queue.id).count()
        logger.info(f"get queue jobs count: {count}")
        return count

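    # get_queue_jobs treats the SYSTEM/DEFAULT queue as "all jobs"; for any other
    # queue it joins through QueueJob and pages the results ordered by Job.id.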
    def get_queue_jobs(self, queue, limit=50, offset=0):
        logger.info(f"get queue jobs: {queue.queue_name} {limit} {offset}")
        # Fetch Jobs through the QueueJob association (queue_id -> job_id)
        if queue.queue_name == 'DEFAULT' and queue.queue_category == 'SYSTEM':
            jobs = self.db.query(Job)\
                .options(load_only(Job.id, Job.job_name, Job.job_category, Job.job_creator, Job.executor, Job.status, Job.created, Job.updated))\
                .limit(limit).offset(offset).all()
            logger.info(f"get queue jobs: {len(jobs)}")
            return jobs
        jobs = self.db.query(Job)\
            .options(load_only(Job.id, Job.job_name, Job.job_category, Job.job_creator, Job.executor, Job.status, Job.created, Job.updated))\
            .join(QueueJob, Job.id == QueueJob.job_id)\
            .filter(QueueJob.queue_id == queue.id)\
            .order_by(Job.id.asc())\
            .limit(limit).offset(offset).all()

        logger.info(f"get queue jobs: {len(jobs)}")
        return jobs

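# Usage sketch (illustrative only): assumes a SQLAlchemy session factory named
# SessionLocal, which is not defined in this module.
#
#   db = SessionLocal()
#   business = AgentBusiness(db)
#   queue = business.get_queue("SYSTEM", "DEFAULT", create_if_not_exist=True)
#   job = business.create_job(job_name="demo", job_creator="admin")
#   if queue and job:
#       business.put_job(queue, job)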