import json
import uuid
from datetime import datetime, timedelta
from elasticsearch import Elasticsearch
import time
import traceback
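
# NOTE: `logger`, `config` and `es_conf` are used throughout this module but are not
# defined here; they are assumed to be provided by the project's own logging and
# configuration setup (for example, a config loader that exposes the "elasticsearch"
# section of the settings as `es_conf`).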


class ElasticSearchService(Elasticsearch):
    es_service = None
    mappings = {
        "properties": {
            # "id": {"type": "keyword"},
            "content": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart"
            },
            "time": {"type": "date", "format": "yyyy-MM-dd'T'HH:mm:ss"},
            "qst_id": {"type": "keyword"},
            "reply_type": {"type": "integer"}
        }
    }
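
    # NOTE: "ik_max_word" / "ik_smart" are Chinese analyzers from the IK analysis plugin;
    # the mapping above assumes that plugin is installed on the target cluster.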

    def __init__(self, index_name, addrs, *args, **kwargs):
        self.max_result_window = es_conf['max_result_window']
        self.username = es_conf['username']
        self.password = es_conf['password']
        self.index_name = index_name
        self.addrs = addrs
        super().__init__(self.addrs, basic_auth=(self.username, self.password), request_timeout=3600)

        # 1. Check whether the index exists; create it if missing
        if not self.indices.exists(index=self.index_name):
            self.create_index(self.index_name)
        if not self.ping():
            logger.error("ElasticSearchHandler connection failed")
        else:
            logger.info(
                f"Connected to ElasticService successfully! addrs:{addrs}, index_name:{self.index_name}")

    def create_index(self, index_name):
        # Create the index, then apply the mapping and settings
        if not self.indices.exists(index=index_name):
            response = self.indices.create(index=index_name, body={})
            # Check whether the index was created successfully
            if not response.get('acknowledged'):
                logger.info(f"Failed to create index '{index_name}'. Response: {response}")
                return False
            logger.info(f"Index [{index_name}] created successfully!")
            self.create_mapping_session_history()
            self.create_index_setting()
            if response.get('shards_acknowledged'):
                logger.info(f"Index '{index_name}': all shards are acknowledged.")
            else:
                logger.info(f"Index '{index_name}': not all shards are acknowledged.")

    def create_mapping_session_history(self):
        mapping = ElasticSearchService.mappings
        # Apply the mapping to the index
        response = self.indices.put_mapping(index=self.index_name, body=mapping)
        # Check whether the mapping was applied successfully
        if response.get('acknowledged'):
            logger.info(f"Mapping applied to index '{self.index_name}' successfully.")
        else:
            logger.info(f"Failed to apply mapping to index '{self.index_name}'. Response: {response}")

    def create_index_setting(self):
        setting = {"number_of_replicas": "0"}
        # Apply the settings to the index
        response = self.indices.put_settings(index=self.index_name, body=setting)
        # Check whether the settings were applied successfully
        if response.get('acknowledged'):
            logger.info(f"Settings applied to index '{self.index_name}' successfully.")
        else:
            logger.info(f"Failed to apply settings to index '{self.index_name}'. Response: {response}")

    def delete_index(self, index_name):
        res = self.indices.delete(index=index_name)
        logger.info(f"Index [{index_name}] deleted successfully! res: {res}")
        return res

    def insert_doc(self, hist_hash_id: str, doc_body: dict):
        """
        Insert a single document.
        :param hist_hash_id: document ID
        :param doc_body: document body
        :return: refresh response, or None on failure
        """
        try:
            self.index(index=self.index_name, id=hist_hash_id, body=doc_body)
            # Refresh the index so the document is searchable immediately
            res = self.indices.refresh(index=self.index_name)
            logger.info(f"Document hist_hash_id:[{hist_hash_id}] indexed successfully!")
            return res
        except Exception as e:
            logger.error(f"Failed to index document hist_hash_id:[{hist_hash_id}]: {e}")

    def bulk_insert_docs(self, session_histories: list):
        """
        Bulk-insert documents.
        :param session_histories: list of history dicts to index
        :return: list of bulk items that failed
        """
        try:
            failed_list = []
            batch_count = 1000
            # Send the documents in batches of `batch_count`
            for i in range(0, len(session_histories), batch_count):
                bulk_actions = []
                for item in session_histories[i:i + batch_count]:
                    doc = {
                        # "id": item.get('id', 0),
                        "you_feild": item.get('you_feild', ''),
                        # ... (remaining fields elided in the original)
                    }
                    action = {
                        "index": {  # "index" is the bulk action type
                            "_index": self.index_name,
                            # Document ID; drop this line to let Elasticsearch generate one
                            "_id": item.get('hist_hash_id', '')
                        }
                    }
                    # The bulk body alternates action lines and document lines
                    bulk_actions.append(action)
                    bulk_actions.append(doc)
                    print(f"insert data -> {item}")
                response = self.bulk(index=self.index_name, body=bulk_actions)
                # Collect the items that were not indexed successfully
                for item in response['items']:
                    # 201 = created, 200 = updated (existing ID re-indexed)
                    if item['index']['status'] not in (200, 201):
                        failed_list.append(item)
            logger.info(f"Elasticsearch bulk insert finished, failed_list:{failed_list}")
            # Refresh the index so the documents are searchable immediately
            self.indices.refresh(index=self.index_name)
            return failed_list
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Elasticsearch bulk insert doc error:{e}")

    def delete_doc_by_id(self, doc_ids):
        """ Delete documents by ID. """
        try:
            failed_list = []
            for doc_id in doc_ids:
                response = self.delete(index=self.index_name, id=doc_id)
                # Check the response status
                if response.meta.status != 200:
                    failed_list.append(doc_id)
                    logger.info(f"Document with ID {doc_id} in index {self.index_name} failed to delete!")
                else:
                    logger.info(f"Document with ID {doc_id} in index {self.index_name} was deleted successfully.")
            return failed_list
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Elasticsearch delete_doc error:{e}")

    def delete_docs_by_query_body(self, query_body):
        # Delete documents matching the query via the _delete_by_query API.
        # Note: to delete every document, pass a match-all query: {"query": {"match_all": {}}}
        try:
            response = self.delete_by_query(index=self.index_name, body=query_body)
            # 'deleted' holds the number of deleted documents
            logger.info(f"Deleted documents: {response['deleted']}")
        except Exception as e:
            logger.error(f"Deleted docs error: {e}")

    def update_doc(self, datas: list):
        """ Update documents; each entry needs 'doc_id' and 'doc'. """
        try:
            failed_list = []
            for data in datas:
                # "Updating" here actually re-indexes the whole document under the same ID
                response = self.index(index=self.index_name, id=data['doc_id'], body=data['doc'])
                logger.info(f"Update Response: {response}")
                if response.meta.status != 200:
                    failed_list.append(data)
            # Refresh the index so the changes are visible immediately (optional)
            self.indices.refresh(index=self.index_name)
            logger.info(f"Elasticsearch update_doc finished! failed_list -> {failed_list}")
            return failed_list
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Elasticsearch update_doc error: {e}")

    def get_doc(self, doc_id):
        """ Fetch a document's _source by ID. """
        try:
            doc = self.get(index=self.index_name, id=doc_id)['_source']
            return doc
        except Exception as e:
            logger.error(f"Error retrieving document {doc_id}: {e}")
            return None

    def search_index(self, query_body):
        """
        Search the index.
        query_body: the query body (Query DSL)
        """
        try:
            logger.info(f'elasticsearch search index_name={self.index_name}, query_body={query_body}')
            response = self.search(index=self.index_name, body=query_body)
            logger.info(f'elasticsearch search [{self.index_name}] response.meta.status={response.meta.status}')
            if response.get('_shards', {}).get('successful') == response.get('_shards', {}).get('total'):
                logger.info(
                    f"Search index successful! total count={response['hits']['total']['value']}, match count={len(response['hits']['hits'])}")
                # logger.info(f"search response -> {response}")
                if response['hits']['total']['value'] > 0:
                    return response
            return None
        except Exception as e:
            traceback.print_exc()
            logger.error(f"ElasticService search_index error:{e}")

    def search_index_by_scroll_api(self, query_body):
        """
        Paginated search via the scroll API; returns the first page (including the scroll id).
        query_body: the query body (Query DSL)
        """
        try:
            logger.info(f'elasticsearch search index_name={self.index_name}, query_body={query_body}')
            response = self.search(index=self.index_name, body=query_body, scroll='1m')
            logger.info(f'elasticsearch search [{self.index_name}] response.meta.status={response.meta.status}')
            if response.get('_shards', {}).get('successful') == response.get('_shards', {}).get('total'):
                logger.info(
                    f"Search index successful! total count={response['hits']['total']['value']}, match count={len(response['hits']['hits'])}")
                # logger.info(f"search response -> {response}")
                if response['hits']['total']['value'] > 0:
                    return response
            return None
        except Exception as e:
            traceback.print_exc()
            logger.error(f"ElasticService search_index_by_scroll_api error:{e}")

    def search_by_sql(self, sql_body):
        try:
            logger.info(f'elasticsearch search index_name={self.index_name}, sql_body={sql_body}')
            response = self.sql.query(body=sql_body)
            logger.info(f'elasticsearch search [{self.index_name}] response.meta.status={response.meta.status}')
            if response.meta.status == 200:
                columns = response.get('columns')
                rows = response.get('rows')
                # Extract the column names
                column_names = [col['name'] for col in columns]
                # Zip each row into a dict keyed by column name
                result_dicts = []
                for row in rows:
                    result_dict = {column_names[i]: row[i] for i in range(len(column_names))}
                    result_dicts.append(result_dict)
                logger.info(f"Search index successful! match count={len(result_dicts)}")
                return result_dicts
            return []
        except Exception as e:
            traceback.print_exc()
            logger.error(f"ElasticService search_by_sql error:{e}")
            return []


def get_elastic_instance(index_name, addrs):
    _es_service = None
    _try_count = 5
    _interval_seconds = 10
    for i in range(_try_count):  # After start-up, try to connect up to 5 times, 10 seconds apart
        try:
            _es_service = ElasticSearchService(index_name, addrs)
            if _es_service:
                logger.info("ElasticService initialized successfully!")
                print("ElasticService initialized successfully!")
                return _es_service
            logger.warning(f"Connect to elasticServer failed, try reconnect to elasticServer [{i}]!")
        except Exception as e:
            traceback.print_exc()
            logger.warning(
                f"Failed to initialize ElasticService, result: {_es_service}, reason: {str(e)}; retrying in {_interval_seconds} seconds.")
            time.sleep(_interval_seconds)


es_service = None
port = es_conf['port']
host = es_conf['host']
addrs = [f"http://{host}:{port}", ]
if config['elasticsearch']['enabled']:
    index_name = config['elasticsearch']['session_history_index']
    es_service = get_elastic_instance(index_name, addrs)
else:
    logger.info(f"[elasticsearch] is not enabled! enabled -> {config['elasticsearch']['enabled']}")

if __name__ == '__main__':
    index_name = config['elasticsearch']['session_history_index']
    es_service = get_elastic_instance(index_name, addrs)
    # Build some documents
    docs = [{
        # "id": i + 1,
        "you_feild": "",
        # ... (remaining fields elided in the original)
    } for i in range(5)]
    # Insert data
    # es_service.insert_doc('2', doc)
    print(es_service.bulk_insert_docs(docs))
    # Delete the index
    # print(es_service.delete_index(index_name))
    # Fetch a document
    # print(es_service.get_doc('c2b27b31-80f8-4cf6-b3f2-36683b60d7da'))
    # logger.info(es_service.get_doc('2'))
    # Delete documents
    # logger.info(es_service.delete_doc_by_id(['f554d0e5-e4cc-4556-952b-b12cdc640fe56']))
    # query_body = {"query": {"match_all": {}}}
    # logger.info(es_service.delete_docs_by_query_body(query_body))
    # Update data
    # datas = [{'doc_id': 'c2b27b31-80f8-4cf6-b3f2-36683b60d7da', 'doc': {'qst_content': 'qqq'}}]
    # print(es_service.update_doc(datas))
    # Search data
    keyword = "缴清"
    query_body = {
        "query": {
            "multi_match": {
                "query": keyword,
                "fields": ["reply_content", "qst_content", "standard_qst"]
            }
        },
        "from": 0,
        "size": 10,
        "sort": [{
            "chat_qst_time": "desc"
        }]
    }
    # print(es_service.search_index(query_body))
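    # SQL query example in the same commented-out style (assumes the cluster exposes the
    # Elasticsearch SQL endpoint; the selected columns are assumptions for illustration)
    # sql_body = {"query": f'SELECT qst_id, content FROM "{index_name}" LIMIT 10'}
    # print(es_service.search_by_sql(sql_body))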