import json
import uuid
from datetime import datetime, timedelta
from elasticsearch import Elasticsearch, NotFoundError
import time
import traceback

# NOTE(review): `es_conf`, `config` and `logger` are used throughout this module
# but are neither defined nor imported here -- presumably they come from a
# project config/logging module whose import was stripped. TODO confirm.


class ElasticSearchService(Elasticsearch):
    """Elasticsearch client bound to a single index.

    On construction it connects to ``addrs`` using basic-auth credentials from
    ``es_conf`` and creates ``index_name`` (with :attr:`mappings` and zero
    replicas) if that index does not exist yet. Every document method below
    operates on ``self.index_name``.
    """

    # Not referenced anywhere in this file.
    es_service = None

    # Field mapping applied to newly created indices. The analyzers named for
    # `content` (ik_max_word / ik_smart, presumably from the IK analysis
    # plugin) must exist on the cluster or put_mapping will be rejected.
    mappings = {
        "properties": {
            # "id": {"type": "keyword"},
            "content": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart",
            },
            "time": {"type": "date", "format": "yyyy-MM-dd'T'HH:mm:ss"},
            "qst_id": {"type": "keyword"},
            "reply_type": {"type": "integer"},
        }
    }

    def __init__(self, index_name, addrs, *args, **kwargs):
        """Connect to the cluster and make sure ``index_name`` exists.

        :param index_name: index that all document operations of this instance target.
        :param addrs: list of node URLs handed to :class:`Elasticsearch`.
        :param args: accepted for backward compatibility; ignored.
        :param kwargs: accepted for backward compatibility; ignored.
        :raises ConnectionError: if the cluster does not answer a ping.
        """
        self.max_result_window = es_conf['max_result_window']
        self.username = es_conf['username']
        self.password = es_conf['password']
        self.index_name = index_name
        self.addrs = addrs
        super().__init__(self.addrs, basic_auth=(self.username, self.password), request_timeout=3600)
        # Check connectivity first and fail loudly, so get_elastic_instance()
        # retries instead of logging a false "connected successfully".
        if not self.ping():
            logger.error(f"ElasticSearchHandler Connection failed")
            raise ConnectionError(f"Cannot reach Elasticsearch at {addrs}")
        # 1. Create the index on first use.
        if not self.indices.exists(index=self.index_name):
            self.create_index(self.index_name)
        logger.info(
            f"Connect to ElasticService successfully!!! addrs:{addrs}, index_name:{self.index_name}")

    def create_index(self, index_name):
        """Create ``index_name`` and apply :attr:`mappings` plus replica settings.

        :param index_name: name of the index to create.
        :return: ``False`` if Elasticsearch did not acknowledge the creation,
            ``True`` otherwise (including when the index already existed).
        """
        if self.indices.exists(index=index_name):
            return True
        response = self.indices.create(index=index_name, body={})
        # Check whether the index creation was acknowledged before reporting success.
        if not response.get('acknowledged'):
            logger.info(f"Failed to create index '{index_name}'. Response: {response}")
            return False
        logger.info(f"Index [{index_name}] created successfully!")
        # Configure the index that was just created, which need not be self.index_name.
        self.create_mapping_session_history(index_name)
        self.create_index_setting(index_name)
        if response.get('shards_acknowledged'):
            logger.info(f"Index '{index_name}' All shards are acknowledged.")
        else:
            logger.info(f"Index '{index_name}' Not all shards are acknowledged.")
        return True

    def create_mapping_session_history(self, index_name=None):
        """Put :attr:`mappings` on ``index_name`` (defaults to ``self.index_name``)."""
        index_name = self.index_name if index_name is None else index_name
        response = self.indices.put_mapping(index=index_name, body=ElasticSearchService.mappings)
        if response.get('acknowledged'):
            logger.info(f"Index '{index_name}' created successfully with mapping.")
        else:
            logger.info(f"Failed to create index '{index_name}'. Response: {response}")

    def create_index_setting(self, index_name=None):
        """Set ``number_of_replicas`` to 0 on ``index_name`` (defaults to ``self.index_name``)."""
        index_name = self.index_name if index_name is None else index_name
        response = self.indices.put_settings(index=index_name, body={"number_of_replicas": "0"})
        if response.get('acknowledged'):
            logger.info(f"Index '{index_name}' created successfully with setting.")
        else:
            logger.info(f"Failed to create index setting'{index_name}'. Response: {response}")

    def delete_index(self, index_name):
        """Delete ``index_name`` and return the raw API response."""
        res = self.indices.delete(index=index_name)
        logger.info(f"Index [{index_name}] deleted successfully!, res: {res}")
        return res

    def insert_doc(self, hist_hash_id: str, doc_body: dict):
        """Index a single document, then refresh so it is immediately searchable.

        :param hist_hash_id: document ``_id``.
        :param doc_body: document source.
        :return: the refresh response, or ``None`` if indexing failed.
        """
        try:
            self.index(index=self.index_name, id=hist_hash_id, body=doc_body)
            res = self.indices.refresh(index=self.index_name)
            logger.info(f"Document hist_hash_id:[{hist_hash_id}] indexed successfully!")
            return res
        except Exception as e:
            logger.error(f"Failed to index document hist_hash_id:[{hist_hash_id}]: {e}")

    def bulk_insert_docs(self, session_histories: list, batch_count: int = 1000):
        """Index records in batches through the bulk API.

        :param session_histories: source records (dicts). ``hist_hash_id``, when
            present and non-empty, becomes the document ``_id``; otherwise
            Elasticsearch generates an id.
        :param batch_count: number of documents sent per bulk request.
        :return: the per-item bulk results that did not succeed, or ``None`` if
            an exception was raised.
        """
        try:
            failed_list = []
            for start in range(0, len(session_histories), batch_count):
                # A fresh action list per batch; reusing one list would resend
                # every earlier batch with each new request.
                bulk_actions = []
                for item in session_histories[start:start + batch_count]:
                    doc = {
                        # "id": item.get('id', 0),
                        "you_feild": item.get('you_feild', ''),
                        # TODO: map the remaining fields of the index here.
                    }
                    action_meta = {"_index": self.index_name}
                    doc_id = item.get('hist_hash_id')
                    if doc_id:
                        # An empty _id is rejected by Elasticsearch, so only set it when present.
                        action_meta["_id"] = doc_id
                    # The bulk body alternates action line and document source.
                    bulk_actions.append({"index": action_meta})
                    bulk_actions.append(doc)
                    logger.debug(f"insert data -> {item}")
                response = self.bulk(index=self.index_name, body=bulk_actions)
                for result in response['items']:
                    # 201 = created, 200 = existing document overwritten: both succeeded.
                    if result['index'].get('status') not in (200, 201):
                        failed_list.append(result)
            logger.info(f"Elasticsearch 批量新增完成,failed_list:{failed_list}")
            # Refresh so the new documents are immediately visible to searches.
            self.indices.refresh(index=self.index_name)
            return failed_list
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Elasticsearch bulk insert doc error:{e}")

    def delete_doc_by_id(self, doc_ids):
        """Delete documents by id.

        :param doc_ids: iterable of document ids.
        :return: the ids that could not be deleted (including ids that do not
            exist), or ``None`` if an unexpected exception was raised.
        """
        try:
            failed_list = []
            for doc_id in doc_ids:
                try:
                    deleted = self.delete(index=self.index_name, id=doc_id).meta.status == 200
                except NotFoundError:
                    # The client raises on 404 rather than returning the status.
                    deleted = False
                if deleted:
                    logger.info(f"Document with ID {doc_id} in index {self.index_name} was deleted successfully.")
                else:
                    failed_list.append(doc_id)
                    logger.info(f"Document with ID {doc_id} in index {self.index_name} was deleted failed!")
            return failed_list
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Elasticsearch delete_doc error:{e}")

    def delete_docs_by_query_body(self, query_body):
        """Delete every document matching ``query_body`` via ``_delete_by_query``.

        Example body that matches all documents: ``{"query": {"match_all": {}}}``.

        :return: the delete-by-query response, or ``None`` on error.
        """
        try:
            response = self.delete_by_query(index=self.index_name, body=query_body)
            # The number of removed documents is reported under "deleted".
            logger.info(f"Deleted documents: {response.get('deleted')}")
            return response
        except Exception as e:
            logger.error(f"Deleted docs error: {e}")

    def update_doc(self, datas: list):
        """Overwrite documents by re-indexing them.

        :param datas: items shaped ``{'doc_id': ..., 'doc': {...}}``; ``doc``
            replaces the whole stored source (this is not a partial update).
        :return: the items whose index call did not return 200/201, or ``None``
            if an exception was raised.
        """
        try:
            failed_list = []
            for data in datas:
                response = self.index(index=self.index_name, id=data['doc_id'], body=data['doc'])
                logger.info(f"Update Response: {response}")
                # 200 = overwritten, 201 = the id did not exist and was created.
                if response.meta.status not in (200, 201):
                    failed_list.append(data)
            # One refresh after all writes makes the changes searchable (optional).
            self.indices.refresh(index=self.index_name)
            logger.info(f"Elasticsearch update_doc finished! failed_list -> {failed_list}")
            return failed_list
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Elasticsearch update_doc error: {e}")

    def get_doc(self, doc_id):
        """Return the ``_source`` of document ``doc_id``, or ``None`` on any error."""
        try:
            return self.get(index=self.index_name, id=doc_id)['_source']
        except Exception as e:
            logger.error(f"Error retrieving document {doc_id}: {e}")
            return None

    def _search_with_hits(self, query_body, error_label, **search_kwargs):
        """Run a search and return the response only if all shards succeeded and it has hits.

        :param query_body: Query DSL body.
        :param error_label: method name used in the error log line.
        :param search_kwargs: extra keyword arguments for ``search`` (e.g. ``scroll``).
        :return: the search response, or ``None`` when shards failed, nothing
            matched, or an exception was raised.
        """
        try:
            logger.info(f'elasticsearch search index_name={self.index_name},query_body={query_body}')
            response = self.search(index=self.index_name, body=query_body, **search_kwargs)
            logger.info(f'elasticsearch search [{self.index_name}] response.meta.status={response.meta.status}')
            shards = response.get('_shards', {})
            if shards.get('successful') != shards.get('total'):
                return None
            total = response['hits']['total']['value']
            logger.info(
                f"Search index successful! total count={total}, match count={len(response['hits']['hits'])}")
            return response if total > 0 else None
        except Exception as e:
            traceback.print_exc()
            logger.error(f"ElasticService {error_label} error:{e}")
            return None

    def search_index(self, query_body):
        """Search the index.

        :param query_body: Query DSL body.
        :return: the response when it has hits, otherwise ``None``.
        """
        return self._search_with_hits(query_body, 'search_index')

    def search_index_by_scroll_api(self, query_body):
        """Search the index, opening a scroll context kept alive for 1 minute.

        :param query_body: Query DSL body.
        :return: the response (carrying ``_scroll_id``) when it has hits,
            otherwise ``None``.
        """
        return self._search_with_hits(query_body, 'search_index_by_scroll_api', scroll='1m')

    def search_by_sql(self, sql_body):
        """Run an Elasticsearch SQL query and return the rows as dicts.

        :param sql_body: body for the SQL query API (e.g. ``{"query": "SELECT ..."}``).
        :return: one ``{column_name: value}`` dict per row; ``[]`` on failure.
        """
        try:
            logger.info(f'elasticsearch search index_name={self.index_name},sql_body={sql_body}')
            response = self.sql.query(body=sql_body)
            logger.info(f'elasticsearch search [{self.index_name}] response.meta.status={response.meta.status}')
            if response.meta.status != 200:
                return []
            column_names = [col['name'] for col in response.get('columns') or []]
            result_dicts = [dict(zip(column_names, row)) for row in response.get('rows') or []]
            logger.info(f"Search index successful! match count={len(result_dicts)}")
            return result_dicts
        except Exception as e:
            traceback.print_exc()
            logger.error(f"ElasticService search_by_sql error:{e}")
            return []


