import uuid import os import subprocess import logging logger = logging.getLogger(__name__) from datetime import datetime from agent.models.db.agent import Job,JobQueue,QueueJob from sqlalchemy.orm import load_only class Task: def __init__(self, script_path, log_file, args): self.task_id = str(uuid.uuid4()) self.script_path = script_path if isinstance(args, str): args = args.split() self.args = args self.status = "pending" self.start_time = datetime.now() self.end_time = None self.log_file = log_file self.thread = None def check(self): safe_dir = os.path.abspath("scripts") requested_path = os.path.abspath(self.script_path) if not requested_path.startswith(safe_dir): logger.error(f"脚本路径不合法:{requested_path}") return False if not os.path.exists(requested_path): logger.error(f"脚本文件不存在:{requested_path}") return False if not os.path.isfile(requested_path): logger.error(f"脚本路径不是文件:{requested_path}") return False return True def execute(self): self.status = "running" try: with open(self.log_file, 'w', encoding="utf-8") as log: # 添加参数支持 process = subprocess.Popen( ['python', self.script_path] + self.args, stdout=log, stderr=subprocess.STDOUT ) return_code = process.wait(timeout=3600) self.status = f"success:{return_code}" if return_code == 0 else f"error:{return_code}" except subprocess.TimeoutExpired: self.status = "error:timeout" except Exception as e: self.status = f"error:{str(e)}" finally: self.end_time = datetime.now() class AgentBusiness: JOB_STATUS_READY = 0 JOB_STATUS_RUNNING = 1 JOB_STATUS_FINISHED = 2 JOB_STATUS_ERROR = 3 JOB_STATUS_CANCELED = 4 JOB_STATUS_WAITING = 5 JOB_STATUS_PAUSED = 6 JOB_STATUS_RESUMED = 7 JOB_STATUS_RESTARTED = 8 JOB_STATUS_SKIPPED = 9 JOB_STATUS_RETRYING = 10 def __init__(self, db): self.db = db def create_job(self, **kwargs): logger.info(f"create job: {kwargs}") job = Job() job.job_category = "DEFAULT" job.created = datetime.now() job.updated = datetime.now() job.status = self.JOB_STATUS_READY self.append_job_logs(job, f"job created") try: for key, value in kwargs.items(): if hasattr(job, key): setattr(job, key, value) self.db.add(job) self.db.commit() self.db.refresh(job) return job except Exception as e: self.db.rollback() logger.error(f"create job error: {e}") return None def create_queue(self, **kwargs): logger.info(f"create queue: {kwargs}") queue = JobQueue() queue.queue_name = "DEFAULT" queue.queue_category = "DEFAULT" try: for key, value in kwargs.items(): if hasattr(queue, key): setattr(queue, key, value) self.db.add(queue) self.db.commit() self.db.refresh(queue) return queue except Exception as e: self.db.rollback() logger.error(f"create queue error: {e}") return None def get_queue(self, queue_category, queue_name, create_if_not_exist=False): if queue_category is None or queue_name is None: return None if queue_category == "" or queue_name == "": return None logger.info(f"get queue: {queue_name}") queue = self.db.query(JobQueue).filter(JobQueue.queue_name == queue_name, JobQueue.queue_category==queue_category).first() if queue: return queue if create_if_not_exist: return self.create_queue(queue_name=queue_name, queue_category=queue_category) return None def get_queues_summary(self): logger.info(f"get get_queues_summary") queue_data = [] queues = self.db.query(JobQueue).filter(JobQueue.status == 0).all() for queue in queues: job_count = self.db.query(QueueJob).filter(QueueJob.queue_id == queue.id).count() job_finished_count = self.db.query(QueueJob).join(Job, QueueJob.job_id==Job.id).filter(QueueJob.queue_id == queue.id, Job.status == self.JOB_STATUS_FINISHED).count() queue_data.append({"queue_name": queue.queue_name, "queue_category": queue.queue_category, "job_count": job_count, "job_finished_count": job_finished_count}) return queue_data def get_job(self, job_id): logger.info(f"get job: {job_id}") job = self.db.query(Job).filter(Job.id == job_id).first() if job: return job return None def get_job_status(self, job_id): logger.info(f"get job status: {job_id}") job = self.db.query(Job).filter(Job.id == job_id).options(load_only(Job.status)).first() if job: return job.status return -1 def append_job_logs(self, job:Job, logs:str): dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S") if job: job.job_logs = job.job_logs + f"{dt}: {logs}\n" if job.job_logs else f"{dt}: {logs}\n" return job def put_job(self, queue:JobQueue, job:Job): logger.info(f"put job: {queue.queue_name} {job.id}") queue_job = self.db.query(QueueJob).filter(QueueJob.job_id == job.id).first() if queue_job: pass else: queue_job = QueueJob() queue_job.queue_id = queue.id queue_job.job_id = job.id job.job_category = queue.queue_category + "_" + queue.queue_name job = self.append_job_logs(job, f"put job to queue: {queue.queue_category}_{queue.queue_name}") job.status = self.JOB_STATUS_READY try: self.db.add(queue_job) self.db.add(job) self.db.commit() self.db.refresh(queue_job) return queue_job except Exception as e: self.db.rollback() logger.error(f"put job error: {e}") return None def delete_job_in_any_queue(self, job:Job): logger.info(f"delete job in any queue: {job.id}") queue_job = self.db.query(QueueJob).filter(QueueJob.job_id == job.id).first() if queue_job: try: self.db.delete(queue_job) self.db.commit() return True except Exception as e: self.db.rollback() def delete_queue_job(self, queue, job): logger.info(f"delete queue job: {queue.queue_name} {job.id}") queue_job = self.db.query(QueueJob).filter(QueueJob.queue_id == queue.id, QueueJob.job_id == job.id).first() if queue_job: try: self.db.delete(queue_job) self.db.commit() return True except Exception as e: self.db.rollback() logger.error(f"delete queue job error: {e}") return False # def append_job_logs(self, job_id, logs:str): # logger.info(f"append job logs: {job_id} {logs}") # job = self.db.query(Job).filter(Job.id == job_id).first() # if job: # job.job_logs = job.job_logs + logs + "\n" if job.job_logs else logs + "\n" # job.updated = datetime.now() # try: # self.db.commit() # self.db.refresh(job) # return job # except Exception as e: # self.db.rollback() # logger.error(f"append job logs error: {e}") # return None def update_job(self, job_id, **kwargs): logger.info(f"update job: {job_id} ") job = self.db.query(Job).filter(Job.id == job_id).first() if job: for key, value in kwargs.items(): if hasattr(job, key): setattr(job, key, value) job.updated = datetime.now() try: self.db.commit() self.db.refresh(job) return job except Exception as e: self.db.rollback() logger.error(f"update job error: {e}") return None def delete_job(self, job_id): logger.info(f"delete job: {job_id}") job = self.db.query(Job).filter(Job.id == job_id).first() if job: try: self.db.delete(job) self.db.commit() return True except Exception as e: self.db.rollback() logger.error(f"delete job error: {e}") return False def get_jobs(self, job_category, job_creator, limit=50, offset=0): logger.info(f"get jobs: {job_category} {job_creator} {limit} {offset}") if job_category == 'SYSTEM_DEFAULT': jobs = self.db.query(Job)\ .options(load_only(Job.id, Job.job_name, Job.job_category, Job.job_creator, Job.executor, Job.status, Job.created, Job.updated))\ .limit(limit).offset(offset).all() if jobs: return jobs return None jobs = self.db.query(Job)\ .options(load_only(Job.id, Job.job_name, Job.job_category, Job.job_creator, Job.executor, Job.status, Job.created, Job.updated))\ .filter(Job.job_category == job_category, Job.job_creator==job_creator)\ .limit(limit).offset(offset).all() if jobs: return jobs return None def get_job_queue(self, job_id): logger.info(f"get job queue: {job_id}") queue_job = self.db.query(QueueJob).filter(QueueJob.job_id == job_id).first() if queue_job: return queue_job return None def get_queue_jobs_count(self, queue): logger.info(f"get queue jobs count: {queue.queue_name}") count = self.db.query(QueueJob).filter(QueueJob.queue_id == queue.id).count() logger.info(f"get queue jobs count: {count}") return count def get_queue_jobs(self, queue, limit=50, offset=0): logger.info(f"get queue jobs: {queue.queue_name} {limit} {offset}") #根据QueueJob的queue_id和job_id获取Job # jobs = self.db.query(Job)\ if queue.queue_name == 'DEFAULT' and queue.queue_category == 'SYSTEM': jobs = self.db.query(Job)\ .options(load_only(Job.id, Job.job_name, Job.job_category, Job.job_creator, Job.executor, Job.status, Job.created, Job.updated))\ .limit(limit).offset(offset).all() logger.info(f"get queue jobs: {len(jobs)}") return jobs jobs = self.db.query(Job)\ .options(load_only(Job.id, Job.job_name, Job.job_category, Job.job_creator, Job.executor, Job.status, Job.created, Job.updated))\ .join(QueueJob, Job.id == QueueJob.job_id).\ filter(QueueJob.queue_id == queue.id).\ order_by(Job.id.asc()).\ limit(limit).offset(offset).all() logger.info(f"get queue jobs: {len(jobs)}") return jobs