import sys,os current_path = os.getcwd() sys.path.append(current_path) from config.site import SiteConfig from fastapi import APIRouter, Depends, Query from db.database import get_db from sqlalchemy.orm import Session from agent.models.web.response import StandardResponse,FAILED,SUCCESS from agent.models.web.request import BasicRequest from agent.libs.agent import AgentBusiness from agent.libs.auth import verify_session_id, SessionValues import logging import json router = APIRouter(prefix="/agent", tags=["agent job interface"]) logger = logging.getLogger(__name__) config = SiteConfig() LOG_DIR = config.get_config("TASK_LOG_DIR", current_path) # job_category = Column(String(64), nullable=False) # job_name = Column(String(64)) # job_details = Column(Text, nullable=False) # job_creator = Column(String(64), nullable=False) # job_logs = Column(Text, nullable=True) # job_files = Column(String(300), nullable=True) @router.post('/job', response_model=StandardResponse) def submit_job(request:BasicRequest, db: Session = Depends(get_db), sess:SessionValues = Depends(verify_session_id))->StandardResponse: logger.info("recieve request: " + request.action) if (request.action == "create_job"): job_name = request.get_param("job_name") job_category = request.get_param("job_category","") job_details = request.get_param("job_details","") job_creator = request.get_param("job_creator","") job_creator = f"{sess.full_name}/{sess.user_id}" biz = AgentBusiness(db) job = biz.create_job(job_category=job_category, job_name=job_name, job_details=job_details, job_creator=job_creator) if job: if request.get_param("queue_category", None) and request.get_param("queue_name", None): queue_category = request.get_param("queue_category") queue_name = request.get_param("queue_name") logger.info(f"put job to queue: {queue_category} {queue_name} ") queue = biz.get_queue(queue_category=queue_category, queue_name=queue_name, create_if_not_exist=True) if queue: qjob = biz.put_job(queue=queue, job=job) if qjob: logger.info(f"job created and put to queue: {job.id} {job.job_name} {job.job_category} {job.job_details} {job.job_creator}") return StandardResponse(code=SUCCESS, message="job created and put to queue", records=[job]) logger.info(f"job created: {job.id} {job.job_name} {job.job_category} {job.job_details} {job.job_creator}") return StandardResponse(code=SUCCESS, message="job created", records=[job]) else: return StandardResponse(code=FAILED, message="job creation failed") elif (request.action == "get_job"): job_id = request.get_param("job_id") biz = AgentBusiness(db) job = biz.get_job(job_id) if job: return StandardResponse(code=SUCCESS, message="job found", records=[job]) else: return StandardResponse(code=FAILED, message="job not found") elif (request.action == "update_job"): job_id = request.get_param("job_id") job_name = request.get_param("job_name") job_category = request.get_param("job_category") job_details = request.get_param("job_details") #job_creator = request.get_param("job_creator") #job_logs = request.get_param("job_logs") job_files = request.get_param("job_files") status = request.get_param("status") biz = AgentBusiness(db) job = biz.update_job(job_id, job_name=job_name, job_category=job_category, job_details=job_details, job_creator=job_creator, job_files=job_files, status=status) if job: logger.info(f"job updated: {job.id} {job.job_name} {job.job_category} {job.job_details} {job.job_creator}") return StandardResponse(code=SUCCESS, message="job updated", records=[job]) else: return StandardResponse(code=FAILED, message="job update failed") elif (request.action == "update_job_status"): job_id = request.get_param("job_id") status = request.get_param("status") biz = AgentBusiness(db) job = biz.get_job(job_id) if job: job = biz.append_job_logs(job, f"status updated from {job.status} to {status}") job = biz.update_job(job_id, status=status,job_logs = job.job_logs) if job: logger.info(f"job status updated: {job.id} {job.job_name} {job.job_category} {job.job_details} {job.job_creator}") return StandardResponse(code=SUCCESS, message="job status updated", records=[job]) else: return StandardResponse(code=FAILED, message="job status update failed",records=[]) else: return StandardResponse(code=FAILED, message="job not found",records=[]) elif (request.action == "append_job_logs"): job_id = request.get_param("job_id") job_logs = request.get_param("job_logs") biz = AgentBusiness(db) job = biz.get_job(job_id) job = biz.append_job_logs(job, job_logs) job = biz.update_job(job_id, job_logs=job.job_logs) if job: logger.info(f"job logs appended: {job.id} {job.job_name} {job.job_category} {job.job_details} {job.job_creator}") return StandardResponse(code=SUCCESS, message="job logs appended", records=[job]) else: return StandardResponse(code=FAILED, message="job logs append failed", records=[]) elif (request.action == "delete_job"): job_id = request.get_param("job_id") biz = AgentBusiness(db) job = biz.get_job(job_id) if job: biz.delete_job_in_any_queue(job=job) job = biz.delete_job(job_id) if job: return StandardResponse(code=SUCCESS, message="job deleted", records=[]) else: return StandardResponse(code=FAILED, message="job delete failed") elif (request.action == "get_jobs"): job_category = request.get_param("job_category") job_creator = request.get_param("job_creator") biz = AgentBusiness(db) jobs = biz.get_jobs(job_category=job_category, job_creator=job_creator) if jobs: return StandardResponse(code=SUCCESS, message="jobs found", records=jobs) else: return StandardResponse(code=FAILED, message="jobs not found") elif (request.action == "put_job"): job_id = request.get_param("job_id") queue_category = request.get_param("queue_category") queue_name = request.get_param("queue_name") biz = AgentBusiness(db) queue = biz.get_queue(queue_category=queue_category, queue_name=queue_name, create_if_not_exist=True) job = biz.get_job(job_id = job_id) if queue and job: job = biz.put_job(queue=queue, job=job) if job: return StandardResponse(code=SUCCESS, message="job put to queue", records=[job]) else: return StandardResponse(code=FAILED, message="job put to queue failed") else: return StandardResponse(code=FAILED, message="queue or job not found") return StandardResponse(code=FAILED, message="invalid action") @router.post('/queue', response_model=StandardResponse) def submit_queue(request:BasicRequest, db: Session = Depends(get_db), sess:SessionValues = Depends(verify_session_id))->StandardResponse: if (request.action == "put_job"): job_id = request.get_param("job_id") queue_category = request.get_param("queue_category") queue_name = request.get_param("queue_name") biz = AgentBusiness(db) queue = biz.get_queue(queue_category=queue_category, queue_name=queue_name, create_if_not_exist=True) job = biz.get_job(job_id = job_id) if queue and job: job = biz.put_job(queue=queue, job=job) if job: return StandardResponse(code=SUCCESS, message="job put to queue", records=[job]) else: return StandardResponse(code=FAILED, message="job put to queue failed") else: return StandardResponse(code=FAILED, message="queue or job not found") elif (request.action == "create_queue"): queue_category = request.get_param("queue_category") queue_name = request.get_param("queue_name") biz = AgentBusiness(db) queue = biz.get_queue(queue_category=queue_category, queue_name=queue_name) if queue: return StandardResponse(code=SUCCESS, message="queue found", records=[queue]) queue = biz.create_queue(queue_category=queue_category, queue_name=queue_name) if queue: return StandardResponse(code=SUCCESS, message="queue created", records=[queue]) else: return StandardResponse(code=FAILED, message="queue creation failed") elif (request.action == "get_queue"): queue_category = request.get_param("queue_category") queue_name = request.get_param("queue_name") biz = AgentBusiness(db) queue = biz.get_queue(queue_category=queue_category, queue_name=queue_name) if queue: return StandardResponse(code=SUCCESS, message="queue found", records=[queue]) else: return StandardResponse(code=FAILED, message="queue not found") elif (request.action == "get_queue_summary"): biz = AgentBusiness(db) queues = biz.get_queues_summary() if queues: return StandardResponse(code=SUCCESS, message="queues summary", records=queues) elif (request.action == "update_queue"): queue_category = request.get_param("queue_category") queue_name = request.get_param("queue_name") biz = AgentBusiness(db) queue = biz.update_queue(queue_category=queue_category, queue_name=queue_name) if queue: return StandardResponse(code=SUCCESS, message="queue updated", records=[queue]) else: return StandardResponse(code=FAILED, message="queue update failed") elif (request.action == "delete_queue"): queue_category = request.get_param("queue_category") queue_name = request.get_param("queue_name") biz = AgentBusiness(db) queue = biz.delete_queue(queue_category=queue_category, queue_name=queue_name) if queue: return StandardResponse(code=SUCCESS, message="queue deleted", records=[]) else: return StandardResponse(code=FAILED, message="queue delete failed") elif (request.action == "get_queues"): queue_category = request.get_param("queue_category") biz = AgentBusiness(db) queues = biz.get_queues(queue_category=queue_category) if queues: return StandardResponse(code=SUCCESS, message="queues found", records=queues) else: return StandardResponse(code=FAILED, message="queues not found") elif (request.action == "get_jobs"): queue_category = request.get_param("queue_category") queue_name = request.get_param("queue_name") page_size = request.get_param("page_size",10) page = request.get_param("page",1) biz = AgentBusiness(db) queue = biz.get_queue(queue_category=queue_category, queue_name=queue_name) total = biz.get_queue_jobs_count(queue=queue) limit = page_size offset = (page - 1) * page_size jobs = biz.get_queue_jobs(queue=queue, limit=limit, offset=offset) if len(jobs)>=0: pages = total // limit + 1 if total % limit > 0 else total // limit return StandardResponse(code=SUCCESS, message="jobs found", meta={"page":page,"pages":pages,"total":total}, records=jobs) else: return StandardResponse(code=SUCCESS, message="jobs not found", records=[]) elif (request.action == "delete_job"): queue_category = request.get_param("queue_category") queue_name = request.get_param("queue_name") job_id = request.get_param("job_id") biz = AgentBusiness(db) queue = biz.get_queue(queue_category=queue_category, queue_name=queue_name) job = biz.get_job(job_id = job_id) result = biz.delete_queue_job(queue, job) if result: return StandardResponse(code=SUCCESS, message="job deleted", records=[]) else: return StandardResponse(code=FAILED, message="job delete failed") return StandardResponse(code=FAILED, message="invalid action") task_router = router