|
@@ -1,3 +1,4 @@
|
|
|
+import regex
|
|
|
from pathlib import Path
|
|
|
|
|
|
import pytest
|
|
@@ -34,44 +35,67 @@ class TestTrunksServiceCRUD:
|
|
|
assert trunks_service.get_trunk_by_id(trunk.id) is None
|
|
|
|
|
|
class TestSearchOperations:
|
|
|
+ def test_vector_search2(self, trunks_service):
|
|
|
+ page = 1
|
|
|
+ limit = 100
|
|
|
+ file_path = '急诊医学(第2版'
|
|
|
+ while True:
|
|
|
+ results = trunks_service.paginated_search_by_type_and_filepath(
|
|
|
+ {'pageNo': page, 'limit': limit, 'type': 'trunk', 'file_path': file_path})
|
|
|
+ if not results['data']:
|
|
|
+ break
|
|
|
+ for record in results['data']:
|
|
|
+ print(f"{record['id']}{record['type']}{record['title']}{record['file_path']}")
|
|
|
+ if record['type'] != 'trunk' or file_path not in record['file_path']:
|
|
|
+ print('出现异常数据')
|
|
|
+ break
|
|
|
|
|
|
+ page_no = self.get_page_no(record['content'], trunks_service, file_path)
|
|
|
+ if page_no is None:
|
|
|
+ print(f"{record['id']}找到page_no: {page_no}")
|
|
|
+ continue
|
|
|
+ trunks_service.update_trunk(record['id'], {'page_no': page_no})
|
|
|
+ page += 1
|
|
|
|
|
|
def test_vector_search(self, trunks_service):
|
|
|
page = 1
|
|
|
limit = 100
|
|
|
+ file_path='trunk2'
|
|
|
while True:
|
|
|
- results = trunks_service.paginated_search_by_type_and_filepath({'pageNo': page, 'limit': limit, 'type': 'trunk', 'file_path': 'test_path.pdf'})
|
|
|
+ results = trunks_service.paginated_search_by_type_and_filepath({'pageNo': page, 'limit': limit, 'type': 'trunk', 'file_path': file_path})
|
|
|
if not results['data']:
|
|
|
break
|
|
|
for record in results['data']:
|
|
|
print(f"{record['id']}{record['type']}{record['title']}{record['file_path']}")
|
|
|
- if record['type'] != 'trunk' or '内科学 第10版' not in record['file_path']:
|
|
|
+ if record['type'] != 'trunk' or file_path not in record['file_path']:
|
|
|
print('出现异常数据')
|
|
|
break
|
|
|
|
|
|
- page_no = self.get_page_no(record['content'],trunks_service)
|
|
|
+ page_no = self.get_page_no(record['content'],trunks_service,file_path)
|
|
|
if page_no is None:
|
|
|
print(f"{record['id']}找到page_no: {page_no}")
|
|
|
continue
|
|
|
trunks_service.update_trunk(record['id'], {'page_no': page_no})
|
|
|
page += 1
|
|
|
|
|
|
- def get_page_no(self, text: str, trunks_service) -> int:
|
|
|
- results = trunks_service.search_by_vector(text,1000,type='page',conversation_id="1111111aaaa")
|
|
|
+ def get_page_no(self, text: str, trunks_service,file_path:str) -> int:
|
|
|
+ results = trunks_service.search_by_vector(text,1000,type='page',file_path=file_path,conversation_id="1111111aaaa")
|
|
|
sentences = self.split_text(text)
|
|
|
count = 0
|
|
|
for r in results:
|
|
|
#将r["content"]的所有空白字符去掉
|
|
|
- content = re.sub(r'[^\w\d\p{P}\p{L}]', '', r["content"])
|
|
|
+ content = regex.sub(r'[^\w\d\p{L}]', '', r["content"])
|
|
|
count+=1
|
|
|
match_count = 0
|
|
|
length = len(sentences)/2
|
|
|
for sentence in sentences:
|
|
|
- sentence = re.sub(r'[^\w\d\p{P}\p{L}]', '', sentence)
|
|
|
+ sentence = regex.sub(r'[^\w\d\p{L}]', '', sentence)
|
|
|
if sentence in content:
|
|
|
match_count += 1
|
|
|
- if match_count >= length:
|
|
|
+ if match_count >= 2:
|
|
|
return r["page_no"]
|
|
|
+
|
|
|
+
|
|
|
|
|
|
def split_text(self, text):
|
|
|
"""将文本分割成句子"""
|