- from elasticsearch import Elasticsearch
- from libs.text_processor import TextProcessor
- from typing import Any, Dict, List
- import hashlib
- import os
- from dotenv import load_dotenv
- load_dotenv()
- ELASTICSEARCH_USER = os.getenv("ELASTICSEARCH_USER")
- ELASTICSEARCH_PWD = os.getenv("ELASTICSEARCH_PWD")
- TITLE_INDEX = os.getenv("TITLE_INDEX")
- CHUNK_INDEX = os.getenv("CHUNK_INDEX")
- ELASTICSEARCH_HOST = os.getenv("ELASTICSEARCH_HOST")
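-
- # A minimal sketch of the .env file this module expects. The values below are
- # placeholders, not real credentials or index names -- adjust to your deployment:
- #
- #   ELASTICSEARCH_HOST=https://localhost:9200
- #   ELASTICSEARCH_USER=elastic
- #   ELASTICSEARCH_PWD=changeme
- #   TITLE_INDEX=ins_expert_titles
- #   CHUNK_INDEX=ins_expert_chunks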
- class ElasticsearchOperations:
-     def __init__(self):
-         self.processor = TextProcessor()
-         # verify_certs=False tolerates self-signed certificates; avoid it in production.
-         self.es = Elasticsearch(hosts=[ELASTICSEARCH_HOST],
-                                 verify_certs=False,
-                                 http_auth=(ELASTICSEARCH_USER, ELASTICSEARCH_PWD))
-         self.index_name = "text_chunks"
-
-     def add_document(self, index: str, doc_id: str, doc: Any):
-         self.es.index(index=index, id=doc_id, body=doc)
-
-     def update_document(self, index: str, doc_id: str, doc: Any):
-         self.es.update(index=index, id=doc_id, body=doc)
-
-     def get_document(self, index: str, doc_id: str):
-         return self.es.get(index=index, id=doc_id)["_source"]
-
-     def refresh_index(self, index: str):
-         self.es.indices.refresh(index=index)
-     def del_document(self, text: str, doc_id: str):
-         """Delete every chunk of a document by looking up its chunk ids."""
-         chunks = self.processor.chunk_text(text)
-         for i in range(len(chunks)):
-             chunk_id = f"{doc_id}_chunk_{i}"
-             query = {
-                 "query": {
-                     "match": {
-                         "id": chunk_id
-                     }
-                 }
-             }
-             response = self.es.search(index=self.index_name, body=query)
-             if response.get('hits', {}).get('total', {}).get('value', 0) == 0:
-                 print(">>> document not found")
-                 continue
-             # Delete by each hit's _id rather than the chunk id string, since
-             # bulk ingestion may have let Elasticsearch auto-generate the _id.
-             for hit in response["hits"]["hits"]:
-                 result = self.es.delete(index=self.index_name, id=hit["_id"])
-                 print(">>> delete document", result)
-         print(">>> finished delete document")
-
-     def index_document(self, text: str, title: str) -> None:
-         doc_id = self.get_doc_id(title)
-         doc = {
-             'text': text,
-             'title': title
-         }
-         response = self.es.index(index='ins_expert_docs', id=doc_id, document=doc)
-         print(response['result'])
-     def get_doc_id(self, title: str) -> str:
-         """Derive a stable document id from the MD5 hash of the title."""
-         return hashlib.md5(title.encode()).hexdigest()
-
-     def delete_index(self, index: str):
-         self.es.indices.delete(index=index, ignore=[400, 404])
-     def ingest_document(self, text: str, title: str) -> None:
-         """Chunk the text and store the chunks in Elasticsearch."""
-         doc_id = self.get_doc_id(title)
-         chunks = self.processor.chunk_text(text)
-         documents = []
-
-         for i, chunk in enumerate(chunks):
-             doc = {
-                 'id': f"{doc_id}_chunk_{i}",
-                 # Repeat the title so it carries extra weight in full-text matches.
-                 'text': f"{title}\n{title}\n{title}\n" + chunk,
-                 'embedding': self.processor.generate_embeddings([chunk])[0].tolist()
-             }
-             print(f">>> insert id: {doc['id']}")
-             documents.append(doc)
-
-         if documents:
-             body = []
-             for doc in documents:
-                 # Use the chunk id as _id so deletes and re-ingests are idempotent.
-                 body.append({"index": {"_id": doc['id']}})
-                 body.append(doc)
-             self.es.bulk(index=self.index_name, body=body)
-     def ingest_text(self, text: str, doc_id: str) -> None:
-         """Chunk the text and store it in Elasticsearch (doc_id is passed as the title)."""
-         self.ingest_document(text, doc_id)
-
-     def search_chunk_index(self, index: str, title: str, top_k: int = 3):
-         query_embedding = self.processor.generate_embeddings([title])[0].tolist()
-
-         script_query = {
-             "script_score": {
-                 "query": {"match_all": {}},
-                 "script": {
-                     "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
-                     "params": {"query_vector": query_embedding}
-                 }
-             }
-         }
-
-         response = self.es.search(
-             index=index,
-             body={
-                 "size": top_k,
-                 "query": script_query,
-                 "_source": ["text"]
-             }
-         )
-         return [
-             {
-                 "id": hit["_id"],
-                 # Chunk documents carry no title field; recover it from the first
-                 # line of the text, which ingest_document prepends.
-                 "title": hit["_source"]["text"].split("\n")[0],
-                 "text": hit["_source"]["text"],
-                 "score": hit["_score"]
-             }
-             for hit in response["hits"]["hits"]
-         ]
-     def search_title_index(self, index: str, title: str, top_k: int = 3):
-         query_embedding = self.processor.generate_embeddings([title])[0].tolist()
-
-         script_query = {
-             "script_score": {
-                 "query": {"match_all": {}},
-                 "script": {
-                     "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
-                     "params": {"query_vector": query_embedding}
-                 }
-             }
-         }
-
-         response = self.es.search(
-             index=index,
-             body={
-                 "size": top_k,
-                 "query": script_query,
-                 "_source": ["text", "title"]
-             }
-         )
-         return [
-             {
-                 "id": hit["_id"],
-                 "title": hit["_source"]["title"],
-                 "text": hit["_source"]["text"],
-                 "score": hit["_score"]
-             }
-             for hit in response["hits"]["hits"]
-         ]
-     def search_word_index(self, index: str, words: List[str], top_k: int = 1):
-         # msearch expects alternating header/body pairs, one pair per word.
-         search_request = []
-         for w in words:
-             search_request.append({"index": index})
-             search_request.append(
-                 {
-                     "query": {
-                         "match": {
-                             "word": w
-                         }
-                     },
-                     "size": top_k
-                 }
-             )
-         response = self.es.msearch(body=search_request)
-         result_list = []
-         for result in response["responses"]:
-             result_list.extend(
-                 {
-                     "id": hit["_id"],
-                     "articles": hit["_source"]["articles"],
-                     "word": hit["_source"]["word"],
-                     "score": hit["_score"]
-                 }
-                 for hit in result["hits"]["hits"])
-         return result_list
-
-     def search_similar_texts(self, query: str, top_k: int = 5) -> List[Dict]:
-         """Return the stored text chunks most similar to the query."""
-         query_embedding = self.processor.generate_embeddings([query])[0].tolist()
-
-         script_query = {
-             "script_score": {
-                 "query": {"match_all": {}},
-                 "script": {
-                     "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
-                     "params": {"query_vector": query_embedding}
-                 }
-             }
-         }
-
-         response = self.es.search(
-             index=self.index_name,
-             body={
-                 "size": top_k,
-                 "query": script_query,
-                 "_source": ["text"]
-             }
-         )
-         return [
-             {
-                 "id": hit["_id"],
-                 # The title is the first line of the text (prepended at ingest time).
-                 "title": hit["_source"]["text"].split("\n")[0],
-                 "text": hit["_source"]["text"],
-                 "score": hit["_score"]
-             }
-             for hit in response["hits"]["hits"]
-         ]
-     def update_documents(self, datas: list):
-         """Bulk-update documents by re-indexing each one under its existing id."""
-         try:
-             failed_list = []
-             for data in datas:
-                 response = self.es.index(index=self.index_name, id=data['doc_id'], body=data['doc'])
-                 # index returns 201 when the document is created, 200 when updated.
-                 if response.meta.status not in (200, 201):
-                     failed_list.append(data)
-             # Refresh the index so the changes are visible immediately (optional).
-             self.es.indices.refresh(index=self.index_name)
-             print(f"Elasticsearch update_documents finished! failed_list -> {failed_list}")
-         except Exception as e:
-             print(f"Elasticsearch update_documents error: {e}")
-     def get_doc(self, doc_id):
-         """Fetch a document's _source by id, or None if retrieval fails."""
-         try:
-             return self.es.get(index=self.index_name, id=doc_id)['_source']
-         except Exception as e:
-             print(f"Error retrieving document {doc_id}: {e}")
-             return None
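-
-
- # ---------------------------------------------------------------------------
- # Usage sketch. This is a minimal, illustrative example, not part of the
- # original module: it assumes a reachable Elasticsearch instance, the .env
- # variables above, and that TextProcessor exposes chunk_text() and
- # generate_embeddings() as used by the methods above. The mapping is likewise
- # an assumption -- cosineSimilarity() in the script_score queries only works
- # if 'embedding' is mapped as dense_vector, and dims=384 is a placeholder
- # that must match the embedding model's output size.
- # ---------------------------------------------------------------------------
- if __name__ == "__main__":
-     ops = ElasticsearchOperations()
-
-     # Create the chunk index with a dense_vector mapping (no-op if it exists).
-     ops.es.indices.create(
-         index=ops.index_name,
-         body={
-             "mappings": {
-                 "properties": {
-                     "id": {"type": "keyword"},
-                     "text": {"type": "text"},
-                     "embedding": {"type": "dense_vector", "dims": 384}
-                 }
-             }
-         },
-         ignore=400  # ignore "index already exists"
-     )
-
-     ops.ingest_document("Some long document text ...", title="Demo title")
-     ops.refresh_index(ops.index_name)
-
-     for hit in ops.search_similar_texts("demo query", top_k=3):
-         print(hit["score"], hit["title"])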