import time
import json
import csv
import os
from datetime import datetime

from sqlalchemy.orm import Session

from db.database import SessionLocal
from db.models import DbKgNode, DbKgEdge, DbKgTask
from utils.files import zip_files


def process_export_task(db: Session, task: DbKgTask):
    try:
        # Parse the task parameters
        params = json.loads(task.task_content)
        graph_id = params["graph_id"]

        # Mark the task as running
        task.status = 1
        task.updated = datetime.now()
        db.commit()

        # Dispatch on the task type
        if task.task_category == "data_export":
            # Make sure the export directory exists
            export_dir = "/home/tmp"
            os.makedirs(export_dir, exist_ok=True)

            # Export node data
            filename = f"nodes_{task.id}.csv"
            fieldnames = ["category", "name"]
            filepath1 = os.path.join(export_dir, filename)
            with open(filepath1, "w", newline="") as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()

                # Page through the nodes; the offset must start at 0,
                # otherwise the first row is silently skipped
                start = 0
                page_size = 100
                query = db.query(DbKgNode).filter(
                    DbKgNode.graph_id == graph_id,
                    DbKgNode.status == 0,
                )
                count = query.count()
                results = query.limit(page_size).offset(start).all()
                while results:
                    print(f"process {start}/{count}")
                    rows = [
                        {"category": node.category, "name": node.name}
                        for node in results
                    ]
                    writer.writerows(rows)
                    start += len(results)
                    results = query.limit(page_size).offset(start).all()

            # Export edge data
            filename = f"edges_{task.id}.csv"
            fieldnames = [
                "src_category", "src_name",
                "dest_category", "dest_name",
                "category", "name", "graph_id",
            ]
            filepath2 = os.path.join(export_dir, filename)
            with open(filepath2, "w", newline="") as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()

                start = 0
                page_size = 100
                query = db.query(DbKgEdge).filter(
                    DbKgEdge.graph_id == graph_id,
                    DbKgEdge.status == 0,
                )
                count = query.count()
                results = query.limit(page_size).offset(start).all()
                while results:
                    print(f"process {start}/{count}")
                    rows = []
                    for edge in results:
                        src_node = edge.src_node
                        dest_node = edge.dest_node
                        rows.append({
                            "src_category": src_node.category,
                            "src_name": src_node.name,
                            "dest_category": dest_node.category,
                            "dest_name": dest_node.name,
                            "category": edge.category,
                            "name": edge.name,
                            # graph_id is declared in fieldnames, so include it
                            # in the row instead of writing an empty column
                            "graph_id": edge.graph_id,
                        })
                    writer.writerows(rows)
                    start += len(results)
                    results = query.limit(page_size).offset(start).all()

            # Mark the task as completed
            task.status = 2
            task.updated = datetime.now()
            db.commit()

            # Bundle both CSV files into a single zip archive
            filename = f"nodes_{task.id}.zip"
            filepath3 = os.path.join(export_dir, filename)
            if zip_files(file_paths=[filepath1, filepath2], output_zip_path=filepath3):
                task.status = 999
                params["output_file"] = filename
                task.task_content = json.dumps(params)
                task.updated = datetime.now()
                db.commit()
    except Exception as e:
        # On failure, record the error in the task payload; spread the
        # original params first so the new error message wins
        task.status = -1
        task.task_content = json.dumps({
            **json.loads(task.task_content),
            "error": str(e),
        })
        task.updated = datetime.now()
        db.commit()
        raise


def task_worker():
    print("connect to database")
    db = SessionLocal()
    try:
        while True:
            # Fetch pending tasks
            tasks = db.query(DbKgTask).filter(
                DbKgTask.proj_id == 1,
                DbKgTask.status == 0,
            ).all()
            for task in tasks:
                print(f"process task {task.id}:{task.task_category}")
                try:
                    process_export_task(db, task)
                except Exception as e:
                    print(f"task failed: {e}")
                    continue
            print("sleep")
            time.sleep(10)
    finally:
        db.close()


if __name__ == "__main__":
    task_worker()
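
# For reference, a minimal sketch of what `utils.files.zip_files` is assumed to
# look like. Only its keyword signature and its truthy return value on success
# are known from the call site above; the real helper lives in utils/files.py,
# so this stays commented out to avoid shadowing the import.
#
#     import zipfile
#
#     def zip_files(file_paths, output_zip_path):
#         """Bundle the given files into a zip archive; return True on success."""
#         try:
#             with zipfile.ZipFile(output_zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
#                 for path in file_paths:
#                     # store each file under its basename, not its full path
#                     zf.write(path, arcname=os.path.basename(path))
#             return True
#         except OSError:
#             return False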