123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247 |
- import sys,os
- current_path = os.getcwd()
- sys.path.append(current_path)
- from config.site import SiteConfig
- from fastapi import APIRouter, Depends, Query
- from db.database import get_db
- from sqlalchemy.orm import Session
- from agent.models.web.response import StandardResponse,FAILED,SUCCESS
- from agent.models.web.request import BasicRequest
- from agent.libs.agent import AgentBusiness
- from agent.libs.auth import verify_session_id, SessionValues
- import logging
- import json
- # Router collecting the endpoints of this module under the "/agent" URL prefix.
- router = APIRouter(prefix="/agent", tags=["agent job interface"])
- # Module-level logger named after this module.
- logger = logging.getLogger(__name__)
- # Site configuration object used to read settings below.
- config = SiteConfig()
- # Value of the "TASK_LOG_DIR" setting; current_path is passed as the second
- # argument (presumably the fallback when the setting is absent -- confirm against SiteConfig.get_config).
- LOG_DIR = config.get_config("TASK_LOG_DIR", current_path)
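- # Reference only: column definitions kept as a reminder of the job fields used by the handlers below
- # (presumably copied from the job model -- verify against the model definition).
- # job_category = Column(String(64), nullable=False)
- # job_name = Column(String(64))
- # job_details = Column(Text, nullable=False)
- # job_creator = Column(String(64), nullable=False)
- # job_logs = Column(Text, nullable=True)
- # job_files = Column(String(300), nullable=True)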
@router.post('/job', response_model=StandardResponse)
def submit_job(
    request: BasicRequest,
    db: Session = Depends(get_db),
    sess: SessionValues = Depends(verify_session_id),
) -> StandardResponse:
    """Dispatch a job-related action named by ``request.action``.

    Supported actions: ``create_job``, ``get_job``, ``update_job``,
    ``update_job_status``, ``append_job_logs``, ``delete_job``, ``get_jobs``
    and ``put_job``. The parameters of each action are read with
    ``request.get_param``.

    Args:
        request: Request body carrying the action name and its parameters.
        db: Database session injected by FastAPI; handed to ``AgentBusiness``.
        sess: Session values of the verified caller; ``full_name`` and
            ``user_id`` are combined into the creator of a new job.

    Returns:
        A ``StandardResponse`` whose code is ``SUCCESS`` or ``FAILED``. An
        unrecognised action yields ``FAILED`` with message "invalid action".
    """
    action = request.action
    # Lazy %-formatting also copes with a missing (non-string) action.
    logger.info("receive request: %s", action)

    if action == "create_job":
        job_name = request.get_param("job_name")
        job_category = request.get_param("job_category", "")
        job_details = request.get_param("job_details", "")
        # The creator is always taken from the verified session; a
        # client-supplied "job_creator" parameter is not used.
        job_creator = f"{sess.full_name}/{sess.user_id}"
        biz = AgentBusiness(db)
        job = biz.create_job(
            job_category=job_category,
            job_name=job_name,
            job_details=job_details,
            job_creator=job_creator,
        )
        if not job:
            return StandardResponse(code=FAILED, message="job creation failed")

        # Optionally enqueue the new job when both queue parameters are given.
        queue_category = request.get_param("queue_category", None)
        queue_name = request.get_param("queue_name", None)
        if queue_category and queue_name:
            logger.info(f"put job to queue: {queue_category} {queue_name} ")
            queue = biz.get_queue(
                queue_category=queue_category,
                queue_name=queue_name,
                create_if_not_exist=True,
            )
            if queue and biz.put_job(queue=queue, job=job):
                logger.info(f"job created and put to queue: {job.id} {job.job_name} {job.job_category} {job.job_details} {job.job_creator}")
                return StandardResponse(code=SUCCESS, message="job created and put to queue", records=[job])
        # Reached when no queue was requested or enqueueing did not succeed:
        # the job itself still exists, so report plain creation.
        logger.info(f"job created: {job.id} {job.job_name} {job.job_category} {job.job_details} {job.job_creator}")
        return StandardResponse(code=SUCCESS, message="job created", records=[job])

    if action == "get_job":
        job_id = request.get_param("job_id")
        biz = AgentBusiness(db)
        job = biz.get_job(job_id)
        if job:
            return StandardResponse(code=SUCCESS, message="job found", records=[job])
        return StandardResponse(code=FAILED, message="job not found")

    if action == "update_job":
        job_id = request.get_param("job_id")
        job_name = request.get_param("job_name")
        job_category = request.get_param("job_category")
        job_details = request.get_param("job_details")
        job_files = request.get_param("job_files")
        status = request.get_param("status")
        biz = AgentBusiness(db)
        # job_creator is deliberately not forwarded: it is not read from the
        # request in this action, and referencing it here used to raise
        # UnboundLocalError on every update_job call.
        job = biz.update_job(
            job_id,
            job_name=job_name,
            job_category=job_category,
            job_details=job_details,
            job_files=job_files,
            status=status,
        )
        if job:
            logger.info(f"job updated: {job.id} {job.job_name} {job.job_category} {job.job_details} {job.job_creator}")
            return StandardResponse(code=SUCCESS, message="job updated", records=[job])
        return StandardResponse(code=FAILED, message="job update failed")

    if action == "update_job_status":
        job_id = request.get_param("job_id")
        status = request.get_param("status")
        biz = AgentBusiness(db)
        job = biz.get_job(job_id)
        if not job:
            return StandardResponse(code=FAILED, message="job not found", records=[])
        # Record the transition in the job logs, then persist status and logs.
        job = biz.append_job_logs(job, f"status updated from {job.status} to {status}")
        job = biz.update_job(job_id, status=status, job_logs=job.job_logs)
        if job:
            logger.info(f"job status updated: {job.id} {job.job_name} {job.job_category} {job.job_details} {job.job_creator}")
            return StandardResponse(code=SUCCESS, message="job status updated", records=[job])
        return StandardResponse(code=FAILED, message="job status update failed", records=[])

    if action == "append_job_logs":
        job_id = request.get_param("job_id")
        job_logs = request.get_param("job_logs")
        biz = AgentBusiness(db)
        job = biz.get_job(job_id)
        if not job:
            # Same guard as update_job_status: never hand a missing job to
            # append_job_logs or read .job_logs from it.
            return StandardResponse(code=FAILED, message="job not found", records=[])
        job = biz.append_job_logs(job, job_logs)
        job = biz.update_job(job_id, job_logs=job.job_logs)
        if job:
            logger.info(f"job logs appended: {job.id} {job.job_name} {job.job_category} {job.job_details} {job.job_creator}")
            return StandardResponse(code=SUCCESS, message="job logs appended", records=[job])
        return StandardResponse(code=FAILED, message="job logs append failed", records=[])

    if action == "delete_job":
        job_id = request.get_param("job_id")
        biz = AgentBusiness(db)
        job = biz.get_job(job_id)
        if job:
            # delete_job_in_any_queue runs before the job itself is deleted.
            biz.delete_job_in_any_queue(job=job)
            job = biz.delete_job(job_id)
        if job:
            return StandardResponse(code=SUCCESS, message="job deleted", records=[])
        return StandardResponse(code=FAILED, message="job delete failed")

    if action == "get_jobs":
        job_category = request.get_param("job_category")
        job_creator = request.get_param("job_creator")
        biz = AgentBusiness(db)
        jobs = biz.get_jobs(job_category=job_category, job_creator=job_creator)
        if jobs:
            return StandardResponse(code=SUCCESS, message="jobs found", records=jobs)
        return StandardResponse(code=FAILED, message="jobs not found")

    if action == "put_job":
        job_id = request.get_param("job_id")
        queue_category = request.get_param("queue_category")
        queue_name = request.get_param("queue_name")
        biz = AgentBusiness(db)
        queue = biz.get_queue(
            queue_category=queue_category,
            queue_name=queue_name,
            create_if_not_exist=True,
        )
        job = biz.get_job(job_id=job_id)
        if not (queue and job):
            return StandardResponse(code=FAILED, message="queue or job not found")
        job = biz.put_job(queue=queue, job=job)
        if job:
            return StandardResponse(code=SUCCESS, message="job put to queue", records=[job])
        return StandardResponse(code=FAILED, message="job put to queue failed")

    return StandardResponse(code=FAILED, message="invalid action")
-
@router.post('/queue', response_model=StandardResponse)
def submit_queue(
    request: BasicRequest,
    db: Session = Depends(get_db),
    sess: SessionValues = Depends(verify_session_id),
) -> StandardResponse:
    """Dispatch a queue-related action named by ``request.action``.

    Supported actions: ``put_job``, ``create_queue``, ``get_queue``,
    ``get_queue_summary``, ``update_queue``, ``delete_queue``, ``get_queues``,
    ``get_jobs`` (paged listing of a queue's jobs) and ``delete_job`` (remove
    a job from a queue). The parameters of each action are read with
    ``request.get_param``.

    Args:
        request: Request body carrying the action name and its parameters.
        db: Database session injected by FastAPI; handed to ``AgentBusiness``.
        sess: Session values produced by ``verify_session_id``; not read in
            the body, but declaring the dependency makes it run per request.

    Returns:
        A ``StandardResponse`` whose code is ``SUCCESS`` or ``FAILED``. An
        unrecognised action yields ``FAILED`` with message "invalid action".
    """
    action = request.action

    if action == "put_job":
        job_id = request.get_param("job_id")
        queue_category = request.get_param("queue_category")
        queue_name = request.get_param("queue_name")
        biz = AgentBusiness(db)
        queue = biz.get_queue(
            queue_category=queue_category,
            queue_name=queue_name,
            create_if_not_exist=True,
        )
        job = biz.get_job(job_id=job_id)
        if not (queue and job):
            return StandardResponse(code=FAILED, message="queue or job not found")
        job = biz.put_job(queue=queue, job=job)
        if job:
            return StandardResponse(code=SUCCESS, message="job put to queue", records=[job])
        return StandardResponse(code=FAILED, message="job put to queue failed")

    if action == "create_queue":
        queue_category = request.get_param("queue_category")
        queue_name = request.get_param("queue_name")
        biz = AgentBusiness(db)
        # An already existing queue is returned as-is instead of re-created.
        queue = biz.get_queue(queue_category=queue_category, queue_name=queue_name)
        if queue:
            return StandardResponse(code=SUCCESS, message="queue found", records=[queue])
        queue = biz.create_queue(queue_category=queue_category, queue_name=queue_name)
        if queue:
            return StandardResponse(code=SUCCESS, message="queue created", records=[queue])
        return StandardResponse(code=FAILED, message="queue creation failed")

    if action == "get_queue":
        queue_category = request.get_param("queue_category")
        queue_name = request.get_param("queue_name")
        biz = AgentBusiness(db)
        queue = biz.get_queue(queue_category=queue_category, queue_name=queue_name)
        if queue:
            return StandardResponse(code=SUCCESS, message="queue found", records=[queue])
        return StandardResponse(code=FAILED, message="queue not found")

    if action == "get_queue_summary":
        biz = AgentBusiness(db)
        queues = biz.get_queues_summary()
        if queues:
            return StandardResponse(code=SUCCESS, message="queues summary", records=queues)
        # Previously an empty summary fell through to "invalid action", which
        # misreported a valid request; answer explicitly instead.
        return StandardResponse(code=FAILED, message="queues summary not found")

    if action == "update_queue":
        queue_category = request.get_param("queue_category")
        queue_name = request.get_param("queue_name")
        biz = AgentBusiness(db)
        queue = biz.update_queue(queue_category=queue_category, queue_name=queue_name)
        if queue:
            return StandardResponse(code=SUCCESS, message="queue updated", records=[queue])
        return StandardResponse(code=FAILED, message="queue update failed")

    if action == "delete_queue":
        queue_category = request.get_param("queue_category")
        queue_name = request.get_param("queue_name")
        biz = AgentBusiness(db)
        queue = biz.delete_queue(queue_category=queue_category, queue_name=queue_name)
        if queue:
            return StandardResponse(code=SUCCESS, message="queue deleted", records=[])
        return StandardResponse(code=FAILED, message="queue delete failed")

    if action == "get_queues":
        queue_category = request.get_param("queue_category")
        biz = AgentBusiness(db)
        queues = biz.get_queues(queue_category=queue_category)
        if queues:
            return StandardResponse(code=SUCCESS, message="queues found", records=queues)
        return StandardResponse(code=FAILED, message="queues not found")

    if action == "get_jobs":
        queue_category = request.get_param("queue_category")
        queue_name = request.get_param("queue_name")
        # Paging values come from the client: coerce them to int and reject
        # anything below 1, which would otherwise cause a division by zero
        # or a negative offset.
        try:
            page_size = int(request.get_param("page_size", 10))
            page = int(request.get_param("page", 1))
        except (TypeError, ValueError):
            return StandardResponse(code=FAILED, message="invalid page or page_size")
        if page_size < 1 or page < 1:
            return StandardResponse(code=FAILED, message="invalid page or page_size")

        biz = AgentBusiness(db)
        queue = biz.get_queue(queue_category=queue_category, queue_name=queue_name)
        if not queue:
            return StandardResponse(code=FAILED, message="queue not found")
        total = biz.get_queue_jobs_count(queue=queue)
        offset = (page - 1) * page_size
        # Treat a None result like an empty page.
        jobs = biz.get_queue_jobs(queue=queue, limit=page_size, offset=offset) or []
        # Ceiling division: number of pages needed to hold `total` jobs.
        pages = (total + page_size - 1) // page_size
        return StandardResponse(
            code=SUCCESS,
            message="jobs found",
            meta={"page": page, "pages": pages, "total": total},
            records=jobs,
        )

    if action == "delete_job":
        queue_category = request.get_param("queue_category")
        queue_name = request.get_param("queue_name")
        job_id = request.get_param("job_id")
        biz = AgentBusiness(db)
        queue = biz.get_queue(queue_category=queue_category, queue_name=queue_name)
        job = biz.get_job(job_id=job_id)
        if not (queue and job):
            # Same guard as put_job: do not pass a missing queue or job on.
            return StandardResponse(code=FAILED, message="queue or job not found")
        result = biz.delete_queue_job(queue, job)
        if result:
            return StandardResponse(code=SUCCESS, message="job deleted", records=[])
        return StandardResponse(code=FAILED, message="job delete failed")

    return StandardResponse(code=FAILED, message="invalid action")
-
-
-
- # Make the router available under the name task_router as well
- # (presumably the name the importing application expects -- verify against the importer).
- task_router = router
|