# es_helper.py

import json
import uuid
from datetime import datetime, timedelta
from elasticsearch import Elasticsearch
import time
import traceback
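
# NOTE: `logger`, `config`, and `es_conf` are used throughout this module but are not
# defined in this file; they are assumed to come from the project's shared settings /
# logging module. A minimal local sketch (values are placeholders, not the real config):
# import logging
# logger = logging.getLogger(__name__)
# es_conf = {"host": "127.0.0.1", "port": 9200, "username": "elastic",
#            "password": "changeme", "max_result_window": 10000}
# config = {"elasticsearch": {"enabled": True, "session_history_index": "session_history"}}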


class ElasticSearchService(Elasticsearch):
    es_service = None
    mappings = {
        "properties": {
            # "id": {"type": "keyword"},
            "content": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart"
            },
            "time": {"type": "date", "format": "yyyy-MM-dd'T'HH:mm:ss"},
            "qst_id": {"type": "keyword"},
            "reply_type": {"type": "integer"}
        }
    }
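
    # NOTE: "ik_max_word" and "ik_smart" are analyzers provided by the Elasticsearch IK
    # analysis plugin for Chinese text; the mapping above only works on clusters where
    # that plugin is installed.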

    def __init__(self, index_name, addrs, *args, **kwargs):
        self.max_result_window = es_conf['max_result_window']
        self.username = es_conf['username']
        self.password = es_conf['password']
        self.index_name = index_name
        self.addrs = addrs
        super().__init__(self.addrs, basic_auth=(self.username, self.password), request_timeout=3600)
        # 1. Check whether the index exists; create it if it does not
        if not self.indices.exists(index=self.index_name):
            self.create_index(self.index_name)
        if not self.ping():
            logger.error("ElasticSearchHandler Connection failed")
        logger.info(
            f"Connect to ElasticService successfully!!! addrs:{addrs}, index_name:{self.index_name}")

    def create_index(self, index_name):
        # Create the index
        if not self.indices.exists(index=index_name):
            response = self.indices.create(index=index_name, body={})
            logger.info(f"Index [{index_name}] created successfully!")
            # Check whether the index was created successfully
            if not response.get('acknowledged'):
                logger.info(f"Failed to create index '{index_name}'. Response: {response}")
                return False
            self.create_mapping_session_history()
            self.create_index_setting()
            if response.get('shards_acknowledged'):
                logger.info(f"Index '{index_name}' All shards are acknowledged.")
            else:
                logger.info(f"Index '{index_name}' Not all shards are acknowledged.")

    def create_mapping_session_history(self):
        mapping = ElasticSearchService.mappings
        # Apply the mapping to the index
        response = self.indices.put_mapping(index=self.index_name, body=mapping)
        # Check whether the mapping was applied successfully
        if response.get('acknowledged'):
            logger.info(f"Mapping applied to index '{self.index_name}' successfully.")
        else:
            logger.info(f"Failed to apply mapping to index '{self.index_name}'. Response: {response}")

    def create_index_setting(self):
        setting = {"number_of_replicas": "0"}
        # Apply the settings to the index
        response = self.indices.put_settings(index=self.index_name, body=setting)
        # Check whether the settings were applied successfully
        if response.get('acknowledged'):
            logger.info(f"Settings applied to index '{self.index_name}' successfully.")
        else:
            logger.info(f"Failed to apply settings to index '{self.index_name}'. Response: {response}")

    def delete_index(self, index_name):
        res = self.indices.delete(index=index_name)
        logger.info(f"Index [{index_name}] deleted successfully! res: {res}")
        return res

    def insert_doc(self, hist_hash_id: str, doc_body: dict):
        """
        Insert a single document.
        :param hist_hash_id: document ID
        :param doc_body: document body
        :return: refresh response, or None on failure
        """
        try:
            self.index(index=self.index_name, id=hist_hash_id, body=doc_body)
            # Refresh the index so the document becomes visible immediately
            res = self.indices.refresh(index=self.index_name)
            logger.info(f"Document hist_hash_id:[{hist_hash_id}] indexed successfully!")
            return res
        except Exception as e:
            logger.error(f"Failed to index document hist_hash_id:[{hist_hash_id}]: {e}")
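
    # Example usage (field names follow the mapping above; values are illustrative):
    #   es_service.insert_doc(str(uuid.uuid4()),
    #                         {"content": "hello", "qst_id": "q1", "reply_type": 0,
    #                          "time": "2024-01-01T00:00:00"})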

    def bulk_insert_docs(self, session_histories: list):
        """
        Bulk-insert documents.
        :param session_histories: list of document dicts
        :return: list of items that failed to index
        """
        try:
            failed_list = []
            batch_count = 1000
            # Insert in batches of `batch_count` documents
            for i in range(0, len(session_histories), batch_count):
                # Build the bulk body for this batch
                bulk_actions = []
                for item in session_histories[i:i + batch_count]:
                    doc = {
                        # "id": item.get('id', 0),
                        "you_feild": item.get('you_feild', ''),
                        # ... (remaining fields elided in the original)
                    }
                    action = {
                        "index": {  # Use "index" as the action
                            "_index": self.index_name,
                            # Set "_id" here if the document ID needs to be controlled
                            "_id": item.get('hist_hash_id', '')
                        }
                    }
                    # Append the action line and the document line as consecutive entries
                    bulk_actions.append(action)
                    bulk_actions.append(doc)
                    logger.debug(f"insert data -> {item}")
                response = self.bulk(index=self.index_name, body=bulk_actions)
                # Collect the items that failed in this batch (201 = created, 200 = updated)
                for item in response['items']:
                    if item['index']['status'] not in (200, 201):
                        failed_list.append(item)
            logger.info(f"Elasticsearch bulk insert finished, failed_list:{failed_list}")
            # Refresh the index so the documents become visible immediately
            self.indices.refresh(index=self.index_name)
            return failed_list
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Elasticsearch bulk insert doc error:{e}")
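
    # Example usage (hypothetical field names; the doc-building code above uses placeholder
    # fields, so each item should also carry the `hist_hash_id` used as the document _id):
    #   failed = es_service.bulk_insert_docs([
    #       {"hist_hash_id": str(uuid.uuid4()), "you_feild": "hello"},
    #   ])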

    def delete_doc_by_id(self, doc_ids):
        """ Delete documents by ID. """
        try:
            failed_list = []
            for doc_id in doc_ids:
                response = self.delete(index=self.index_name, id=doc_id)
                # Check the response status
                if response.meta.status != 200:
                    failed_list.append(doc_id)
                    logger.info(f"Document with ID {doc_id} in index {self.index_name} failed to delete!")
                    continue
                logger.info(f"Document with ID {doc_id} in index {self.index_name} was deleted successfully.")
            return failed_list
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Elasticsearch delete_doc error:{e}")

    def delete_docs_by_query_body(self, query_body):
        # Delete all matching documents with the _delete_by_query API.
        # Note: to delete everything, pass a match-all query: {"query": {"match_all": {}}}
        try:
            response = self.delete_by_query(index=self.index_name, body=query_body)
            logger.info(f"Deleted documents: {response.get('deleted')}")  # number of deleted documents
        except Exception as e:
            # Catch and handle the exception
            logger.error(f"Deleted docs error: {e}")

    def update_doc(self, datas: list):
        """ Update documents. """
        try:
            failed_list = []
            for data in datas:
                # Update the document (effectively a re-index)
                response = self.index(index=self.index_name, id=data['doc_id'], body=data['doc'])
                logger.info(f"Update Response: {response}")
                if response.meta.status not in (200, 201):
                    failed_list.append(data)
            # Refresh the index so the changes are applied immediately (optional)
            self.indices.refresh(index=self.index_name)
            logger.info(f"Elasticsearch update_doc finished! failed_list -> {failed_list}")
            return failed_list
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Elasticsearch update_doc error: {e}")

    def get_doc(self, doc_id):
        """ Fetch a document by ID. """
        try:
            doc = self.get(index=self.index_name, id=doc_id)['_source']
            return doc
        except Exception as e:
            logger.error(f"Error retrieving document {doc_id}: {e}")
            return None

    def search_index(self, query_body):
        """
        Search documents.
        query_body: the query body (Query DSL)
        """
        try:
            logger.info(f'elasticsearch search index_name={self.index_name},query_body={query_body}')
            response = self.search(index=self.index_name, body=query_body)
            logger.info(f'elasticsearch search [{self.index_name}] response.meta.status={response.meta.status}')
            if response.get('_shards', {}).get('successful') == response.get('_shards', {}).get('total'):
                logger.info(
                    f"Search index successful! total count={response['hits']['total']['value']}, match count={len(response['hits']['hits'])}")
                # logger.info(f"search response -> {response}")
                if response['hits']['total']['value'] > 0:
                    return response
                return None
            return None
        except Exception as e:
            traceback.print_exc()
            logger.error(f"ElasticService search_index error:{e}")

    def search_index_by_scroll_api(self, query_body):
        """
        Paged search via the scroll API.
        query_body: the query body (Query DSL)
        """
        try:
            logger.info(f'elasticsearch search index_name={self.index_name},query_body={query_body}')
            response = self.search(index=self.index_name, body=query_body, scroll='1m')
            logger.info(f'elasticsearch search [{self.index_name}] response.meta.status={response.meta.status}')
            if response.get('_shards', {}).get('successful') == response.get('_shards', {}).get('total'):
                logger.info(
                    f"Search index successful! total count={response['hits']['total']['value']}, match count={len(response['hits']['hits'])}")
                # logger.info(f"search response -> {response}")
                if response['hits']['total']['value'] > 0:
                    return response
                return None
            return None
        except Exception as e:
            traceback.print_exc()
            logger.error(f"ElasticService search_index_by_scroll_api error:{e}")
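
    # The method above only returns the first page plus a `_scroll_id`; subsequent pages
    # come from the scroll API. A minimal sketch (assumes the standard elasticsearch-py
    # scroll/clear_scroll calls; `handle` is a placeholder for the caller's processing):
    #   page = es_service.search_index_by_scroll_api(query_body)
    #   scroll_id = page['_scroll_id'] if page else None
    #   while page and page['hits']['hits']:
    #       handle(page['hits']['hits'])
    #       page = es_service.scroll(scroll_id=scroll_id, scroll='1m')
    #       scroll_id = page['_scroll_id']
    #   if scroll_id:
    #       es_service.clear_scroll(scroll_id=scroll_id)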

    def search_by_sql(self, sql_body):
        try:
            logger.info(f'elasticsearch search index_name={self.index_name},sql_body={sql_body}')
            response = self.sql.query(body=sql_body)
            logger.info(f'elasticsearch search [{self.index_name}] response.meta.status={response.meta.status}')
            if response.meta.status == 200:
                columns = response.get('columns')
                rows = response.get('rows')
                # Extract the column names
                column_names = [col['name'] for col in columns]
                # Re-shape each row into a dict keyed by column name
                result_dicts = []
                for row in rows:
                    result_dict = {column_names[i]: row[i] for i in range(len(column_names))}
                    result_dicts.append(result_dict)
                logger.info(f"Search index successful! match count={len(result_dicts)}")
                return result_dicts
            return []
        except Exception as e:
            traceback.print_exc()
            logger.error(f"ElasticService search_by_sql error:{e}")
            return []
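
    # Example SQL body (the selected fields are illustrative and may not exist in the mapping):
    #   sql_body = {"query": f'SELECT qst_id, content FROM "{index_name}" WHERE reply_type = 0 LIMIT 10'}
    #   rows = es_service.search_by_sql(sql_body)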


def get_elastic_instance(index_name, addrs):
    _es_service = None
    _wait_times = 0
    _try_count = 5
    _interval_seconds = 10
    for i in range(_try_count):  # After startup, try to connect up to 5 times, 10 seconds apart
        try:
            _es_service = ElasticSearchService(index_name, addrs)
            if _es_service:
                logger.info("ElasticService initialized successfully!")
                print("ElasticService initialized successfully!")
                return _es_service
            logger.warning(f"Connect to elasticServer failed, try reconnect to elasticServer [{i}]!")
        except Exception as e:
            traceback.print_exc()
            logger.warning(
                f"Failed to initialize ElasticService, result: {_es_service}, reason: {str(e)}; "
                f"the application will retry in {_interval_seconds} seconds.")
            time.sleep(_interval_seconds)


es_service = None
port = es_conf['port']
host = es_conf['host']
addrs = [f"http://{host}:{port}", ]
if config['elasticsearch']['enabled']:
    index_name = config['elasticsearch']['session_history_index']
    es_service = get_elastic_instance(index_name, addrs)
else:
    logger.info(f"[elasticsearch] is not enabled! enabled -> {config['elasticsearch']['enabled']}")


if __name__ == '__main__':
    index_name = config['elasticsearch']['session_history_index']
    es_service = get_elastic_instance(index_name, addrs)
    # Build documents
    docs = [{
        # "id": i + 1,
        "you_feild": "",
        # ... (remaining fields elided in the original)
    } for i in range(5)]
    # Insert data
    # es_service.insert_doc('2', doc)
    print(es_service.bulk_insert_docs(docs))
    # Delete the index
    # print(es_service.delete_index(index_name))
    # Fetch a document
    # print(es_service.get_doc('c2b27b31-80f8-4cf6-b3f2-36683b60d7da'))
    # logger.info(es_service.get_doc('2'))
    # Delete documents
    # logger.info(es_service.delete_doc_by_id(['f554d0e5-e4cc-4556-952b-b12cdc640fe56']))
    # query_body = {"query": {"match_all": {}}}
    # logger.info(es_service.delete_docs_by_query_body(query_body))
    # Update data
    # datas = [{'doc_id': 'c2b27b31-80f8-4cf6-b3f2-36683b60d7da', 'doc': {'qst_content': 'qqq'}}]
    # print(es_service.update_doc(datas))
    # Search data
    keyword = "缴清"
    query_body = {
        "query": {
            "multi_match": {
                "query": keyword,
                "fields": ["reply_content", "qst_content", "standard_qst"]
            }
        },
        "from": 0,
        "size": 10,
        "sort": [{
            "chat_qst_time": "desc"
        }]
    }
    # print(es_service.search_index(query_body))