123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374 |
- import re
- import io
- import os
- import time
- import glob
- import shutil
- import logging
- import subprocess
- from datetime import datetime
- from typing import List, Optional
- from minio import Minio
- import urllib3
- from sqlalchemy.orm import Session
- from fastapi import HTTPException
- from agent.models.web.knowledge_base import KnowledgeBase, KnowledgeFile
- from config.site import settings
# Logger dedicated to Office file conversion; INFO and above are emitted.
office_logger = logging.getLogger('office_conversion')
office_logger.setLevel(logging.INFO)
# Attach the stream handler only when the logger has none yet, so that
# importing this module again does not produce duplicated log lines.
if not office_logger.handlers:
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    office_logger.addHandler(handler)
class DatabaseUtils:
    """Static helpers for knowledge-base rows: validation, CRUD and file counters.

    Rows are soft-deleted (``is_deleted`` set to 1); every lookup in this class
    only considers rows whose ``is_deleted`` is 0.
    """

    # Allowed name characters: ASCII letters and digits, CJK ideographs in
    # U+4E00..U+9FA5, underscore, hyphen and dot.
    _NAME_PATTERN = re.compile(r'[a-zA-Z0-9\u4e00-\u9fa5_\-\.]+')
    _MAX_DESCRIPTION_LENGTH = 400

    @staticmethod
    def validate_knowledge_base_name(name: str) -> bool:
        """Return True when ``name`` is non-empty and uses only allowed characters.

        ``fullmatch`` is used instead of ``match`` with a ``$`` anchor because
        ``$`` also matches just before a trailing newline, which would let a
        name such as ``"abc\\n"`` through. Non-string input returns False.
        """
        if not isinstance(name, str):
            return False
        return DatabaseUtils._NAME_PATTERN.fullmatch(name) is not None

    @staticmethod
    def _check_fields(name: str, description: Optional[str]) -> None:
        """Raise HTTP 400 when the name or the description is not acceptable."""
        if not DatabaseUtils.validate_knowledge_base_name(name):
            raise HTTPException(status_code=400, detail="知识库名称格式不正确")
        if description and len(description) > DatabaseUtils._MAX_DESCRIPTION_LENGTH:
            raise HTTPException(status_code=400, detail="知识库备注不能超过400字")

    @staticmethod
    def _get_active(db: Session, kb_id: int) -> Optional[KnowledgeBase]:
        """Return the non-deleted knowledge base with ``kb_id``, or None."""
        return (
            db.query(KnowledgeBase)
            .filter(KnowledgeBase.id == kb_id, KnowledgeBase.is_deleted == 0)
            .first()
        )

    @staticmethod
    def create_knowledge_base(db: Session, name: str, description: Optional[str] = None, tags: Optional[str] = None) -> KnowledgeBase:
        """Insert a new knowledge base with a file count of 0 and return it.

        Raises:
            HTTPException: 400 when the name format is invalid or the
                description is longer than 400 characters.
        """
        DatabaseUtils._check_fields(name, description)

        db_kb = KnowledgeBase(name=name, description=description, tags=tags, file_count=0)
        db.add(db_kb)
        db.commit()
        db.refresh(db_kb)
        return db_kb

    @staticmethod
    def update_knowledge_base(db: Session, kb_id: int, name: str, description: Optional[str] = None, tags: Optional[str] = None) -> KnowledgeBase:
        """Overwrite name, description and tags of an existing knowledge base.

        All three fields are replaced with the values given (``description``
        and ``tags`` default to None).

        Raises:
            HTTPException: 404 when no non-deleted row has ``kb_id``; 400 when
                the name format is invalid or the description is longer than
                400 characters.
        """
        db_kb = DatabaseUtils._get_active(db, kb_id)
        if not db_kb:
            raise HTTPException(status_code=404, detail="知识库不存在")

        # The name is always validated: it is written to the row
        # unconditionally below, so an empty value must not slip past the
        # check and blank out the stored name.
        DatabaseUtils._check_fields(name, description)

        db_kb.name = name
        db_kb.description = description
        db_kb.tags = tags
        db_kb.updated_at = datetime.utcnow()  # naive UTC timestamp

        db.commit()
        db.refresh(db_kb)
        return db_kb

    @staticmethod
    def delete_knowledge_base(db: Session, kb_id: int) -> bool:
        """Soft-delete a knowledge base and reset its file count to 0.

        Returns:
            True once the change is committed.

        Raises:
            HTTPException: 404 when no non-deleted row has ``kb_id``.
        """
        db_kb = DatabaseUtils._get_active(db, kb_id)
        if not db_kb:
            raise HTTPException(status_code=404, detail="知识库不存在")

        db_kb.file_count = 0
        db_kb.is_deleted = 1
        db_kb.updated_at = datetime.utcnow()  # naive UTC timestamp
        db.commit()
        return True

    @staticmethod
    def get_knowledge_bases(db: Session, skip: int = 0, limit: int = 10, name: Optional[str] = None) -> tuple[List[KnowledgeBase], int]:
        """Return one page of non-deleted knowledge bases plus the total count.

        Args:
            db: Database session.
            skip: Number of rows to skip.
            limit: Maximum number of rows to return.
            name: Optional case-insensitive substring filter on the name.

        Returns:
            ``(rows, total)`` where ``total`` counts every row matching the
            filter, not just the returned page.
        """
        query = db.query(KnowledgeBase).filter(KnowledgeBase.is_deleted == 0)
        if name:
            query = query.filter(KnowledgeBase.name.ilike(f"%{name}%"))
        total = query.count()
        # OFFSET/LIMIT without ORDER BY has no guaranteed row order, so pages
        # could overlap or skip rows; order by primary key to make it stable.
        knowledge_bases = query.order_by(KnowledgeBase.id).offset(skip).limit(limit).all()
        return knowledge_bases, total

    @staticmethod
    def get_knowledge_base_by_name(db: Session, name: str) -> Optional[KnowledgeBase]:
        """Return the non-deleted knowledge base whose name equals ``name``, or None."""
        return (
            db.query(KnowledgeBase)
            .filter(KnowledgeBase.name == name, KnowledgeBase.is_deleted == 0)
            .first()
        )

    @staticmethod
    def increment_file_count(db: Session, kb_id: int) -> None:
        """Add 1 to the file count of the knowledge base; no-op if it is not found."""
        db_kb = DatabaseUtils._get_active(db, kb_id)
        if db_kb:
            db_kb.file_count += 1
            db.commit()

    @staticmethod
    def decrement_file_count(db: Session, kb_id: int) -> None:
        """Subtract 1 from the file count, never going below 0; no-op if not found."""
        db_kb = DatabaseUtils._get_active(db, kb_id)
        if db_kb and db_kb.file_count > 0:
            db_kb.file_count -= 1
            db.commit()
