# es.py

from elasticsearch import Elasticsearch
from libs.text_processor import TextProcessor
from typing import Any, Dict, List
import hashlib
import os
from dotenv import load_dotenv

# Connection settings and index names are read from the environment (.env).
load_dotenv()
ELASTICSEARCH_USER = os.getenv("ELASTICSEARCH_USER")
ELASTICSEARCH_PWD = os.getenv("ELASTICSEARCH_PWD")
TITLE_INDEX = os.getenv("TITLE_INDEX")
CHUNC_INDEX = os.getenv("CHUNC_INDEX")
ELASTICSEARCH_HOST = os.getenv("ELASTICSEARCH_HOST")
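# A hypothetical .env for local development. These values are placeholders,
# not taken from the original project; adjust the host, credentials and index
# names to your own setup:
#
#   ELASTICSEARCH_HOST=https://localhost:9200
#   ELASTICSEARCH_USER=elastic
#   ELASTICSEARCH_PWD=changeme
#   TITLE_INDEX=title_index
#   CHUNC_INDEX=chunc_index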
class ElasticsearchOperations:
    def __init__(self):
        self.processor = TextProcessor()
        # verify_certs=False skips TLS verification; intended for development only.
        self.es = Elasticsearch(hosts=[ELASTICSEARCH_HOST],
                                verify_certs=False,
                                http_auth=(ELASTICSEARCH_USER, ELASTICSEARCH_PWD))
        # Default index holding text chunks and their embeddings.
        self.index_name = "text_chunks"
    def add_document(self, index: str, doc_id: str, doc: Any):
        """Index (create or overwrite) a document under the given id."""
        self.es.index(index=index, id=doc_id, body=doc)

    def update_document(self, index: str, doc_id: str, doc: Any):
        """Update an existing document; `doc` must be a valid update body (e.g. {"doc": {...}})."""
        self.es.update(index=index, id=doc_id, body=doc)

    def get_document(self, index: str, doc_id: str):
        """Return the stored source of a document."""
        return self.es.get(index=index, id=doc_id)["_source"]

    def refresh_index(self, index: str):
        """Refresh the index so recent writes become searchable."""
        self.es.indices.refresh(index=index)
    def del_document(self, text: str, doc_id: str):
        """Delete all chunks of a document from the chunk index."""
        chunks = self.processor.chunk_text(text)
        for i, chunk in enumerate(chunks):
            chunk_id = f"{doc_id}_chunk_{i}"
            query = {
                "query": {
                    "match": {
                        "id": chunk_id
                    }
                }
            }
            response = self.es.search(index=self.index_name, body=query)
            print(response)
            if response.get('hits', {}).get('total', {}).get('value', 0) == 0:
                print(">>> document not found")
                continue
            print(">>> document was found")
            # Delete by the Elasticsearch _id of each matching hit; the chunk id
            # stored in the 'id' field is not guaranteed to be the document's _id.
            for hit in response["hits"]["hits"]:
                delete_response = self.es.delete(index=self.index_name, id=hit["_id"])
                print(">>> delete document", delete_response)
        print(">>> finished delete document")
    def index_document(self, text: str, title: str) -> None:
        """Index a full document into 'ins_expert_docs', keyed by the MD5 of its title."""
        doc_id = self.get_doc_id(title)
        doc = {
            'text': text,
            'title': title
        }
        response = self.es.index(index='ins_expert_docs', id=doc_id, document=doc)
        print(response['result'])

    def get_doc_id(self, title: str) -> str:
        """Derive a stable document id from the title (MD5 hex digest)."""
        return hashlib.md5(title.encode()).hexdigest()

    def delete_index(self, index: str):
        """Drop an index, ignoring 'bad request' and 'not found' errors."""
        self.es.indices.delete(index=index, ignore=[400, 404])
    def ingest_document(self, text: str, title: str) -> None:
        """Split the text into chunks and store them in Elasticsearch."""
        doc_id = self.get_doc_id(title)
        chunks = self.processor.chunk_text(text)
        documents = []
        for i, chunk in enumerate(chunks):
            doc = {
                'id': f"{doc_id}_chunk_{i}",
                # The title is prepended (repeated) so it carries weight in retrieval.
                'text': f"{title}\n{title}\n{title}\n" + chunk,
                'embedding': self.processor.generate_embeddings([chunk])[0].tolist()
            }
            print(f">>> insert id :{doc['id']}")
            documents.append(doc)
        if documents:
            body = []
            for doc in documents:
                # Use the chunk id as the Elasticsearch _id so chunks can later be
                # looked up and deleted deterministically.
                body.append({"index": {"_id": doc['id']}})
                body.append(doc)
            self.es.bulk(index=self.index_name, body=body)
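    # The script_score queries below use cosineSimilarity over the 'embedding'
    # field, which requires that field to be mapped as dense_vector. The helper
    # below is a sketch, not part of the original file: the method name and the
    # `dims` parameter are assumptions, and `dims` must match the embedding size
    # produced by TextProcessor.generate_embeddings.
    def create_chunk_index(self, index: str, dims: int) -> None:
        mapping = {
            "mappings": {
                "properties": {
                    "id": {"type": "keyword"},
                    "text": {"type": "text"},
                    "embedding": {"type": "dense_vector", "dims": dims}
                }
            }
        }
        self.es.indices.create(index=index, body=mapping)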
    def ingest_text(self, text: str, doc_id: str) -> None:
        """Split the text into chunks and store them in Elasticsearch.
        Note: doc_id is passed to ingest_document as the title and re-hashed there."""
        self.ingest_document(text, doc_id)
    def search_chunc_index(self, index: str, title: str, top_k: int = 3):
        """Vector search over a chunk index using cosine similarity on 'embedding'."""
        query_embedding = self.processor.generate_embeddings([title])[0].tolist()
        script_query = {
            "script_score": {
                "query": {"match_all": {}},
                "script": {
                    "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
                    "params": {"query_vector": query_embedding}
                }
            }
        }
        response = self.es.search(
            index=index,
            body={
                "size": top_k,
                "query": script_query,
                "_source": ["text"]
            }
        )
        return [
            {
                "id": hit["_id"],
                # Chunk documents carry no separate 'title' field; the first line
                # of the stored text is the (repeated) title.
                "title": hit["_source"]["text"].split("\n")[0],
                "text": hit["_source"]["text"],
                "score": hit["_score"]
            }
            for hit in response["hits"]["hits"]
        ]
    def search_title_index(self, index: str, title: str, top_k: int = 3):
        """Vector search over the title index; returns title, text and score per hit."""
        query_embedding = self.processor.generate_embeddings([title])[0].tolist()
        script_query = {
            "script_score": {
                "query": {"match_all": {}},
                "script": {
                    "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
                    "params": {"query_vector": query_embedding}
                }
            }
        }
        response = self.es.search(
            index=index,
            body={
                "size": top_k,
                "query": script_query,
                "_source": ["text", "title"]
            }
        )
        return [
            {
                "id": hit["_id"],
                "title": hit["_source"]["title"],
                "text": hit["_source"]["text"],
                "score": hit["_score"]
            }
            for hit in response["hits"]["hits"]
        ]
    def search_word_index(self, index: str, words: List[str], top_k: int = 1):
        """Keyword (match) search for each word, issued as a single msearch request."""
        search_request = []
        for w in words:
            # msearch expects alternating header / body entries.
            search_request.append({"index": index})
            search_request.append(
                {
                    "query": {
                        "match": {
                            "word": w
                        }
                    },
                    # Note: the per-query size is fixed at 5; top_k is not applied here.
                    "size": 5
                }
            )
        response = self.es.msearch(body=search_request)
        result_list = []
        for result in response["responses"]:
            result_list.extend(
                {
                    "id": hit["_id"],
                    "articles": hit["_source"]["articles"],
                    "word": hit["_source"]["word"],
                    "score": hit["_score"]
                }
                for hit in result["hits"]["hits"]
            )
        return result_list
    def search_similar_texts(self, query: str, top_k: int = 5) -> List[Dict]:
        """Return the stored text chunks most similar to the query text."""
        query_embedding = self.processor.generate_embeddings([query])[0].tolist()
        script_query = {
            "script_score": {
                "query": {"match_all": {}},
                "script": {
                    "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
                    "params": {"query_vector": query_embedding}
                }
            }
        }
        response = self.es.search(
            index=self.index_name,
            body={
                "size": top_k,
                "query": script_query,
                "_source": ["text"]
            }
        )
        return [
            {
                "id": hit["_id"],
                # The first line of each chunk's text is the (repeated) title.
                "title": hit["_source"]["text"].split("\n")[0],
                "text": hit["_source"]["text"],
                "score": hit["_score"]
            }
            for hit in response["hits"]["hits"]
        ]
    def update_documents(self, datas: list):
        """Update a batch of documents; the plural name avoids shadowing update_document above."""
        try:
            failed_list = []
            for data in datas:
                # Updating here means re-indexing the document under the same id.
                response = self.es.index(index=self.index_name, id=data['doc_id'], body=data['doc'])
                if response.meta.status != 200:
                    failed_list.append(data)
            # Refresh the index so the changes are visible immediately (optional).
            self.es.indices.refresh(index=self.index_name)
            print(f"Elasticsearch update_doc finished! failed_list -> {failed_list}")
        except Exception as e:
            print(f"Elasticsearch update_doc error: {e}")
    def get_doc(self, doc_id):
        """Fetch a document's source from the chunk index; return None if it is missing."""
        try:
            return self.es.get(index=self.index_name, id=doc_id)['_source']
        except Exception as e:
            print(f"Error retrieving document {doc_id}: {e}")
            return None
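
# Minimal usage sketch, not part of the original file. It assumes the .env
# variables above are set, that the Elasticsearch instance is reachable, and
# that TextProcessor provides chunk_text / generate_embeddings as used above.
# The sample title, text and query below are hypothetical.
if __name__ == "__main__":
    ops = ElasticsearchOperations()
    sample_title = "Example policy document"
    sample_text = "First paragraph of the document...\nSecond paragraph..."
    ops.ingest_document(sample_text, sample_title)   # chunk, embed and bulk-index
    ops.refresh_index(ops.index_name)                # make the new chunks searchable
    for hit in ops.search_similar_texts("example query", top_k=3):
        print(hit["score"], hit["title"])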