|
@@ -34,18 +34,74 @@ class TestTrunksServiceCRUD:
|
|
|
assert trunks_service.get_trunk_by_id(trunk.id) is None
|
|
|
|
|
|
class TestSearchOperations:
|
|
|
- def test_vector_search(self, trunks_service, test_trunk_data):
|
|
|
- results = trunks_service.search_by_vector("急性胰腺炎是常见的急腹症之一,以突发上腹部剧痛伴恶心呕吐为特征。轻症预后良好,重症可并发多器官衰竭,死亡率高达20-30%。",10,conversation_id="1111111aaaa")
|
|
|
- print("搜索结果:", results[0])
|
|
|
- results = trunks_service.get_cache("1111111aaaa")
|
|
|
- print("搜索结果:", results)
|
|
|
- assert len(results) > 0
|
|
|
-
|
|
|
- # def test_fulltext_search(self, trunks_service, test_trunk_data):
|
|
|
- # trunks_service.create_trunk(test_trunk_data)
|
|
|
- # results = trunks_service.fulltext_search("测试")
|
|
|
- # assert len(results) > 0
|
|
|
|
|
|
+
|
|
|
+ def test_vector_search(self, trunks_service):
|
|
|
+ page = 1
|
|
|
+ limit = 100
|
|
|
+ while True:
|
|
|
+ results = trunks_service.paginated_search_by_type_and_filepath({'pageNo': page, 'limit': limit, 'type': 'trunk', 'file_path': 'test_path.pdf'})
|
|
|
+ if not results['data']:
|
|
|
+ break
|
|
|
+ for record in results['data']:
|
|
|
+ print(f"{record['id']}{record['type']}{record['title']}{record['file_path']}")
|
|
|
+ if record['type'] != 'trunk' or '内科学 第10版' not in record['file_path']:
|
|
|
+ print('出现异常数据')
|
|
|
+ break
|
|
|
+
|
|
|
+ page_no = self.get_page_no(record['content'],trunks_service)
|
|
|
+ if page_no is None:
|
|
|
+ print(f"{record['id']}未找到page_no")
|
|
|
+ continue
|
|
|
+ trunks_service.update_trunk(record['id'], {'page_no': page_no})
|
|
|
+ page += 1
|
|
|
+
|
|
|
+ def get_page_no(self, text: str, trunks_service):
|
|
|
+ results = trunks_service.search_by_vector(text,1000,type='page',conversation_id="1111111aaaa")
|
|
|
+ sentences = self.split_text(text)
|
|
|
+ count = 0
|
|
|
+ for r in results:
|
|
|
+ #将r["content"]的所有空白字符去掉
|
|
|
+ content = re.sub(r'\s+', '', r["content"])
|
|
|
+ count+=1
|
|
|
+ match_count = 0
|
|
|
+ length = len(sentences)/2
|
|
|
+ for sentence in sentences:
|
|
|
+ sentence = re.sub(r'\s+', '', sentence)
|
|
|
+ if sentence in content:
|
|
|
+ match_count += 1
|
|
|
+ if match_count >= length:
|
|
|
+ return r["page_no"]
|
|
|
+
|
|
|
+ def split_text(self, text):
|
|
|
+ """将文本分割成句子"""
|
|
|
+ print(text)
|
|
|
+ # 使用常见的标点符号作为分隔符
|
|
|
+ delimiters = ['!', '?', '。', '!', '?', '\n', ';', '。', ';']
|
|
|
+ sentences = [text]
|
|
|
+ for delimiter in delimiters:
|
|
|
+ new_sentences = []
|
|
|
+ for sentence in sentences:
|
|
|
+ parts = sentence.split(delimiter)
|
|
|
+ new_sentences.extend([part + delimiter if i < len(parts) - 1 else part for i, part in enumerate(parts)])
|
|
|
+ sentences = [s.strip() for s in new_sentences if s.strip()]
|
|
|
+
|
|
|
+ # 合并短句子
|
|
|
+ merged_sentences = []
|
|
|
+ buffer = ""
|
|
|
+ for sentence in sentences:
|
|
|
+ buffer += " " + sentence if buffer else sentence
|
|
|
+ if len(buffer) >= 10:
|
|
|
+ merged_sentences.append(buffer)
|
|
|
+ buffer = ""
|
|
|
+ if buffer:
|
|
|
+ merged_sentences.append(buffer)
|
|
|
+
|
|
|
+ # 打印最终句子
|
|
|
+ for i, sentence in enumerate(merged_sentences):
|
|
|
+ print(f"句子{i+1}: " + re.sub(r'\s', '', sentence))
|
|
|
+
|
|
|
+ return merged_sentences
|
|
|
class TestExceptionCases:
|
|
|
def test_duplicate_id(self, trunks_service, test_trunk_data):
|
|
|
with pytest.raises(IntegrityError):
|