import glob
import io
import logging
import os
import re
import shutil
import subprocess
import tempfile
import time
from datetime import datetime
from typing import List, Optional

import urllib3
from fastapi import HTTPException
from minio import Minio
from sqlalchemy import and_
from sqlalchemy.orm import Session

from agent.models.db.graph import DbUserDataRelation as UserDataRelation
from agent.models.web.knowledge_base import KnowledgeBase, KnowledgeFile
from config.site import settings

# Configure the logger used for Office file conversion.
office_logger = logging.getLogger('office_conversion')
office_logger.setLevel(logging.INFO)
if not office_logger.handlers:
    # Only attach a handler once, even if this module is imported repeatedly.
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    office_logger.addHandler(handler)


class DatabaseUtils:
    """Static helpers for CRUD operations on ``KnowledgeBase`` rows."""

    @staticmethod
    def validate_knowledge_base_name(name: str) -> bool:
        """Return True if *name* contains only allowed characters.

        Allowed: ASCII letters, digits, characters in the range
        U+4E00..U+9FA5, underscore, hyphen and dot. The name must be
        non-empty.
        """
        pattern = r'^[a-zA-Z0-9\u4e00-\u9fa5_\-\.]+$'
        # fullmatch (rather than match) so that "$" cannot accept a name
        # that ends with a trailing newline.
        return bool(re.fullmatch(pattern, name))

    @staticmethod
    def create_knowledge_base(
        db: Session,
        name: str,
        creator: Optional[str] = None,
        description: Optional[str] = None,
        tags: Optional[str] = None,
    ) -> KnowledgeBase:
        """Create, commit and return a new knowledge base.

        Raises:
            HTTPException: 400 if the name fails validation or the
                description is longer than 400 characters.
        """
        if not DatabaseUtils.validate_knowledge_base_name(name):
            raise HTTPException(status_code=400, detail="知识库名称格式不正确")
        if description and len(description) > 400:
            raise HTTPException(status_code=400, detail="知识库备注不能超过400字")

        db_kb = KnowledgeBase(
            name=name,
            description=description,
            creator=creator,
            tags=tags,
            file_count=0,
        )
        db.add(db_kb)
        db.commit()
        db.refresh(db_kb)
        return db_kb

    @staticmethod
    def update_knowledge_base(
        db: Session,
        kb_id: int,
        name: str,
        description: Optional[str] = None,
        tags: Optional[str] = None,
    ) -> KnowledgeBase:
        """Overwrite name, description and tags of a non-deleted knowledge base.

        Raises:
            HTTPException: 404 if no non-deleted knowledge base has *kb_id*;
                400 if a non-empty name fails validation or the description
                is longer than 400 characters.
        """
        db_kb = (
            db.query(KnowledgeBase)
            .filter(KnowledgeBase.id == kb_id, KnowledgeBase.is_deleted == 0)
            .first()
        )
        if not db_kb:
            raise HTTPException(status_code=404, detail="知识库不存在")
        # Validation is skipped for an empty name; the value is still assigned below.
        if name and not DatabaseUtils.validate_knowledge_base_name(name):
            raise HTTPException(status_code=400, detail="知识库名称格式不正确")
        if description and len(description) > 400:
            raise HTTPException(status_code=400, detail="知识库备注不能超过400字")

        db_kb.name = name
        db_kb.description = description
        db_kb.tags = tags
        db_kb.updated_at = datetime.utcnow()
        db.commit()
        db.refresh(db_kb)
        return db_kb

    @staticmethod
    def delete_knowledge_base(db: Session, kb_id: int) -> bool:
        """Soft-delete a knowledge base by setting ``is_deleted`` to 1.

        Returns:
            True once the change has been committed.

        Raises:
            HTTPException: 404 if no non-deleted knowledge base has *kb_id*.
        """
        db_kb = (
            db.query(KnowledgeBase)
            .filter(KnowledgeBase.id == kb_id, KnowledgeBase.is_deleted == 0)
            .first()
        )
        if not db_kb:
            raise HTTPException(status_code=404, detail="知识库不存在")

        # Reset the file counter when the knowledge base is deleted.
        db_kb.file_count = 0
        db_kb.is_deleted = 1
        db_kb.updated_at = datetime.utcnow()
        db.commit()
        return True

    @staticmethod
    def get_knowledge_bases(
        db: Session,
        skip: int = 0,
        limit: int = 10,
        name: Optional[str] = None,
    ) -> tuple[List[KnowledgeBase], int]:
        """Return one page of non-deleted knowledge bases and the total count.

        Each returned object gets a ``user_name`` attribute taken from the
        outer-joined ``UserDataRelation`` row (None when there is no match).

        Args:
            db: Active database session.
            skip: Number of rows to skip.
            limit: Maximum number of rows to return.
            name: Optional case-insensitive substring filter on the name.
        """
        query = (
            db.query(KnowledgeBase, UserDataRelation.user_name)
            .outerjoin(
                UserDataRelation,
                and_(
                    UserDataRelation.data_category == 'KnowledgeBase',
                    UserDataRelation.data_id == KnowledgeBase.id,
                ),
            )
            .filter(KnowledgeBase.is_deleted == 0)
        )
        if name:
            query = query.filter(KnowledgeBase.name.ilike(f"%{name}%"))

        total = query.count()
        results = query.offset(skip).limit(limit).all()

        # Copy the joined user_name onto each KnowledgeBase object.
        knowledge_bases = []
        for kb, user_name in results:
            kb.user_name = user_name
            knowledge_bases.append(kb)
        return knowledge_bases, total

    @staticmethod
    def get_knowledge_base_by_name(db: Session, name: str) -> Optional[KnowledgeBase]:
        """Return the first non-deleted knowledge base named *name*, or None."""
        return (
            db.query(KnowledgeBase)
            .filter(KnowledgeBase.name == name, KnowledgeBase.is_deleted == 0)
            .first()
        )

    @staticmethod
    def increment_file_count(db: Session, kb_id: int) -> None:
        """Add one to ``file_count`` of the knowledge base; no-op if not found."""
        db_kb = (
            db.query(KnowledgeBase)
            .filter(KnowledgeBase.id == kb_id, KnowledgeBase.is_deleted == 0)
            .first()
        )
        if db_kb:
            db_kb.file_count += 1
            db.commit()

    @staticmethod
    def decrement_file_count(db: Session, kb_id: int) -> None:
        """Subtract one from ``file_count``; no-op if not found or already 0."""
        db_kb = (
            db.query(KnowledgeBase)
            .filter(KnowledgeBase.id == kb_id, KnowledgeBase.is_deleted == 0)
            .first()
        )
        if db_kb and db_kb.file_count > 0:
            db_kb.file_count -= 1
            db.commit()