def get_elastic_instance(index_name, addrs, try_count=5, interval_seconds=10):
    """Create an :class:`ElasticSearchService`, retrying on failure.

    :param index_name: index the service is bound to.
    :param addrs: list of node URLs.
    :param try_count: maximum number of connection attempts.
    :param interval_seconds: pause between consecutive attempts.
    :return: the connected service, or ``None`` if every attempt failed.
    """
    _es_service = None
    for attempt in range(try_count):
        try:
            _es_service = ElasticSearchService(index_name, addrs)
            logger.info(f"ElasticService initial successfully!")
            print(f"ElasticService initial successfully!")
            return _es_service
        except Exception as e:
            traceback.print_exc()
            logger.warning(
                f"初始化ElasticService失败,结果:{_es_service}, 异常原因:{str(e)}, 应用将在{interval_seconds}秒后重新尝试.")
            # No point waiting after the final attempt.
            if attempt < try_count - 1:
                time.sleep(interval_seconds)
    logger.error(f"Connect to elasticServer failed after {try_count} attempts! addrs:{addrs}, index_name:{index_name}")
    return None


# Module-level client, created at import time when enabled in config.
es_service = None
port = es_conf['port']
host = es_conf['host']
addrs = [f"http://{host}:{port}", ]
if config['elasticsearch']['enabled']:
    index_name = config['elasticsearch']['session_history_index']
    es_service = get_elastic_instance(index_name, addrs)