class MinioUtils:
    """Wrapper around a MinIO client bound to the bucket named in ``settings``."""

    def __init__(self):
        """Create the MinIO client and make sure the configured bucket exists."""
        self.client = Minio(
            settings.MINIO_ENDPOINT,
            access_key=settings.MINIO_ACCESS_KEY,
            secret_key=settings.MINIO_SECRET_KEY,
            secure=settings.MINIO_SECURE,
            # Custom pool: 10 s connect / 60 s read timeouts and up to five
            # retries with backoff on 5xx responses.
            http_client=urllib3.PoolManager(
                timeout=urllib3.Timeout(connect=10, read=60),
                maxsize=50,
                retries=urllib3.Retry(
                    total=5,
                    backoff_factor=0.5,
                    status_forcelist=[500, 502, 503, 504]
                )
            )
        )
        self._ensure_bucket_exists()

    def _ensure_bucket_exists(self):
        """Create the configured bucket when it does not exist yet."""
        if not self.client.bucket_exists(settings.MINIO_BUCKET_NAME):
            self.client.make_bucket(settings.MINIO_BUCKET_NAME)

    def upload_file(self, file_data: bytes, file_name: str, content_type: str, part_size: int = 15 * 1024 * 1024) -> str:
        """Upload ``file_data`` as object ``file_name`` and return its URL.

        The bytes are spooled to a temporary file and sent with
        ``fput_object``; the temporary file is always removed afterwards.

        Args:
            file_data: Raw content to store.
            file_name: Object name inside the configured bucket.
            content_type: MIME type recorded for the object.
            part_size: Part size in bytes passed to ``fput_object``.

        Returns:
            ``http://<endpoint>/<bucket>/<object name>``.

        Raises:
            HTTPException: 500 when writing the temporary file or the upload fails.
        """
        import tempfile

        object_name = file_name
        # Bound before the try block so the cleanup below never touches an
        # undefined name when creating the temporary file itself fails.
        temp_file_path = None
        try:
            with tempfile.NamedTemporaryFile(delete=False) as temp_file:
                temp_file_path = temp_file.name
                temp_file.write(file_data)
            self.client.fput_object(
                bucket_name=settings.MINIO_BUCKET_NAME,
                object_name=object_name,
                file_path=temp_file_path,
                content_type=content_type,
                part_size=part_size
            )
            # NOTE(review): the scheme is hard-coded to http even when
            # settings.MINIO_SECURE is true; left unchanged because callers
            # may depend on the current URL format - confirm.
            return f"http://{settings.MINIO_ENDPOINT}/{settings.MINIO_BUCKET_NAME}/{object_name}"
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"文件上传失败: {str(e)}") from e
        finally:
            if temp_file_path is not None:
                try:
                    os.unlink(temp_file_path)
                except OSError:
                    # Best-effort cleanup: a leftover temporary file must not
                    # turn a successful upload into an error.
                    pass

    def download_file(self, object_name: str) -> bytes:
        """Return the full content of ``object_name`` from the configured bucket.

        Errors raised by the MinIO client propagate to the caller unchanged.
        """
        # Fetch outside the try block: if get_object raises there is no
        # response to release, and the real error must not be masked by an
        # UnboundLocalError from the cleanup code.
        response = self.client.get_object(settings.MINIO_BUCKET_NAME, object_name)
        try:
            return response.read()
        finally:
            response.close()
            response.release_conn()

    def delete_file(self, object_name: str) -> bool:
        """Remove ``object_name`` from the bucket; return False instead of raising."""
        try:
            self.client.remove_object(settings.MINIO_BUCKET_NAME, object_name)
        except Exception:
            # Exception (not a bare except) so KeyboardInterrupt/SystemExit
            # still propagate; the cause is logged rather than lost.
            logging.getLogger(__name__).warning(
                "Failed to delete object %r from MinIO", object_name, exc_info=True
            )
            return False
        return True