class MinioUtils:
    """Thin wrapper around a MinIO client bound to ``settings.MINIO_BUCKET_NAME``."""

    def __init__(self):
        """Build the MinIO client and make sure the configured bucket exists."""
        self.client = Minio(
            settings.MINIO_ENDPOINT,
            access_key=settings.MINIO_ACCESS_KEY,
            secret_key=settings.MINIO_SECRET_KEY,
            secure=settings.MINIO_SECURE,
            http_client=urllib3.PoolManager(
                timeout=urllib3.Timeout(connect=10, read=60),
                maxsize=50,
                retries=urllib3.Retry(
                    total=5,
                    backoff_factor=0.5,
                    status_forcelist=[500, 502, 503, 504],
                ),
            ),
        )
        self._ensure_bucket_exists()

    def _ensure_bucket_exists(self):
        """Create the configured bucket if it does not exist yet."""
        if not self.client.bucket_exists(settings.MINIO_BUCKET_NAME):
            self.client.make_bucket(settings.MINIO_BUCKET_NAME)

    def upload_file(
        self,
        file_data: bytes,
        file_name: str,
        content_type: str,
        part_size: int = 15 * 1024 * 1024,
    ) -> str:
        """Upload *file_data* as object *file_name* and return its URL.

        The bytes are first written to a temporary file, which is then
        uploaded with ``fput_object`` and removed afterwards.

        Args:
            file_data: Raw content to upload.
            file_name: Object name inside the bucket.
            content_type: MIME type stored with the object.
            part_size: Part size passed to ``fput_object``.

        Returns:
            ``http://<endpoint>/<bucket>/<object_name>``.

        Raises:
            HTTPException: 500 if writing the temporary file or the upload fails.
        """
        object_name = file_name
        temp_file_path = None
        try:
            # Create the temporary file. Record its path before writing so
            # that it is removed even when the write itself fails.
            with tempfile.NamedTemporaryFile(delete=False) as temp_file:
                temp_file_path = temp_file.name
                temp_file.write(file_data)

            # Upload via fput_object, which handles multipart upload internally.
            self.client.fput_object(
                bucket_name=settings.MINIO_BUCKET_NAME,
                object_name=object_name,
                file_path=temp_file_path,
                content_type=content_type,
                part_size=part_size,  # larger parts to improve upload efficiency
            )
            # NOTE(review): the scheme is always "http" even when
            # settings.MINIO_SECURE is set — confirm this is intended.
            return f"http://{settings.MINIO_ENDPOINT}/{settings.MINIO_BUCKET_NAME}/{object_name}"
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"文件上传失败: {str(e)}") from e
        finally:
            # Best-effort removal of the temporary file.
            if temp_file_path is not None:
                try:
                    os.unlink(temp_file_path)
                except OSError:
                    pass

    def download_file(self, object_name: str) -> bytes:
        """Return the full content of *object_name* from the configured bucket.

        Exceptions raised by the MinIO client propagate to the caller.
        """
        # Fetch outside the try block: if get_object raises there is no
        # response to release, and the original error must not be masked.
        response = self.client.get_object(settings.MINIO_BUCKET_NAME, object_name)
        try:
            return response.read()
        finally:
            response.close()
            response.release_conn()

    def delete_file(self, object_name: str) -> bool:
        """Remove *object_name* from the bucket.

        Returns:
            True if the removal call succeeded, False if it raised.
        """
        try:
            self.client.remove_object(settings.MINIO_BUCKET_NAME, object_name)
            return True
        except Exception as e:
            # Deliberately best-effort: report failure through the return value.
            logging.getLogger(__name__).warning("删除文件失败: %s: %s", object_name, e)
            return False