else:
    logger.info(f"[elasticsearch] 未启用! enabled -> {config['elasticsearch']['enabled']}")


if __name__ == '__main__':
    index_name = config['elasticsearch']['session_history_index']
    es_service = get_elastic_instance(index_name, addrs)
    # Add documents
    docs = [{
        # "id": i + 1,
        "you_feild": "",
        # TODO: fill in the remaining fields of the index here.
    } for i in range(5)]
    # Insert data
    # es_service.insert_doc('2', doc)
    print(es_service.bulk_insert_docs(docs))
    # Delete the index
    # print(es_service.delete_index(index_name))
    # Get a document
    # print(es_service.get_doc('c2b27b31-80f8-4cf6-b3f2-36683b60d7da'))
    # logger.info(es_service.get_doc('2'))
    # Delete documents
    # logger.info(es_service.delete_doc_by_id(['f554d0e5-e4cc-4556-952b-b12cdc640fe56']))
    # query_body = {"query": {"match_all": {}}}
    # logger.info(es_service.delete_docs_by_query_body(query_body))
    # Update data
    # datas = [{'doc_id': 'c2b27b31-80f8-4cf6-b3f2-36683b60d7da', 'doc': {'qst_content': 'qqq'}}]
    # print(es_service.update_doc(datas))
    # Search data
    keyword = "缴清"
    query_body = {
        "query": {
            "multi_match": {
                "query": keyword,
                "fields": ["reply_content", "qst_content", "standard_qst"]
            }
        },
        "from": 0,
        "size": 10,
        "sort": [{
            "chat_qst_time": "desc"
        }]
    }
    # print(es_service.search_index(query_body))