task_router.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. import sys,os
  2. current_path = os.getcwd()
  3. sys.path.append(current_path)
  4. from config.site import SiteConfig
  5. from fastapi import APIRouter, Depends, Query
  6. from db.database import get_db
  7. from sqlalchemy.orm import Session
  8. from agent.models.web.response import StandardResponse,FAILED,SUCCESS
  9. from agent.models.web.request import BasicRequest
  10. from agent.libs.agent import AgentBusiness
  11. from agent.libs.auth import verify_session_id, SessionValues
  12. import logging
  13. import json
# Router collecting the job and queue endpoints defined in this module;
# every route registered on it is served under the "/agent" prefix.
router = APIRouter(prefix="/agent", tags=["agent job interface"])
logger = logging.getLogger(__name__)
config = SiteConfig()
# Task log directory read from the site configuration under "TASK_LOG_DIR".
# NOTE(review): the second argument is presumably the default used when the
# key is missing (the working directory captured at import time) — confirm
# against SiteConfig.get_config. LOG_DIR is not referenced by the handlers
# visible in this module.
LOG_DIR = config.get_config("TASK_LOG_DIR", current_path)
# Reference notes: these look like the column definitions of the job model
# whose fields the handlers below read and write (TODO confirm against the
# model module).
# job_category = Column(String(64), nullable=False)
# job_name = Column(String(64))
# job_details = Column(Text, nullable=False)
# job_creator = Column(String(64), nullable=False)
# job_logs = Column(Text, nullable=True)
# job_files = Column(String(300), nullable=True)
  24. @router.post('/job', response_model=StandardResponse)
  25. def submit_job(request:BasicRequest, db: Session = Depends(get_db), sess:SessionValues = Depends(verify_session_id))->StandardResponse:
  26. logger.info("recieve request: " + request.action)
  27. if (request.action == "create_job"):
  28. job_name = request.get_param("job_name")
  29. job_category = request.get_param("job_category","")
  30. job_details = request.get_param("job_details","")
  31. job_creator = request.get_param("job_creator","")
  32. job_creator = f"{sess.full_name}/{sess.user_id}"
  33. biz = AgentBusiness(db)
  34. job = biz.create_job(job_category=job_category, job_name=job_name, job_details=job_details, job_creator=job_creator)
  35. if job:
  36. if request.get_param("queue_category", None) and request.get_param("queue_name", None):
  37. queue_category = request.get_param("queue_category")
  38. queue_name = request.get_param("queue_name")
  39. logger.info(f"put job to queue: {queue_category} {queue_name} ")
  40. queue = biz.get_queue(queue_category=queue_category, queue_name=queue_name, create_if_not_exist=True)
  41. if queue:
  42. qjob = biz.put_job(queue=queue, job=job)
  43. if qjob:
  44. logger.info(f"job created and put to queue: {job.id} {job.job_name} {job.job_category} {job.job_details} {job.job_creator}")
  45. return StandardResponse(code=SUCCESS, message="job created and put to queue", records=[job])
  46. logger.info(f"job created: {job.id} {job.job_name} {job.job_category} {job.job_details} {job.job_creator}")
  47. return StandardResponse(code=SUCCESS, message="job created", records=[job])
  48. else:
  49. return StandardResponse(code=FAILED, message="job creation failed")
  50. elif (request.action == "get_job"):
  51. job_id = request.get_param("job_id")
  52. biz = AgentBusiness(db)
  53. job = biz.get_job(job_id)
  54. if job:
  55. return StandardResponse(code=SUCCESS, message="job found", records=[job])
  56. else:
  57. return StandardResponse(code=FAILED, message="job not found")
  58. elif (request.action == "update_job"):
  59. job_id = request.get_param("job_id")
  60. job_name = request.get_param("job_name")
  61. job_category = request.get_param("job_category")
  62. job_details = request.get_param("job_details")
  63. #job_creator = request.get_param("job_creator")
  64. #job_logs = request.get_param("job_logs")
  65. job_files = request.get_param("job_files")
  66. status = request.get_param("status")
  67. biz = AgentBusiness(db)
  68. job = biz.update_job(job_id, job_name=job_name, job_category=job_category, job_details=job_details, job_creator=job_creator, job_files=job_files, status=status)
  69. if job:
  70. logger.info(f"job updated: {job.id} {job.job_name} {job.job_category} {job.job_details} {job.job_creator}")
  71. return StandardResponse(code=SUCCESS, message="job updated", records=[job])
  72. else:
  73. return StandardResponse(code=FAILED, message="job update failed")
  74. elif (request.action == "update_job_status"):
  75. job_id = request.get_param("job_id")
  76. status = request.get_param("status")
  77. biz = AgentBusiness(db)
  78. job = biz.get_job(job_id)
  79. if job:
  80. job = biz.append_job_logs(job, f"status updated from {job.status} to {status}")
  81. job = biz.update_job(job_id, status=status,job_logs = job.job_logs)
  82. if job:
  83. logger.info(f"job status updated: {job.id} {job.job_name} {job.job_category} {job.job_details} {job.job_creator}")
  84. return StandardResponse(code=SUCCESS, message="job status updated", records=[job])
  85. else:
  86. return StandardResponse(code=FAILED, message="job status update failed",records=[])
  87. else:
  88. return StandardResponse(code=FAILED, message="job not found",records=[])
  89. elif (request.action == "append_job_logs"):
  90. job_id = request.get_param("job_id")
  91. job_logs = request.get_param("job_logs")
  92. biz = AgentBusiness(db)
  93. job = biz.get_job(job_id)
  94. job = biz.append_job_logs(job, job_logs)
  95. job = biz.update_job(job_id, job_logs=job.job_logs)
  96. if job:
  97. logger.info(f"job logs appended: {job.id} {job.job_name} {job.job_category} {job.job_details} {job.job_creator}")
  98. return StandardResponse(code=SUCCESS, message="job logs appended", records=[job])
  99. else:
  100. return StandardResponse(code=FAILED, message="job logs append failed", records=[])
  101. elif (request.action == "delete_job"):
  102. job_id = request.get_param("job_id")
  103. biz = AgentBusiness(db)
  104. job = biz.get_job(job_id)
  105. if job:
  106. biz.delete_job_in_any_queue(job=job)
  107. job = biz.delete_job(job_id)
  108. if job:
  109. return StandardResponse(code=SUCCESS, message="job deleted", records=[])
  110. else:
  111. return StandardResponse(code=FAILED, message="job delete failed")
  112. elif (request.action == "get_jobs"):
  113. job_category = request.get_param("job_category")
  114. job_creator = request.get_param("job_creator")
  115. biz = AgentBusiness(db)
  116. jobs = biz.get_jobs(job_category=job_category, job_creator=job_creator)
  117. if jobs:
  118. return StandardResponse(code=SUCCESS, message="jobs found", records=jobs)
  119. else:
  120. return StandardResponse(code=FAILED, message="jobs not found")
  121. elif (request.action == "put_job"):
  122. job_id = request.get_param("job_id")
  123. queue_category = request.get_param("queue_category")
  124. queue_name = request.get_param("queue_name")
  125. biz = AgentBusiness(db)
  126. queue = biz.get_queue(queue_category=queue_category, queue_name=queue_name, create_if_not_exist=True)
  127. job = biz.get_job(job_id = job_id)
  128. if queue and job:
  129. job = biz.put_job(queue=queue, job=job)
  130. if job:
  131. return StandardResponse(code=SUCCESS, message="job put to queue", records=[job])
  132. else:
  133. return StandardResponse(code=FAILED, message="job put to queue failed")
  134. else:
  135. return StandardResponse(code=FAILED, message="queue or job not found")
  136. return StandardResponse(code=FAILED, message="invalid action")
  137. @router.post('/queue', response_model=StandardResponse)
  138. def submit_queue(request:BasicRequest, db: Session = Depends(get_db), sess:SessionValues = Depends(verify_session_id))->StandardResponse:
  139. if (request.action == "put_job"):
  140. job_id = request.get_param("job_id")
  141. queue_category = request.get_param("queue_category")
  142. queue_name = request.get_param("queue_name")
  143. biz = AgentBusiness(db)
  144. queue = biz.get_queue(queue_category=queue_category, queue_name=queue_name, create_if_not_exist=True)
  145. job = biz.get_job(job_id = job_id)
  146. if queue and job:
  147. job = biz.put_job(queue=queue, job=job)
  148. if job:
  149. return StandardResponse(code=SUCCESS, message="job put to queue", records=[job])
  150. else:
  151. return StandardResponse(code=FAILED, message="job put to queue failed")
  152. else:
  153. return StandardResponse(code=FAILED, message="queue or job not found")
  154. elif (request.action == "create_queue"):
  155. queue_category = request.get_param("queue_category")
  156. queue_name = request.get_param("queue_name")
  157. biz = AgentBusiness(db)
  158. queue = biz.get_queue(queue_category=queue_category, queue_name=queue_name)
  159. if queue:
  160. return StandardResponse(code=SUCCESS, message="queue found", records=[queue])
  161. queue = biz.create_queue(queue_category=queue_category, queue_name=queue_name)
  162. if queue:
  163. return StandardResponse(code=SUCCESS, message="queue created", records=[queue])
  164. else:
  165. return StandardResponse(code=FAILED, message="queue creation failed")
  166. elif (request.action == "get_queue"):
  167. queue_category = request.get_param("queue_category")
  168. queue_name = request.get_param("queue_name")
  169. biz = AgentBusiness(db)
  170. queue = biz.get_queue(queue_category=queue_category, queue_name=queue_name)
  171. if queue:
  172. return StandardResponse(code=SUCCESS, message="queue found", records=[queue])
  173. else:
  174. return StandardResponse(code=FAILED, message="queue not found")
  175. elif (request.action == "get_queue_summary"):
  176. biz = AgentBusiness(db)
  177. queues = biz.get_queues_summary()
  178. if queues:
  179. return StandardResponse(code=SUCCESS, message="queues summary", records=queues)
  180. elif (request.action == "update_queue"):
  181. queue_category = request.get_param("queue_category")
  182. queue_name = request.get_param("queue_name")
  183. biz = AgentBusiness(db)
  184. queue = biz.update_queue(queue_category=queue_category, queue_name=queue_name)
  185. if queue:
  186. return StandardResponse(code=SUCCESS, message="queue updated", records=[queue])
  187. else:
  188. return StandardResponse(code=FAILED, message="queue update failed")
  189. elif (request.action == "delete_queue"):
  190. queue_category = request.get_param("queue_category")
  191. queue_name = request.get_param("queue_name")
  192. biz = AgentBusiness(db)
  193. queue = biz.delete_queue(queue_category=queue_category, queue_name=queue_name)
  194. if queue:
  195. return StandardResponse(code=SUCCESS, message="queue deleted", records=[])
  196. else:
  197. return StandardResponse(code=FAILED, message="queue delete failed")
  198. elif (request.action == "get_queues"):
  199. queue_category = request.get_param("queue_category")
  200. biz = AgentBusiness(db)
  201. queues = biz.get_queues(queue_category=queue_category)
  202. if queues:
  203. return StandardResponse(code=SUCCESS, message="queues found", records=queues)
  204. else:
  205. return StandardResponse(code=FAILED, message="queues not found")
  206. elif (request.action == "get_jobs"):
  207. queue_category = request.get_param("queue_category")
  208. queue_name = request.get_param("queue_name")
  209. page_size = request.get_param("page_size",10)
  210. page = request.get_param("page",1)
  211. biz = AgentBusiness(db)
  212. queue = biz.get_queue(queue_category=queue_category, queue_name=queue_name)
  213. total = biz.get_queue_jobs_count(queue=queue)
  214. limit = page_size
  215. offset = (page - 1) * page_size
  216. jobs = biz.get_queue_jobs(queue=queue, limit=limit, offset=offset)
  217. if len(jobs)>=0:
  218. pages = total // limit + 1 if total % limit > 0 else total // limit
  219. return StandardResponse(code=SUCCESS, message="jobs found", meta={"page":page,"pages":pages,"total":total}, records=jobs)
  220. else:
  221. return StandardResponse(code=SUCCESS, message="jobs not found", records=[])
  222. elif (request.action == "delete_job"):
  223. queue_category = request.get_param("queue_category")
  224. queue_name = request.get_param("queue_name")
  225. job_id = request.get_param("job_id")
  226. biz = AgentBusiness(db)
  227. queue = biz.get_queue(queue_category=queue_category, queue_name=queue_name)
  228. job = biz.get_job(job_id = job_id)
  229. result = biz.delete_queue_job(queue, job)
  230. if result:
  231. return StandardResponse(code=SUCCESS, message="job deleted", records=[])
  232. else:
  233. return StandardResponse(code=FAILED, message="job delete failed")
  234. return StandardResponse(code=FAILED, message="invalid action")
# Second module-level name bound to the same APIRouter instance.
# NOTE(review): presumably the name other modules import — verify callers.
task_router = router