class FileUtils:
    """Helpers for converting Office documents."""

    @staticmethod
    def convert_office_file(
        input_path: str,
        output_dir: str,
        target_format: str,
    ) -> Optional[str]:
        """Convert an Office file to another format using LibreOffice.

        The input is copied into a fresh temporary directory under an ASCII
        file name, converted there with ``soffice --headless``, and the
        result is copied to ``<output_dir>/<input base name>.<target_format>``.
        If the input already has the target extension it is simply copied.

        Args:
            input_path: Path of the file to convert.
            output_dir: Directory for the converted file (created if missing).
            target_format: Target format, e.g. ``docx`` or ``pptx``.

        Returns:
            Path of the converted file, or None if any step fails.
        """
        # Check that the input file exists.
        if not os.path.exists(input_path):
            office_logger.error(f"输入文件不存在: {input_path}")
            return None

        # Create the output directory if it does not exist.
        if not os.path.exists(output_dir):
            try:
                os.makedirs(output_dir)
                office_logger.info(f"创建输出目录: {output_dir}")
            except OSError as e:
                office_logger.error(f"创建输出目录失败: {e}")
                return None

        # Check write permission on the output directory.
        if not os.access(output_dir, os.W_OK):
            office_logger.error(f"输出目录没有写入权限: {output_dir}")
            return None

        # Pick the LibreOffice executable.
        libreoffice_cmd = "soffice"  # Linux/macOS
        if os.name == 'nt':  # Windows
            libreoffice_cmd = r"C:\Program Files\LibreOffice\program\soffice.exe"

        # Check that the LibreOffice command is usable.
        try:
            version_cmd = [libreoffice_cmd, "--version"]
            version_result = subprocess.run(
                version_cmd, check=True, capture_output=True, text=True
            )
            office_logger.info(f"LibreOffice版本: {version_result.stdout.strip()}")
        except (subprocess.SubprocessError, FileNotFoundError) as e:
            office_logger.error(f"LibreOffice未安装或不可用: {e}")
            return None

        # Split the input file name into base name and lower-cased extension.
        filename = os.path.basename(input_path)
        base_name = os.path.splitext(filename)[0]
        input_ext = os.path.splitext(filename)[1][1:].lower()
        office_logger.info(f"原始文件名: {filename}, 基本名称: {base_name}, 扩展名: {input_ext}")

        final_output_path = os.path.join(output_dir, f"{base_name}.{target_format}")

        # If the input already has the target extension, just copy it.
        if input_ext == target_format.lower():
            office_logger.info("输入文件已经是目标格式,直接复制文件")
            try:
                shutil.copy2(input_path, final_output_path)
                office_logger.info(f"复制文件到最终位置: {final_output_path}")
                return final_output_path
            except (shutil.Error, IOError) as e:
                office_logger.error(f"复制文件失败: {e}")
                return None

        # Create a temporary working directory next to this module to avoid
        # problems with non-ASCII paths. mkdtemp guarantees a unique name,
        # so conversions started within the same second cannot collide.
        try:
            temp_dir = tempfile.mkdtemp(
                prefix="temp_convert_",
                dir=os.path.dirname(os.path.abspath(__file__)),
            )
            office_logger.info(f"创建临时工作目录: {temp_dir}")
        except OSError as e:
            office_logger.error(f"创建临时工作目录失败: {e}")
            return None

        # From here on the temporary directory is always removed on exit.
        try:
            # Copy the original into the temp directory under an ASCII name.
            temp_input_file = os.path.join(temp_dir, f"input.{input_ext}")
            try:
                shutil.copy2(input_path, temp_input_file)
                office_logger.info(f"复制文件到临时目录: {temp_input_file}")
            except (shutil.Error, IOError) as e:
                office_logger.error(f"复制文件失败: {e}")
                return None

            # Remember the directory content before the conversion.
            before_files = set(os.listdir(temp_dir))
            office_logger.debug(f"转换前临时目录内容: {before_files}")

            # Build the conversion command.
            cmd = [libreoffice_cmd, "--headless"]

            # Choose conversion arguments depending on the file type.
            if input_ext == 'doc' and target_format.lower() == 'docx':
                cmd.extend(["--convert-to", "docx:MS Word 2007 XML"])
            elif input_ext == 'ppt' and target_format.lower() == 'pptx':
                cmd.extend(["--convert-to", "pptx:Impress MS PowerPoint 2007 XML"])
            else:
                cmd.extend(["--convert-to", target_format])

            # Append output directory and input file.
            cmd.extend(["--outdir", temp_dir, temp_input_file])

            office_logger.info(f"开始转换文件: {temp_input_file} -> {target_format}")
            office_logger.info(f"执行命令: {' '.join(cmd)}")

            # Run the command with the temp directory as its working
            # directory. Passing cwd= affects only the child process, unlike
            # os.chdir which changes the directory of the whole process.
            result = subprocess.run(
                cmd, check=True, capture_output=True, text=True, cwd=temp_dir
            )
            office_logger.info(f"转换命令输出: {result.stdout}")
            if result.stderr:
                office_logger.warning(f"转换命令错误输出: {result.stderr}")

            # Wait briefly so the output file is fully written.
            time.sleep(1)

            # Work out which files the conversion added.
            after_files = set(os.listdir(temp_dir))
            office_logger.debug(f"转换后临时目录内容: {after_files}")
            new_files = after_files - before_files
            office_logger.info(f"新增文件: {new_files}")

            # Expected output file inside the temp directory.
            expected_output_filename = f"input.{target_format}"
            expected_output_path = os.path.join(temp_dir, expected_output_filename)

            if os.path.exists(expected_output_path):
                # Copy the converted file to its final location.
                try:
                    shutil.copy2(expected_output_path, final_output_path)
                    office_logger.info(f"复制转换后的文件到最终位置: {final_output_path}")
                    return final_output_path
                except (shutil.Error, IOError) as e:
                    office_logger.error(f"复制转换后的文件失败: {e}")
            elif new_files:
                # Some other file was produced: pick one deterministically,
                # preferring a name that carries the target extension.
                ordered = sorted(new_files)
                suffix = f".{target_format.lower()}"
                preferred = [n for n in ordered if n.lower().endswith(suffix)]
                new_file_path = os.path.join(temp_dir, (preferred or ordered)[0])
                try:
                    shutil.copy2(new_file_path, final_output_path)
                    office_logger.info(f"复制新生成的文件到最终位置: {final_output_path}")
                    return final_output_path
                except (shutil.Error, IOError) as e:
                    office_logger.error(f"复制新生成的文件失败: {e}")
            else:
                # Fall back to any file with the target extension.
                pattern = os.path.join(temp_dir, f"*.{target_format}")
                matching_files = glob.glob(pattern)
                office_logger.info(f"匹配的文件列表: {matching_files}")
                if matching_files:
                    # Take the most recently modified match.
                    newest_file = max(matching_files, key=os.path.getmtime)
                    try:
                        shutil.copy2(newest_file, final_output_path)
                        office_logger.info(f"复制匹配的文件到最终位置: {final_output_path}")
                        return final_output_path
                    except (shutil.Error, IOError) as e:
                        office_logger.error(f"复制匹配的文件失败: {e}")

            # Every attempt failed.
            office_logger.error("转换后的文件不存在或无法复制")
            return None
        except subprocess.CalledProcessError as e:
            office_logger.error(f"转换失败: {e.stderr if hasattr(e, 'stderr') else str(e)}")
            return None
        except Exception as e:
            office_logger.error(f"转换过程中发生未知错误: {str(e)}")
            return None
        finally:
            # Clean up the temporary directory on every exit path.
            shutil.rmtree(temp_dir, ignore_errors=True)