agent.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. import uuid
  2. import os
  3. import subprocess
  4. import logging
  5. logger = logging.getLogger(__name__)
  6. from datetime import datetime
  7. from agent.models.db.agent import Job,JobQueue,QueueJob
  8. from sqlalchemy.orm import load_only
  9. class Task:
  10. def __init__(self, script_path, log_file, args):
  11. self.task_id = str(uuid.uuid4())
  12. self.script_path = script_path
  13. if isinstance(args, str):
  14. args = args.split()
  15. self.args = args
  16. self.status = "pending"
  17. self.start_time = datetime.now()
  18. self.end_time = None
  19. self.log_file = log_file
  20. self.thread = None
  21. def check(self):
  22. safe_dir = os.path.abspath("scripts")
  23. requested_path = os.path.abspath(self.script_path)
  24. if not requested_path.startswith(safe_dir):
  25. logger.error(f"脚本路径不合法:{requested_path}")
  26. return False
  27. if not os.path.exists(requested_path):
  28. logger.error(f"脚本文件不存在:{requested_path}")
  29. return False
  30. if not os.path.isfile(requested_path):
  31. logger.error(f"脚本路径不是文件:{requested_path}")
  32. return False
  33. return True
  34. def execute(self):
  35. self.status = "running"
  36. try:
  37. with open(self.log_file, 'w', encoding="utf-8") as log:
  38. # 添加参数支持
  39. process = subprocess.Popen(
  40. ['python', self.script_path] + self.args,
  41. stdout=log,
  42. stderr=subprocess.STDOUT
  43. )
  44. return_code = process.wait(timeout=3600)
  45. self.status = f"success:{return_code}" if return_code == 0 else f"error:{return_code}"
  46. except subprocess.TimeoutExpired:
  47. self.status = "error:timeout"
  48. except Exception as e:
  49. self.status = f"error:{str(e)}"
  50. finally:
  51. self.end_time = datetime.now()
  52. class AgentBusiness:
  53. JOB_STATUS_READY = 0
  54. JOB_STATUS_RUNNING = 1
  55. JOB_STATUS_FINISHED = 2
  56. JOB_STATUS_ERROR = 3
  57. JOB_STATUS_CANCELED = 4
  58. JOB_STATUS_WAITING = 5
  59. JOB_STATUS_PAUSED = 6
  60. JOB_STATUS_RESUMED = 7
  61. JOB_STATUS_RESTARTED = 8
  62. JOB_STATUS_SKIPPED = 9
  63. JOB_STATUS_RETRYING = 10
  64. def __init__(self, db):
  65. self.db = db
  66. def create_job(self, **kwargs):
  67. logger.info(f"create job: {kwargs}")
  68. job = Job()
  69. job.job_category = "DEFAULT"
  70. job.created = datetime.now()
  71. job.updated = datetime.now()
  72. job.status = self.JOB_STATUS_READY
  73. self.append_job_logs(job, f"job created")
  74. try:
  75. for key, value in kwargs.items():
  76. if hasattr(job, key):
  77. setattr(job, key, value)
  78. self.db.add(job)
  79. self.db.commit()
  80. self.db.refresh(job)
  81. return job
  82. except Exception as e:
  83. self.db.rollback()
  84. logger.error(f"create job error: {e}")
  85. return None
  86. def create_queue(self, **kwargs):
  87. logger.info(f"create queue: {kwargs}")
  88. queue = JobQueue()
  89. queue.queue_name = "DEFAULT"
  90. queue.queue_category = "DEFAULT"
  91. try:
  92. for key, value in kwargs.items():
  93. if hasattr(queue, key):
  94. setattr(queue, key, value)
  95. self.db.add(queue)
  96. self.db.commit()
  97. self.db.refresh(queue)
  98. return queue
  99. except Exception as e:
  100. self.db.rollback()
  101. logger.error(f"create queue error: {e}")
  102. return None
  103. def get_queue(self, queue_category, queue_name, create_if_not_exist=False):
  104. if queue_category is None or queue_name is None:
  105. return None
  106. if queue_category == "" or queue_name == "":
  107. return None
  108. logger.info(f"get queue: {queue_name}")
  109. queue = self.db.query(JobQueue).filter(JobQueue.queue_name == queue_name,
  110. JobQueue.queue_category==queue_category).first()
  111. if queue:
  112. return queue
  113. if create_if_not_exist:
  114. return self.create_queue(queue_name=queue_name, queue_category=queue_category)
  115. return None
  116. def get_queues_summary(self):
  117. logger.info(f"get get_queues_summary")
  118. queue_data = []
  119. queues = self.db.query(JobQueue).filter(JobQueue.status == 0).all()
  120. for queue in queues:
  121. job_count = self.db.query(QueueJob).filter(QueueJob.queue_id == queue.id).count()
  122. job_finished_count = self.db.query(QueueJob).join(Job, QueueJob.job_id==Job.id).filter(QueueJob.queue_id == queue.id, Job.status == self.JOB_STATUS_FINISHED).count()
  123. queue_data.append({"queue_name": queue.queue_name,
  124. "queue_category": queue.queue_category,
  125. "job_count": job_count,
  126. "job_finished_count": job_finished_count})
  127. return queue_data
  128. def get_job(self, job_id):
  129. logger.info(f"get job: {job_id}")
  130. job = self.db.query(Job).filter(Job.id == job_id).first()
  131. if job:
  132. return job
  133. return None
  134. def get_job_status(self, job_id):
  135. logger.info(f"get job status: {job_id}")
  136. job = self.db.query(Job).filter(Job.id == job_id).options(load_only(Job.status)).first()
  137. if job:
  138. return job.status
  139. return -1
  140. def append_job_logs(self, job:Job, logs:str):
  141. dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  142. if job:
  143. job.job_logs = job.job_logs + f"{dt}: {logs}\n" if job.job_logs else f"{dt}: {logs}\n"
  144. return job
  145. def put_job(self, queue:JobQueue, job:Job):
  146. logger.info(f"put job: {queue.queue_name} {job.id}")
  147. queue_job = self.db.query(QueueJob).filter(QueueJob.job_id == job.id).first()
  148. if queue_job:
  149. pass
  150. else:
  151. queue_job = QueueJob()
  152. queue_job.queue_id = queue.id
  153. queue_job.job_id = job.id
  154. job.job_category = queue.queue_category + "_" + queue.queue_name
  155. job = self.append_job_logs(job, f"put job to queue: {queue.queue_category}_{queue.queue_name}")
  156. job.status = self.JOB_STATUS_READY
  157. try:
  158. self.db.add(queue_job)
  159. self.db.add(job)
  160. self.db.commit()
  161. self.db.refresh(queue_job)
  162. return queue_job
  163. except Exception as e:
  164. self.db.rollback()
  165. logger.error(f"put job error: {e}")
  166. return None
  167. def delete_job_in_any_queue(self, job:Job):
  168. logger.info(f"delete job in any queue: {job.id}")
  169. queue_job = self.db.query(QueueJob).filter(QueueJob.job_id == job.id).first()
  170. if queue_job:
  171. try:
  172. self.db.delete(queue_job)
  173. self.db.commit()
  174. return True
  175. except Exception as e:
  176. self.db.rollback()
  177. def delete_queue_job(self, queue, job):
  178. logger.info(f"delete queue job: {queue.queue_name} {job.id}")
  179. queue_job = self.db.query(QueueJob).filter(QueueJob.queue_id == queue.id, QueueJob.job_id == job.id).first()
  180. if queue_job:
  181. try:
  182. self.db.delete(queue_job)
  183. self.db.commit()
  184. return True
  185. except Exception as e:
  186. self.db.rollback()
  187. logger.error(f"delete queue job error: {e}")
  188. return False
  189. # def append_job_logs(self, job_id, logs:str):
  190. # logger.info(f"append job logs: {job_id} {logs}")
  191. # job = self.db.query(Job).filter(Job.id == job_id).first()
  192. # if job:
  193. # job.job_logs = job.job_logs + logs + "\n" if job.job_logs else logs + "\n"
  194. # job.updated = datetime.now()
  195. # try:
  196. # self.db.commit()
  197. # self.db.refresh(job)
  198. # return job
  199. # except Exception as e:
  200. # self.db.rollback()
  201. # logger.error(f"append job logs error: {e}")
  202. # return None
  203. def update_job(self, job_id, **kwargs):
  204. logger.info(f"update job: {job_id} ")
  205. job = self.db.query(Job).filter(Job.id == job_id).first()
  206. if job:
  207. for key, value in kwargs.items():
  208. if hasattr(job, key):
  209. setattr(job, key, value)
  210. job.updated = datetime.now()
  211. try:
  212. self.db.commit()
  213. self.db.refresh(job)
  214. return job
  215. except Exception as e:
  216. self.db.rollback()
  217. logger.error(f"update job error: {e}")
  218. return None
  219. def delete_job(self, job_id):
  220. logger.info(f"delete job: {job_id}")
  221. job = self.db.query(Job).filter(Job.id == job_id).first()
  222. if job:
  223. try:
  224. self.db.delete(job)
  225. self.db.commit()
  226. return True
  227. except Exception as e:
  228. self.db.rollback()
  229. logger.error(f"delete job error: {e}")
  230. return False
  231. def get_jobs(self, job_category, job_creator, limit=50, offset=0):
  232. logger.info(f"get jobs: {job_category} {job_creator} {limit} {offset}")
  233. if job_category == 'SYSTEM_DEFAULT':
  234. jobs = self.db.query(Job)\
  235. .options(load_only(Job.id, Job.job_name, Job.job_category, Job.job_creator, Job.executor, Job.status, Job.created, Job.updated))\
  236. .limit(limit).offset(offset).all()
  237. if jobs:
  238. return jobs
  239. return None
  240. jobs = self.db.query(Job)\
  241. .options(load_only(Job.id, Job.job_name, Job.job_category, Job.job_creator, Job.executor, Job.status, Job.created, Job.updated))\
  242. .filter(Job.job_category == job_category, Job.job_creator==job_creator)\
  243. .limit(limit).offset(offset).all()
  244. if jobs:
  245. return jobs
  246. return None
  247. def get_job_queue(self, job_id):
  248. logger.info(f"get job queue: {job_id}")
  249. queue_job = self.db.query(QueueJob).filter(QueueJob.job_id == job_id).first()
  250. if queue_job:
  251. return queue_job
  252. return None
  253. def get_queue_jobs_count(self, queue):
  254. logger.info(f"get queue jobs count: {queue.queue_name}")
  255. count = self.db.query(QueueJob).filter(QueueJob.queue_id == queue.id).count()
  256. logger.info(f"get queue jobs count: {count}")
  257. return count
  258. def get_queue_jobs(self, queue, limit=50, offset=0):
  259. logger.info(f"get queue jobs: {queue.queue_name} {limit} {offset}")
  260. #根据QueueJob的queue_id和job_id获取Job
  261. # jobs = self.db.query(Job)\
  262. if queue.queue_name == 'DEFAULT' and queue.queue_category == 'SYSTEM':
  263. jobs = self.db.query(Job)\
  264. .options(load_only(Job.id, Job.job_name, Job.job_category, Job.job_creator, Job.executor, Job.status, Job.created, Job.updated))\
  265. .limit(limit).offset(offset).all()
  266. logger.info(f"get queue jobs: {len(jobs)}")
  267. return jobs
  268. jobs = self.db.query(Job)\
  269. .options(load_only(Job.id, Job.job_name, Job.job_category, Job.job_creator, Job.executor, Job.status, Job.created, Job.updated))\
  270. .join(QueueJob, Job.id == QueueJob.job_id).\
  271. filter(QueueJob.queue_id == queue.id).\
  272. order_by(Job.id.asc()).\
  273. limit(limit).offset(offset).all()
  274. logger.info(f"get queue jobs: {len(jobs)}")
  275. return jobs