from elasticsearch import Elasticsearch
from libs.text_processor import TextProcessor
from typing import Dict, List
import hashlib
import os

from dotenv import load_dotenv

load_dotenv()

ELASTICSEARCH_USER = os.getenv("ELASTICSEARCH_USER")
ELASTICSEARCH_PWD = os.getenv("ELASTICSEARCH_PWD")
TITLE_INDEX = os.getenv("TITLE_INDEX")
CHUNC_INDEX = os.getenv("CHUNC_INDEX")  # env var name kept as-is ("CHUNC" = chunk)
ELASTICSEARCH_HOST = os.getenv("ELASTICSEARCH_HOST")


class ElasticsearchOperations:
    def __init__(self):
        self.processor = TextProcessor()
        # verify_certs=False disables TLS verification; acceptable for local
        # development only. Note that http_auth is deprecated in
        # elasticsearch-py 8.x in favor of basic_auth.
        self.es = Elasticsearch(
            hosts=[ELASTICSEARCH_HOST],
            verify_certs=False,
            http_auth=(ELASTICSEARCH_USER, ELASTICSEARCH_PWD),
        )
        self.index_name = "text_chunks"

    def add_document(self, index: str, doc_id: str, doc: Dict) -> None:
        self.es.index(index=index, id=doc_id, body=doc)

    def update_document(self, index: str, doc_id: str, doc: Dict) -> None:
        self.es.update(index=index, id=doc_id, body=doc)

    def get_document(self, index: str, doc_id: str) -> Dict:
        return self.es.get(index=index, id=doc_id)["_source"]

    def refresh_index(self, index: str) -> None:
        self.es.indices.refresh(index=index)

    def del_document(self, text: str, doc_id: str) -> None:
        """Delete every chunk of a previously ingested document."""
        chunks = self.processor.chunk_text(text)
        for i, chunk in enumerate(chunks):
            chunk_id = f"{doc_id}_chunk_{i}"
            query = {"query": {"match": {"id": chunk_id}}}
            response = self.es.search(index=self.index_name, body=query)
            print(response)
            if response.get("hits", {}).get("total", {}).get("value", 0) == 0:
                print(">>> document not found")
                continue
            print(">>> document was found")
            # Delete by each hit's Elasticsearch _id: a bulk ingest may have
            # auto-generated _ids, so the stored 'id' field is not guaranteed
            # to match the document's _id.
            for hit in response["hits"]["hits"]:
                result = self.es.delete(index=self.index_name, id=hit["_id"])
                print(">>> delete document", result)
        print(">>> finished delete document")

    def index_document(self, text: str, title: str) -> None:
        doc_id = self.get_doc_id(title)
        doc = {"text": text, "title": title}
        response = self.es.index(index="ins_expert_docs", id=doc_id, document=doc)
        print(response["result"])

    def get_doc_id(self, title: str) -> str:
        """Derive a stable document id from the MD5 hash of the title."""
        return hashlib.md5(title.encode()).hexdigest()

    def delete_index(self, index: str) -> None:
        self.es.indices.delete(index=index, ignore=[400, 404])

    def ingest_document(self, text: str, title: str) -> None:
        """Split the text into chunks and store them in Elasticsearch."""
        doc_id = self.get_doc_id(title)
        chunks = self.processor.chunk_text(text)
        documents = []
        for i, chunk in enumerate(chunks):
            doc = {
                "id": f"{doc_id}_chunk_{i}",
                # The title is prepended (three times) so it carries extra
                # weight in full-text matches against the stored text.
                "text": f"{title}\n{title}\n{title}\n" + chunk,
                "embedding": self.processor.generate_embeddings([chunk])[0].tolist(),
            }
            print(f">>> insert id :{doc['id']}")
            documents.append(doc)
        if documents:
            body = []
            for doc in documents:
                # Use the chunk id as the Elasticsearch _id so chunks can
                # later be deleted or re-indexed deterministically.
                body.append({"index": {"_id": doc["id"]}})
                body.append(doc)
            self.es.bulk(index=self.index_name, body=body)

    def ingest_text(self, text: str, doc_id: str) -> None:
        """Split the text into chunks and store them in Elasticsearch.

        The doc_id argument is used as the title for ingest_document.
        """
        self.ingest_document(text, doc_id)

    def search_chunc_index(self, index: str, title: str, top_k: int = 3) -> List[Dict]:
        query_embedding = self.processor.generate_embeddings([title])[0].tolist()
        script_query = {
            "script_score": {
                "query": {"match_all": {}},
                "script": {
                    "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
                    "params": {"query_vector": query_embedding},
                },
            }
        }
        response = self.es.search(
            index=index,
            body={"size": top_k, "query": script_query, "_source": ["text"]},
        )
        # Chunk documents carry no 'title' field, so recover the title from
        # the first line of the stored text (the title was prepended at
        # ingest time).
        return [
            {
                "id": hit["_id"],
                "title": hit["_source"]["text"].split("\n")[0],
                "text": hit["_source"]["text"],
                "score": hit["_score"],
            }
            for hit in response["hits"]["hits"]
        ]

    def search_title_index(self, index: str, title: str, top_k: int = 3) -> List[Dict]:
        query_embedding = self.processor.generate_embeddings([title])[0].tolist()
        script_query = {
            "script_score": {
                "query": {"match_all": {}},
                "script": {
                    "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
                    "params": {"query_vector": query_embedding},
                },
            }
        }
        response = self.es.search(
            index=index,
            body={"size": top_k, "query": script_query, "_source": ["text", "title"]},
        )
        return [
            {
                "id": hit["_id"],
                "title": hit["_source"]["title"],
                "text": hit["_source"]["text"],
                "score": hit["_score"],
            }
            for hit in response["hits"]["hits"]
        ]

    def search_word_index(self, index: str, words: List[str], top_k: int = 5) -> List[Dict]:
        # msearch takes alternating header/body pairs: one pair per word.
        search_request = []
        for w in words:
            search_request.append({"index": index})
            search_request.append({"query": {"match": {"word": w}}, "size": top_k})
        response = self.es.msearch(body=search_request)
        result_list = []
        for result in response["responses"]:
            result_list.extend(
                {
                    "id": hit["_id"],
                    "articles": hit["_source"]["articles"],
                    "word": hit["_source"]["word"],
                    "score": hit["_score"],
                }
                for hit in result["hits"]["hits"]
            )
        return result_list

    def search_similar_texts(self, query: str, top_k: int = 5) -> List[Dict]:
        """Return the stored text chunks most similar to the input query."""
        query_embedding = self.processor.generate_embeddings([query])[0].tolist()
        script_query = {
            "script_score": {
                "query": {"match_all": {}},
                "script": {
                    "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
                    "params": {"query_vector": query_embedding},
                },
            }
        }
        response = self.es.search(
            index=self.index_name,
            body={"size": top_k, "query": script_query, "_source": ["text"]},
        )
        return [
            {
                "id": hit["_id"],
                "title": hit["_source"]["text"].split("\n")[0],
                "text": hit["_source"]["text"],
                "score": hit["_score"],
            }
            for hit in response["hits"]["hits"]
        ]

    def update_documents(self, datas: List[Dict]) -> None:
        """Bulk update: re-index each document and collect failures.

        Renamed from update_document, which would otherwise shadow the
        single-document method of the same name defined above.
        """
        try:
            failed_list = []
            for data in datas:
                # Updating here means re-indexing the document in place.
                # (The original called self.index, which does not exist.)
                response = self.es.index(
                    index=self.index_name, id=data["doc_id"], body=data["doc"]
                )
                # index returns 201 on create and 200 on overwrite.
                if response.meta.status not in (200, 201):
                    failed_list.append(data)
            # Refresh the index so the changes are visible immediately (optional).
            self.es.indices.refresh(index=self.index_name)
            print(f"Elasticsearch update_doc finished! failed_list -> {failed_list}")
        except Exception as e:
            print(f"Elasticsearch update_doc error: {e}")

    def get_doc(self, doc_id: str):
        """Fetch a document's source, or None if retrieval fails."""
        try:
            return self.es.get(index=self.index_name, id=doc_id)["_source"]
        except Exception as e:
            print(f"Error retrieving document {doc_id}: {e}")
            return None