123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- import pytest
- import requests
# Root address of the service the tests in this module post to.
BASE_URL: str = "http://localhost:8000"

# Id of the job created by test_create_job; later tests read it.
# Stays 0 until test_create_job has stored a real id here.
_job_id: int = 0
def create_job():
    """Create a job via the ``/agent/job`` endpoint and return its id.

    Posts a ``create_job`` action carrying fixed test values, then asserts
    that the HTTP status is 200 and that the JSON body has a ``code`` field
    equal to 200.

    Returns:
        The ``id`` field of the first entry in the body's ``records`` list.
    """
    fields = (
        ("job_name", "Test Job"),
        ("job_category", "Test Category"),
        ("job_details", "test details"),
        ("job_creator", "tester"),
    )
    payload = {
        "id": "1",
        "action": "create_job",
        "params": [{"name": key, "value": val} for key, val in fields],
    }
    reply = requests.post(BASE_URL + "/agent/job", json=payload)
    assert reply.status_code == 200

    body = reply.json()
    assert "code" in body
    assert body["code"] == 200

    first_record = body['records'][0]
    return first_record['id']
def delete_job(job_id):
    """Ask the ``/agent/job`` endpoint to delete the job ``job_id``.

    Only the HTTP status is checked (it must be 200); the response body is
    not inspected. ``job_id`` is placed in the payload as given.

    NOTE(review): another ``delete_job`` is defined further down this module
    and rebinds the name, so this definition is not the one bound once the
    module has finished importing.
    """
    payload = dict(
        id="1",
        action="delete_job",
        params=[dict(name="job_id", value=job_id)],
    )
    reply = requests.post(BASE_URL + "/agent/job", json=payload)
    assert reply.status_code == 200
def create_queue(queue_category, queue_name):
    """Post a ``create_queue`` action to the ``/agent/queue`` endpoint.

    Args:
        queue_category: Value sent as the ``queue_category`` parameter.
        queue_name: Value sent as the ``queue_name`` parameter.

    Only the HTTP status is checked (it must be 200); the response body is
    not inspected.
    """
    settings = {"queue_category": queue_category, "queue_name": queue_name}
    payload = {
        "id": "1",
        "action": "create_queue",
        "params": [
            {"name": key, "value": val} for key, val in settings.items()
        ],
    }
    reply = requests.post(BASE_URL + "/agent/queue", json=payload)
    assert reply.status_code == 200
def test_create_job():
    """Create a job, remember its id for later tests, and check it is positive."""
    global _job_id
    # Stored at module level because the following tests read this id.
    _job_id = create_job()
    assert _job_id > 0
-
def test_create_queue():
    """Create the ``OCR`` and ``OCR_RESULTS`` queues in the ``SYSTEM`` category."""
    for queue_name in ("OCR", "OCR_RESULTS"):
        create_queue("SYSTEM", queue_name)
def test_get_job():
    """Fetch the shared job via ``get_job`` and check its name and category.

    Uses the id stored in the module-level ``_job_id``, sent formatted as a
    string. Asserts HTTP 200, a body ``code`` of 200, and that the first
    record has the name and category used when the job was created.
    """
    fields = (
        ("job_id", f"{_job_id}"),
        ("job_category", "Test Category"),
        ("job_details", "test details"),
        ("job_creator", "tester"),
    )
    payload = {
        "id": "1",
        "action": "get_job",
        "params": [{"name": key, "value": val} for key, val in fields],
    }
    reply = requests.post(BASE_URL + "/agent/job", json=payload)
    assert reply.status_code == 200

    body = reply.json()
    assert "code" in body
    assert body["code"] == 200

    record = body['records'][0]
    assert record['job_name'] == "Test Job"
    assert record['job_category'] == "Test Category"
def test_queue_put_job():
    """Put the shared job onto the test queue via a ``put_job`` action.

    The job id comes from the module-level ``_job_id`` and is sent formatted
    as a string. Asserts HTTP 200 and a body ``code`` of 200.
    """
    fields = (
        ("job_id", f"{_job_id}"),
        ("queue_category", "Test Queue Category"),
        ("queue_name", "Test Queue Name"),
    )
    payload = {
        "id": "1",
        "action": "put_job",
        "params": [{"name": key, "value": val} for key, val in fields],
    }
    reply = requests.post(BASE_URL + "/agent/queue", json=payload)
    assert reply.status_code == 200

    body = reply.json()
    assert "code" in body
    assert body["code"] == 200
-
def test_queue_get_jobs():
    """List jobs on the test queue and check the first one is the shared job.

    Asserts HTTP 200, a body ``code`` of 200, and that the ``id`` of the
    first record equals the module-level ``_job_id``.
    """
    fields = (
        ("queue_category", "Test Queue Category"),
        ("queue_name", "Test Queue Name"),
    )
    payload = {
        "id": "1",
        "action": "get_jobs",
        "params": [{"name": key, "value": val} for key, val in fields],
    }
    reply = requests.post(BASE_URL + "/agent/queue", json=payload)
    assert reply.status_code == 200

    body = reply.json()
    assert "code" in body
    assert body["code"] == 200

    first_record = body['records'][0]
    assert first_record['id'] == _job_id
-
def test_queue_delete_job():
    """Remove the shared job from the test queue via a ``delete_job`` action.

    The job id comes from the module-level ``_job_id`` and is sent as-is
    (not formatted as a string). Asserts HTTP 200 and a body ``code`` of 200.
    """
    fields = (
        ("job_id", _job_id),
        ("queue_category", "Test Queue Category"),
        ("queue_name", "Test Queue Name"),
    )
    payload = {
        "id": "1",
        "action": "delete_job",
        "params": [{"name": key, "value": val} for key, val in fields],
    }
    reply = requests.post(BASE_URL + "/agent/queue", json=payload)
    assert reply.status_code == 200

    body = reply.json()
    assert "code" in body
    assert body["code"] == 200
-
def delete_job(job_id=None):
    """Delete a job via the ``/agent/job`` endpoint and verify the reply.

    This definition rebinds the earlier one-argument ``delete_job`` helper
    in this module. Accepting an optional ``job_id`` keeps both call shapes
    working: ``delete_job()`` and ``delete_job(job_id)``.

    Args:
        job_id: Id of the job to delete. When omitted (``None``), the
            module-level ``_job_id`` is used, as the zero-argument form
            always did.

    The id is sent formatted as a string. Asserts HTTP 200 and a body
    ``code`` of 200.
    """
    # Compare against None, not truthiness, so an explicit id of 0 is honoured.
    if job_id is None:
        job_id = _job_id

    action = {
        "id": "1",
        "action": "delete_job",
        "params": [
            {"name": "job_id", "value": f"{job_id}"},
        ],
    }
    response = requests.post(BASE_URL + "/agent/job", json=action)
    assert response.status_code == 200

    data = response.json()
    assert "code" in data
    assert data["code"] == 200


def test_delete_job():
    """Delete the shared job created by ``test_create_job``.

    Carries the ``test_`` prefix so pytest collects it; without the prefix
    the deletion was never exercised.
    """
    delete_job()
-
|