import os
import glob
import re
import argparse
from typing import List, Dict, Tuple, Optional

import numpy as np
import jieba
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity


class TextSimilarityFinder:
    """Finds the texts in a corpus that are most similar to a query text."""

    def __init__(self, method: str = 'tfidf', use_jieba: bool = True, ngram_range: Tuple[int, int] = (1, 1)):
        """
        Initialize the text similarity finder.

        Args:
            method: Similarity method; one of 'tfidf', 'count', or 'jaccard'.
            use_jieba: Whether to tokenize with jieba (recommended for Chinese text).
            ngram_range: N-gram range, e.g. (1, 2) uses both unigram and bigram features.
        """
        self.method = method
        self.use_jieba = use_jieba
        self.ngram_range = ngram_range
        self.vectorizer = None
        self.corpus_vectors = None
        self.corpus_texts = None
        self.corpus_paths = None

    def _tokenize_text(self, text: str) -> str:
        """Tokenize the text.

        Args:
            text: Input text.

        Returns:
            str: The tokenized text, with tokens separated by spaces.
        """
        if not self.use_jieba:
            return text

        # Segment with jieba and join the tokens with spaces.
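        # Illustrative example (the exact segmentation depends on jieba's
        # dictionary): jieba.cut("我来到北京清华大学") yields the tokens
        # 我 / 来到 / 北京 / 清华大学, which join to "我 来到 北京 清华大学".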
- return " ".join(jieba.cut(text))
-
    def _preprocess_texts(self, texts: List[str]) -> List[str]:
        """Preprocess a list of texts.

        Args:
            texts: List of texts.

        Returns:
            List[str]: The preprocessed texts.
        """
        processed_texts = []
        for text in texts:
            # Strip punctuation, special characters, and extra whitespace.
            # CJK punctuation, fullwidth forms, and the bullet character:
            text = re.sub(r'[\u3000-\u303F]|[\uFF00-\uFFEF]|[•]', ' ', text)
            # ASCII punctuation and special characters:
            text = re.sub(r'[!"#$%&\'()*+,-./:;<=>?@\[\\\]^_`{|}~]', ' ', text)
            # Remaining whitespace controls (tabs, newlines, form feeds, etc.):
            text = re.sub(r'[\t\n\r\f\v]', ' ', text)
            # Collapse runs of whitespace.
            text = re.sub(r'\s+', ' ', text).strip()
            # Tokenize.
            text = self._tokenize_text(text)
            processed_texts.append(text)
        return processed_texts

    def _initialize_vectorizer(self):
        """Initialize the text vectorizer."""
        if self.method == 'tfidf':
            self.vectorizer = TfidfVectorizer(ngram_range=self.ngram_range)
        elif self.method == 'count':
            self.vectorizer = CountVectorizer(ngram_range=self.ngram_range)
        elif self.method == 'jaccard':
            # Jaccard similarity works on token sets; no vectorizer is needed.
            self.vectorizer = None
        else:
            raise ValueError(f"Unsupported similarity method: {self.method}")

    def _calculate_jaccard_similarity(self, text1: str, text2: str) -> float:
        """Compute the Jaccard similarity of two texts.

        Args:
            text1: First text.
            text2: Second text.

        Returns:
            float: The Jaccard similarity.
        """
        # Convert each text to a set of tokens.
        set1 = set(text1.split())
        set2 = set(text2.split())

        # Jaccard similarity: |intersection| / |union|.
        intersection = len(set1.intersection(set2))
        union = len(set1.union(set2))

        # Guard against division by zero.
        if union == 0:
            return 0.0

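        # Worked example (illustrative): for the token sets
        # {"machine", "learning", "basics"} and {"machine", "learning",
        # "advanced"}, the intersection has 2 tokens and the union has 4,
        # giving a similarity of 2 / 4 = 0.5.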
        return intersection / union

    def load_corpus_from_directory(self, directory: str, file_pattern: str = '*.txt'):
        """Load the corpus from a directory.

        Args:
            directory: Directory path.
            file_pattern: Glob pattern for matching files.
        """
        # Collect all matching file paths.
        file_paths = glob.glob(os.path.join(directory, file_pattern))

        if not file_paths:
            raise ValueError(f"No files matching {file_pattern} found in directory {directory}")

        # Read every file, keeping texts and paths aligned even when
        # individual files fail to read.
        texts = []
        loaded_paths = []
        for file_path in file_paths:
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    texts.append(f.read())
                loaded_paths.append(file_path)
            except Exception as e:
                print(f"Failed to read file {file_path}: {e}")

        self.load_corpus(texts, loaded_paths)

    def load_corpus(self, texts: List[str], paths: Optional[List[str]] = None):
        """Load the corpus.

        Args:
            texts: List of texts.
            paths: Paths or identifiers corresponding to the texts.
        """
        if not texts:
            raise ValueError("The list of texts must not be empty")

        # Preprocess the texts.
        processed_texts = self._preprocess_texts(texts)

        # Initialize the vectorizer.
        self._initialize_vectorizer()

        # Keep the original texts and paths for reporting.
        self.corpus_texts = texts
        self.corpus_paths = paths if paths else [f"text_{i}" for i in range(len(texts))]

        # Vectorize the corpus.
        if self.method in ['tfidf', 'count']:
            self.corpus_vectors = self.vectorizer.fit_transform(processed_texts)
        else:
            # For Jaccard similarity, store the preprocessed texts directly.
            self.corpus_vectors = processed_texts

    def find_most_similar(self, query_text: str, top_n: int = 1) -> List[Dict]:
        """Find the corpus texts most similar to the query text.

        Args:
            query_text: Query text.
            top_n: Number of most similar results to return.

        Returns:
            List[Dict]: Results with similarity information.
        """
        if self.corpus_vectors is None:
            raise ValueError("Load a corpus first")

        # Preprocess the query text.
        processed_query = self._preprocess_texts([query_text])[0]

        # Compute similarities against the whole corpus.
        similarities = []

        if self.method in ['tfidf', 'count']:
            # Vectorize the query text.
            query_vector = self.vectorizer.transform([processed_query])

            # Cosine similarity against every corpus vector.
            similarity_matrix = cosine_similarity(query_vector, self.corpus_vectors)
            similarities = similarity_matrix[0]
        else:  # Jaccard similarity
            for corpus_text in self.corpus_vectors:
                similarity = self._calculate_jaccard_similarity(processed_query, corpus_text)
                similarities.append(similarity)

        # Indices of the corpus texts, sorted by descending similarity.
        top_indices = np.argsort(similarities)[::-1][:top_n]

        # Build the result list.
        results = []
        for idx in top_indices:
            results.append({
                'text': self.corpus_texts[idx],
                'path': self.corpus_paths[idx],
                # Cast to a plain float so the value formats and serializes cleanly.
                'similarity': float(similarities[idx])
            })

        return results
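
# Minimal programmatic usage (a sketch; the sample texts and the 'a.txt' /
# 'b.txt' identifiers below are illustrative placeholders, not real files):
#
#     finder = TextSimilarityFinder(method='tfidf', use_jieba=False)
#     finder.load_corpus(["the cat sat", "dogs bark loudly"], ["a.txt", "b.txt"])
#     best = finder.find_most_similar("a cat sat down", top_n=1)[0]
#     print(best['path'], round(best['similarity'], 4))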


def main():
    # Parse command-line arguments.
    parser = argparse.ArgumentParser(description='Text similarity search tool')
    parser.add_argument('--query', type=str, required=True, help='Query text, or path to a file containing it')
    parser.add_argument('--dir', type=str, required=True, help='Corpus directory path')
    parser.add_argument('--pattern', type=str, default='*.txt', help='File matching pattern')
    parser.add_argument('--method', type=str, default='tfidf', choices=['tfidf', 'count', 'jaccard'], help='Similarity method')
    parser.add_argument('--top_n', type=int, default=3, help='Number of most similar results to return')
    parser.add_argument('--no_jieba', action='store_true', help='Disable jieba tokenization')
    parser.add_argument('--ngram_min', type=int, default=1, help='Minimum n-gram size')
    parser.add_argument('--ngram_max', type=int, default=1, help='Maximum n-gram size')

    args = parser.parse_args()
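
    # Example invocation (the script name and paths are hypothetical):
    #   python text_similarity.py --query query.txt --dir ./corpus \
    #       --method tfidf --top_n 3 --ngram_max 2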

    # Build the similarity finder.
    finder = TextSimilarityFinder(
        method=args.method,
        use_jieba=not args.no_jieba,
        ngram_range=(args.ngram_min, args.ngram_max)
    )

    # Load the corpus.
    finder.load_corpus_from_directory(args.dir, args.pattern)

    # Resolve the query text: if --query names an existing file, read it.
    query_text = args.query
    if os.path.isfile(query_text):
        try:
            with open(query_text, 'r', encoding='utf-8') as f:
                query_text = f.read()
        except Exception as e:
            print(f"Failed to read query file: {e}")
            return

    # Find the most similar texts.
    results = finder.find_most_similar(query_text, args.top_n)

    # Print the results.
    query_preview = query_text[:100] + '...' if len(query_text) > 100 else query_text
    print(f"\nQuery text: {query_preview}")
    print(f"\nFound the {len(results)} most similar texts:")

    for i, result in enumerate(results):
        print(f"\n{i+1}. Similarity: {result['similarity']:.4f}")
        print(f"   Path: {result['path']}")
        text_preview = result['text'][:200] + '...' if len(result['text']) > 200 else result['text']
        print(f"   Content preview: {text_preview}")


if __name__ == "__main__":
    main()