class FileUtils:
    """File helpers; currently Office format conversion via headless LibreOffice."""

    # Source/target extension pairs that need an explicit LibreOffice export
    # filter instead of the bare target extension.
    _EXPLICIT_FILTERS = {
        ('doc', 'docx'): "docx:MS Word 2007 XML",
        ('ppt', 'pptx'): "pptx:Impress MS PowerPoint 2007 XML",
    }

    @staticmethod
    def convert_office_file(input_path, output_dir, target_format):
        """Convert an Office document to ``target_format`` using LibreOffice.

        The input is copied under an ASCII file name into a private scratch
        directory next to this module, converted there, and the result is
        copied to ``output_dir`` as ``<input base name>.<target_format>``.
        When the input already has the target extension it is only copied.
        The scratch directory is always removed before returning.

        Args:
            input_path (str): Path of the file to convert.
            output_dir (str): Directory for the result; created if missing.
            target_format (str): Target extension such as ``docx`` or ``pptx``.

        Returns:
            str: Path of the resulting file, or None on any failure. Failures
            are logged to ``office_logger`` rather than raised.
        """
        import tempfile

        if not os.path.exists(input_path):
            office_logger.error(f"输入文件不存在: {input_path}")
            return None

        if not os.path.exists(output_dir):
            try:
                os.makedirs(output_dir, exist_ok=True)
                office_logger.info(f"创建输出目录: {output_dir}")
            except OSError as e:
                office_logger.error(f"创建输出目录失败: {e}")
                return None

        if not os.access(output_dir, os.W_OK):
            office_logger.error(f"输出目录没有写入权限: {output_dir}")
            return None

        libreoffice_cmd = FileUtils._libreoffice_command()
        if libreoffice_cmd is None:
            return None

        filename = os.path.basename(input_path)
        base_name, raw_ext = os.path.splitext(filename)
        input_ext = raw_ext[1:].lower()
        office_logger.info(f"原始文件名: {filename}, 基本名称: {base_name}, 扩展名: {input_ext}")

        final_output_path = os.path.join(output_dir, f"{base_name}.{target_format}")

        # Already in the target format: a plain copy is enough.
        if input_ext == target_format.lower():
            office_logger.info("输入文件已经是目标格式,直接复制文件")
            try:
                shutil.copy2(input_path, final_output_path)
            except shutil.SameFileError:
                # The input already is the destination file; nothing to copy,
                # and that is a success rather than a failure.
                return final_output_path
            except (shutil.Error, OSError) as e:
                office_logger.error(f"复制文件失败: {e}")
                return None
            office_logger.info(f"复制文件到最终位置: {final_output_path}")
            return final_output_path

        # mkdtemp guarantees a unique directory; a name derived from the
        # current second collides when two conversions start close together.
        try:
            temp_dir = tempfile.mkdtemp(
                prefix="temp_convert_",
                dir=os.path.dirname(os.path.abspath(__file__)),
            )
            office_logger.info(f"创建临时工作目录: {temp_dir}")
        except OSError as e:
            office_logger.error(f"创建临时工作目录失败: {e}")
            return None

        try:
            return FileUtils._convert_in_workdir(
                libreoffice_cmd, input_path, input_ext, temp_dir,
                target_format, final_output_path,
            )
        finally:
            # Single cleanup point for every success and failure path.
            shutil.rmtree(temp_dir, ignore_errors=True)

    @staticmethod
    def _libreoffice_command():
        """Return the soffice command if ``--version`` runs successfully, else None."""
        libreoffice_cmd = "soffice"  # Linux/macOS: resolved through PATH
        if os.name == 'nt':  # Windows: default installation path
            libreoffice_cmd = r"C:\Program Files\LibreOffice\program\soffice.exe"
        try:
            version_result = subprocess.run(
                [libreoffice_cmd, "--version"], check=True, capture_output=True, text=True
            )
        except (subprocess.SubprocessError, FileNotFoundError) as e:
            office_logger.error(f"LibreOffice未安装或不可用: {e}")
            return None
        office_logger.info(f"LibreOffice版本: {version_result.stdout.strip()}")
        return libreoffice_cmd

    @staticmethod
    def _convert_in_workdir(libreoffice_cmd, input_path, input_ext, temp_dir, target_format, final_output_path):
        """Run the conversion inside ``temp_dir`` and copy the result out.

        Returns ``final_output_path`` on success and None on failure. The
        caller owns ``temp_dir`` and removes it.
        """
        # Work on a copy with an ASCII-only name so non-ASCII characters in
        # the original file name cannot affect the conversion.
        temp_input_file = os.path.join(temp_dir, f"input.{input_ext}")
        try:
            shutil.copy2(input_path, temp_input_file)
            office_logger.info(f"复制文件到临时目录: {temp_input_file}")
        except (shutil.Error, OSError) as e:
            office_logger.error(f"复制文件失败: {e}")
            return None

        try:
            # Snapshot the directory so files produced by LibreOffice can be
            # told apart from the input copy.
            before_files = set(os.listdir(temp_dir))
            office_logger.debug(f"转换前临时目录内容: {before_files}")

            convert_to = FileUtils._EXPLICIT_FILTERS.get(
                (input_ext, target_format.lower()), target_format
            )
            cmd = [
                libreoffice_cmd, "--headless",
                "--convert-to", convert_to,
                "--outdir", temp_dir, temp_input_file,
            ]
            office_logger.info(f"开始转换文件: {temp_input_file} -> {target_format}")
            office_logger.info(f"执行命令: {' '.join(cmd)}")

            # cwd= gives only the child process the scratch directory as its
            # working directory. os.chdir would change it for the whole
            # process, which is unsafe when other threads use relative paths.
            result = subprocess.run(
                cmd, check=True, capture_output=True, text=True, cwd=temp_dir
            )
            office_logger.info(f"转换命令输出: {result.stdout}")
            if result.stderr:
                office_logger.warning(f"转换命令错误输出: {result.stderr}")

            # Short grace period kept as a safeguard in case the output file
            # is not yet visible when the command returns.
            time.sleep(1)

            after_files = set(os.listdir(temp_dir))
            office_logger.debug(f"转换后临时目录内容: {after_files}")
            new_files = after_files - before_files
            office_logger.info(f"新增文件: {new_files}")

            expected_output_path = os.path.join(temp_dir, f"input.{target_format}")
            if os.path.exists(expected_output_path):
                produced = expected_output_path
                ok_msg, err_msg = "复制转换后的文件到最终位置", "复制转换后的文件失败"
            elif new_files:
                # The output has an unexpected name: pick deterministically,
                # preferring a file that carries the target extension.
                suffix = f".{target_format.lower()}"
                chosen = min(new_files, key=lambda n: (not n.lower().endswith(suffix), n))
                produced = os.path.join(temp_dir, chosen)
                ok_msg, err_msg = "复制新生成的文件到最终位置", "复制新生成的文件失败"
            else:
                produced = None

            if produced is not None:
                try:
                    shutil.copy2(produced, final_output_path)
                    office_logger.info(f"{ok_msg}: {final_output_path}")
                    return final_output_path
                except (shutil.Error, OSError) as e:
                    office_logger.error(f"{err_msg}: {e}")

            office_logger.error("转换后的文件不存在或无法复制")
            return None
        except subprocess.CalledProcessError as e:
            office_logger.error(f"转换失败: {e.stderr or e}")
            return None
        except Exception as e:
            # Conversion is best-effort: report the problem and return None
            # instead of propagating it to the caller.
            office_logger.error(f"转换过程中发生未知错误: {str(e)}")
            return None
|