import csv
import json
import os
import time
from datetime import datetime

from sqlalchemy.orm import Session

from db.database import SessionLocal
from db.models import DbKgNode, DbKgEdge, DbKgTask
from utils.files import zip_files
def _export_nodes(db: Session, graph_id, filepath: str, page_size: int = 100) -> None:
    """Export all active nodes of *graph_id* to the CSV file at *filepath*.

    Pages through the result set with limit/offset so the whole table is
    never held in memory at once.
    """
    query = db.query(DbKgNode).filter(
        DbKgNode.graph_id == graph_id, DbKgNode.status == 0
    )
    total = query.count()
    with open(filepath, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=["category", "name"])
        writer.writeheader()
        # BUG FIX: offset must start at 0, not 1 — the original skipped the
        # first row of every export.
        offset = 0
        while True:
            page = query.limit(page_size).offset(offset).all()
            if not page:
                break
            print(f"process {offset}/{total}")
            writer.writerows(
                {"category": node.category, "name": node.name} for node in page
            )
            offset += len(page)


def _export_edges(db: Session, graph_id, filepath: str, page_size: int = 100) -> None:
    """Export all active edges of *graph_id* (with endpoint info) to CSV.

    Each row carries the source/destination node category+name so the file
    is self-contained for re-import.
    """
    fieldnames = [
        "src_category", "src_name",
        "dest_category", "dest_name",
        "category", "name", "graph_id",
    ]
    query = db.query(DbKgEdge).filter(
        DbKgEdge.graph_id == graph_id, DbKgEdge.status == 0
    )
    total = query.count()
    with open(filepath, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        offset = 0  # BUG FIX: was 1, dropping the first edge
        while True:
            page = query.limit(page_size).offset(offset).all()
            if not page:
                break
            print(f"process {offset}/{total}")
            writer.writerows(
                {
                    "src_category": edge.src_node.category,
                    "src_name": edge.src_node.name,
                    "dest_category": edge.dest_node.category,
                    "dest_name": edge.dest_node.name,
                    "category": edge.category,
                    "name": edge.name,
                    # BUG FIX: the column was declared in fieldnames but never
                    # populated, so it was always written blank.
                    "graph_id": edge.graph_id,
                }
                for edge in page
            )
            offset += len(page)


def process_export_task(db: Session, task: DbKgTask):
    """Run a single export task and persist its status transitions.

    Status protocol (committed after each transition):
      0 pending -> 1 running -> 2 exported -> 999 zipped, or -1 on failure.

    On failure the error message is merged into ``task_content`` and the
    exception is re-raised for the caller's logging.
    """
    try:
        params = json.loads(task.task_content)
        graph_id = params["graph_id"]

        # Mark the task as running before any long work starts.
        task.status = 1
        task.updated = datetime.now()
        db.commit()

        if task.task_category == "data_export":
            # NOTE(review): export dir is hard-coded — presumably a mounted
            # temp volume; confirm before deploying elsewhere.
            export_dir = "/home/tmp"
            os.makedirs(export_dir, exist_ok=True)

            nodes_path = os.path.join(export_dir, f"nodes_{task.id}.csv")
            edges_path = os.path.join(export_dir, f"edges_{task.id}.csv")
            _export_nodes(db, graph_id, nodes_path)
            _export_edges(db, graph_id, edges_path)

            # CSVs written: mark export complete.
            task.status = 2
            task.updated = datetime.now()
            db.commit()

            # Bundle both CSVs; only advance to 999 if zipping succeeded.
            zip_name = f"nodes_{task.id}.zip"
            zip_path = os.path.join(export_dir, zip_name)
            if zip_files(file_paths=[nodes_path, edges_path], output_zip_path=zip_path):
                task.status = 999
                params['output_file'] = zip_name
                task.task_content = json.dumps(params)
                task.updated = datetime.now()
                db.commit()

    except Exception as e:
        # Record the failure on the task row, then propagate.
        task.status = -1
        task.task_content = json.dumps({
            "error": str(e),
            **json.loads(task.task_content)
        })
        # BUG FIX: was ``task.update_time`` — a nonexistent attribute, so the
        # failure timestamp was never written to the ``updated`` column.
        task.updated = datetime.now()
        db.commit()
        raise
def task_worker():
    """Poll for pending tasks (proj_id 1, status 0) and process them forever.

    Failures of individual tasks are printed and skipped; the session is
    always closed on the way out.
    """
    print("connect to database")
    session = SessionLocal()
    try:
        while True:
            pending = (
                session.query(DbKgTask)
                .filter(DbKgTask.proj_id == 1, DbKgTask.status == 0)
                .all()
            )

            for pending_task in pending:
                print(f"process task {pending_task.id}:{pending_task.task_category}")
                try:
                    process_export_task(session, pending_task)
                except Exception as e:
                    # Keep the worker alive; the task row already records -1.
                    print(f"任务处理失败: {e}")

            print("sleep")
            time.sleep(10)
    finally:
        session.close()
# Script entry point: start the blocking polling loop.
if __name__ == "__main__":
    task_worker()