utils.py 16 KB


  1. import re
  2. import io
  3. import os
  4. import time
  5. import glob
  6. import shutil
  7. import logging
  8. import subprocess
  9. from datetime import datetime
  10. from typing import List, Optional
  11. from minio import Minio
  12. import urllib3
  13. from sqlalchemy.orm import Session
  14. from sqlalchemy import and_
  15. from fastapi import HTTPException
  16. from agent.models.web.knowledge_base import KnowledgeBase, KnowledgeFile
  17. from agent.models.db.graph import DbUserDataRelation as UserDataRelation
  18. from config.site import settings
  19. # 配置Office文件转换日志
  20. office_logger = logging.getLogger('office_conversion')
  21. office_logger.setLevel(logging.INFO)
  22. if not office_logger.handlers:
  23. handler = logging.StreamHandler()
  24. formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  25. handler.setFormatter(formatter)
  26. office_logger.addHandler(handler)
  27. class DatabaseUtils:
  28. @staticmethod
  29. def validate_knowledge_base_name(name: str) -> bool:
  30. pattern = r'^[a-zA-Z0-9\u4e00-\u9fa5_\-\.]+$'
  31. return bool(re.match(pattern, name))
  32. @staticmethod
  33. def create_knowledge_base(db: Session, name: str, creator: Optional[str] = None, description: Optional[str] = None, tags: Optional[str] = None) -> KnowledgeBase:
  34. if not DatabaseUtils.validate_knowledge_base_name(name):
  35. raise HTTPException(status_code=400, detail="知识库名称格式不正确")
  36. if description and len(description) > 400:
  37. raise HTTPException(status_code=400, detail="知识库备注不能超过400字")
  38. db_kb = KnowledgeBase(name=name, description=description, creator = creator, tags=tags, file_count=0)
  39. db.add(db_kb)
  40. db.commit()
  41. db.refresh(db_kb)
  42. return db_kb
  43. @staticmethod
  44. def update_knowledge_base(db: Session, kb_id: int, name: str, description: Optional[str] = None, tags: Optional[str] = None) -> KnowledgeBase:
  45. db_kb = db.query(KnowledgeBase).filter(KnowledgeBase.id == kb_id, KnowledgeBase.is_deleted == 0).first()
  46. if not db_kb:
  47. raise HTTPException(status_code=404, detail="知识库不存在")
  48. if name and not DatabaseUtils.validate_knowledge_base_name(name):
  49. raise HTTPException(status_code=400, detail="知识库名称格式不正确")
  50. if description and len(description) > 400:
  51. raise HTTPException(status_code=400, detail="知识库备注不能超过400字")
  52. db_kb.name = name
  53. db_kb.description = description
  54. db_kb.tags = tags
  55. db_kb.updated_at = datetime.utcnow()
  56. db.commit()
  57. db.refresh(db_kb)
  58. return db_kb
  59. @staticmethod
  60. def delete_knowledge_base(db: Session, kb_id: int) -> bool:
  61. db_kb = db.query(KnowledgeBase).filter(KnowledgeBase.id == kb_id, KnowledgeBase.is_deleted == 0).first()
  62. if not db_kb:
  63. raise HTTPException(status_code=404, detail="知识库不存在")
  64. # 删除知识库时将文件计数清零
  65. db_kb.file_count = 0
  66. db_kb.is_deleted = 1
  67. db_kb.updated_at = datetime.utcnow()
  68. db.commit()
  69. return True
  70. @staticmethod
  71. def get_knowledge_bases(db: Session, skip: int = 0, limit: int = 10, name: Optional[str] = None) -> tuple[List[KnowledgeBase], int]:
  72. query = db.query(
  73. KnowledgeBase,
  74. UserDataRelation.user_name
  75. ).outerjoin(
  76. UserDataRelation,
  77. and_(
  78. UserDataRelation.data_category == 'KnowledgeBase',
  79. UserDataRelation.data_id == KnowledgeBase.id
  80. )
  81. ).filter(KnowledgeBase.is_deleted == 0)
  82. if name:
  83. query = query.filter(KnowledgeBase.name.ilike(f"%{name}%"))
  84. total = query.count()
  85. results = query.offset(skip).limit(limit).all()
  86. # 将user_name赋值给KnowledgeBase对象
  87. knowledge_bases = []
  88. for kb, user_name in results:
  89. kb.user_name = user_name
  90. knowledge_bases.append(kb)
  91. return knowledge_bases, total
  92. @staticmethod
  93. def get_knowledge_base_by_name(db: Session, name: str) -> Optional[KnowledgeBase]:
  94. return db.query(KnowledgeBase).filter(KnowledgeBase.name == name, KnowledgeBase.is_deleted == 0).first()
  95. @staticmethod
  96. def increment_file_count(db: Session, kb_id: int) -> None:
  97. db_kb = db.query(KnowledgeBase).filter(KnowledgeBase.id == kb_id, KnowledgeBase.is_deleted == 0).first()
  98. if db_kb:
  99. db_kb.file_count += 1
  100. db.commit()
  101. @staticmethod
  102. def decrement_file_count(db: Session, kb_id: int) -> None:
  103. db_kb = db.query(KnowledgeBase).filter(KnowledgeBase.id == kb_id, KnowledgeBase.is_deleted == 0).first()
  104. if db_kb and db_kb.file_count > 0:
  105. db_kb.file_count -= 1
  106. db.commit()
  107. class MinioUtils:
  108. def __init__(self):
  109. self.client = Minio(
  110. settings.MINIO_ENDPOINT,
  111. access_key=settings.MINIO_ACCESS_KEY,
  112. secret_key=settings.MINIO_SECRET_KEY,
  113. secure=settings.MINIO_SECURE,
  114. http_client=urllib3.PoolManager(
  115. timeout=urllib3.Timeout(connect=10, read=60),
  116. maxsize=50,
  117. retries=urllib3.Retry(
  118. total=5,
  119. backoff_factor=0.5,
  120. status_forcelist=[500, 502, 503, 504]
  121. )
  122. )
  123. )
  124. self._ensure_bucket_exists()
  125. def _ensure_bucket_exists(self):
  126. if not self.client.bucket_exists(settings.MINIO_BUCKET_NAME):
  127. self.client.make_bucket(settings.MINIO_BUCKET_NAME)
  128. def upload_file(self, file_data: bytes, file_name: str, content_type: str, part_size: int = 15 * 1024 * 1024) -> str:
  129. import tempfile
  130. import os
  131. object_name = file_name
  132. try:
  133. # 创建临时文件
  134. with tempfile.NamedTemporaryFile(delete=False) as temp_file:
  135. temp_file.write(file_data)
  136. temp_file_path = temp_file.name
  137. # 使用fput_object进行上传,内部已实现分片上传
  138. self.client.fput_object(
  139. bucket_name=settings.MINIO_BUCKET_NAME,
  140. object_name=object_name,
  141. file_path=temp_file_path,
  142. content_type=content_type,
  143. part_size=part_size # 使用更大的分片大小,提高上传效率
  144. )
  145. return f"http://{settings.MINIO_ENDPOINT}/{settings.MINIO_BUCKET_NAME}/{object_name}"
  146. except Exception as e:
  147. raise HTTPException(status_code=500, detail=f"文件上传失败: {str(e)}")
  148. finally:
  149. # 清理临时文件
  150. try:
  151. os.unlink(temp_file_path)
  152. except:
  153. pass
  154. def download_file(self, object_name: str) -> bytes:
  155. try:
  156. response = self.client.get_object(settings.MINIO_BUCKET_NAME, object_name)
  157. return response.read()
  158. finally:
  159. response.close()
  160. response.release_conn()
  161. def delete_file(self, object_name: str) -> bool:
  162. try:
  163. self.client.remove_object(settings.MINIO_BUCKET_NAME, object_name)
  164. return True
  165. except:
  166. return False
  167. class FileUtils:
  168. @staticmethod
  169. def convert_office_file(input_path, output_dir, target_format):
  170. """使用LibreOffice转换Office文件格式
  171. Args:
  172. input_path (str): 输入文件路径
  173. output_dir (str): 输出目录
  174. target_format (str): 目标格式,如docx、pptx等
  175. Returns:
  176. str: 转换后的文件路径,转换失败则返回None
  177. """
  178. # 检查输入文件是否存在
  179. if not os.path.exists(input_path):
  180. office_logger.error(f"输入文件不存在: {input_path}")
  181. return None
  182. # 检查输出目录是否存在,不存在则创建
  183. if not os.path.exists(output_dir):
  184. try:
  185. os.makedirs(output_dir)
  186. office_logger.info(f"创建输出目录: {output_dir}")
  187. except OSError as e:
  188. office_logger.error(f"创建输出目录失败: {e}")
  189. return None
  190. # 检查输出目录权限
  191. if not os.access(output_dir, os.W_OK):
  192. office_logger.error(f"输出目录没有写入权限: {output_dir}")
  193. return None
  194. # 检查LibreOffice是否安装
  195. libreoffice_cmd = "soffice" # Linux/macOS
  196. if os.name == 'nt': # Windows
  197. libreoffice_cmd = r"C:\Program Files\LibreOffice\program\soffice.exe"
  198. # 检查LibreOffice命令是否可用
  199. try:
  200. version_cmd = [libreoffice_cmd, "--version"]
  201. version_result = subprocess.run(version_cmd, check=True, capture_output=True, text=True)
  202. office_logger.info(f"LibreOffice版本: {version_result.stdout.strip()}")
  203. except (subprocess.SubprocessError, FileNotFoundError) as e:
  204. office_logger.error(f"LibreOffice未安装或不可用: {e}")
  205. return None
  206. # 获取输入文件的文件名(不含路径和扩展名)
  207. filename = os.path.basename(input_path)
  208. base_name = os.path.splitext(filename)[0]
  209. input_ext = os.path.splitext(filename)[1][1:].lower()
  210. office_logger.info(f"原始文件名: {filename}, 基本名称: {base_name}, 扩展名: {input_ext}")
  211. # 如果输入文件扩展名与目标格式相同,直接复制文件
  212. if input_ext == target_format.lower():
  213. office_logger.info(f"输入文件已经是目标格式,直接复制文件")
  214. final_output_path = os.path.join(output_dir, f"{base_name}.{target_format}")
  215. try:
  216. shutil.copy2(input_path, final_output_path)
  217. office_logger.info(f"复制文件到最终位置: {final_output_path}")
  218. return final_output_path
  219. except (shutil.Error, IOError) as e:
  220. office_logger.error(f"复制文件失败: {e}")
  221. return None
  222. # 创建临时工作目录,避免中文路径问题
  223. temp_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), f"temp_convert_{int(time.time())}")
  224. try:
  225. os.makedirs(temp_dir)
  226. office_logger.info(f"创建临时工作目录: {temp_dir}")
  227. except OSError as e:
  228. office_logger.error(f"创建临时工作目录失败: {e}")
  229. return None
  230. # 复制原文件到临时目录,使用英文文件名
  231. temp_input_file = os.path.join(temp_dir, f"input.{input_ext}")
  232. try:
  233. shutil.copy2(input_path, temp_input_file)
  234. office_logger.info(f"复制文件到临时目录: {temp_input_file}")
  235. except (shutil.Error, IOError) as e:
  236. office_logger.error(f"复制文件失败: {e}")
  237. shutil.rmtree(temp_dir, ignore_errors=True)
  238. return None
  239. # 记录转换前输出目录中的文件
  240. before_files = set(os.listdir(temp_dir))
  241. office_logger.debug(f"转换前临时目录内容: {before_files}")
  242. # 构建转换命令
  243. cmd = [
  244. libreoffice_cmd,
  245. "--headless"
  246. ]
  247. # 根据文件类型选择合适的转换参数
  248. if input_ext == 'doc' and target_format.lower() == 'docx':
  249. cmd.extend(["--convert-to", "docx:MS Word 2007 XML"])
  250. elif input_ext == 'ppt' and target_format.lower() == 'pptx':
  251. cmd.extend(["--convert-to", "pptx:Impress MS PowerPoint 2007 XML"])
  252. else:
  253. cmd.extend(["--convert-to", target_format])
  254. # 添加输出目录和输入文件
  255. cmd.extend(["--outdir", temp_dir, temp_input_file])
  256. office_logger.info(f"开始转换文件: {temp_input_file} -> {target_format}")
  257. office_logger.info(f"执行命令: {' '.join(cmd)}")
  258. # 切换到临时目录执行命令,避免路径问题
  259. current_dir = os.getcwd()
  260. os.chdir(temp_dir)
  261. try:
  262. result = subprocess.run(cmd, check=True, capture_output=True, text=True)
  263. office_logger.info(f"转换命令输出: {result.stdout}")
  264. if result.stderr:
  265. office_logger.warning(f"转换命令错误输出: {result.stderr}")
  266. # 切回原目录
  267. os.chdir(current_dir)
  268. # 等待一小段时间确保文件写入完成
  269. time.sleep(1)
  270. # 记录转换后输出目录中的文件
  271. after_files = set(os.listdir(temp_dir))
  272. office_logger.debug(f"转换后临时目录内容: {after_files}")
  273. # 找出新增的文件
  274. new_files = after_files - before_files
  275. office_logger.info(f"新增文件: {new_files}")
  276. # 预期的输出文件名
  277. expected_output_filename = f"input.{target_format}"
  278. # 预期的输出文件路径(在临时目录中)
  279. expected_output_path = os.path.join(temp_dir, expected_output_filename)
  280. # 最终的输出文件路径(在目标目录中)
  281. final_output_path = os.path.join(output_dir, f"{base_name}.{target_format}")
  282. # 检查预期的输出文件是否存在
  283. if os.path.exists(expected_output_path):
  284. # 复制转换后的文件到最终目标位置
  285. try:
  286. shutil.copy2(expected_output_path, final_output_path)
  287. office_logger.info(f"复制转换后的文件到最终位置: {final_output_path}")
  288. # 清理临时目录
  289. shutil.rmtree(temp_dir, ignore_errors=True)
  290. return final_output_path
  291. except (shutil.Error, IOError) as e:
  292. office_logger.error(f"复制转换后的文件失败: {e}")
  293. elif new_files:
  294. # 如果有新文件生成,使用第一个新文件
  295. new_file_path = os.path.join(temp_dir, list(new_files)[0])
  296. try:
  297. shutil.copy2(new_file_path, final_output_path)
  298. office_logger.info(f"复制新生成的文件到最终位置: {final_output_path}")
  299. # 清理临时目录
  300. shutil.rmtree(temp_dir, ignore_errors=True)
  301. return final_output_path
  302. except (shutil.Error, IOError) as e:
  303. office_logger.error(f"复制新生成的文件失败: {e}")
  304. else:
  305. # 尝试在临时目录中查找匹配的文件
  306. pattern = os.path.join(temp_dir, f"*.{target_format}")
  307. matching_files = glob.glob(pattern)
  308. office_logger.info(f"匹配的文件列表: {matching_files}")
  309. if matching_files:
  310. # 按修改时间排序,获取最新的文件
  311. newest_file = max(matching_files, key=os.path.getmtime)
  312. try:
  313. shutil.copy2(newest_file, final_output_path)
  314. office_logger.info(f"复制匹配的文件到最终位置: {final_output_path}")
  315. # 清理临时目录
  316. shutil.rmtree(temp_dir, ignore_errors=True)
  317. return final_output_path
  318. except (shutil.Error, IOError) as e:
  319. office_logger.error(f"复制匹配的文件失败: {e}")
  320. # 如果所有尝试都失败,清理临时目录并返回None
  321. office_logger.error(f"转换后的文件不存在或无法复制")
  322. shutil.rmtree(temp_dir, ignore_errors=True)
  323. return None
  324. except subprocess.CalledProcessError as e:
  325. # 切回原目录
  326. os.chdir(current_dir)
  327. office_logger.error(f"转换失败: {e.stderr if hasattr(e, 'stderr') else str(e)}")
  328. # 清理临时目录
  329. shutil.rmtree(temp_dir, ignore_errors=True)
  330. return None
  331. except Exception as e:
  332. # 切回原目录
  333. os.chdir(current_dir)
  334. office_logger.error(f"转换过程中发生未知错误: {str(e)}")
  335. # 清理临时目录
  336. shutil.rmtree(temp_dir, ignore_errors=True)
  337. return None