# from utils.es import ElasticsearchOperations
import codecs
import hashlib
import json
import os

import jieba
import numpy as np
from dotenv import load_dotenv

from libs.text_processor import TextProcessor
from utils.file import load_file

load_dotenv()
class ChuncHelper:
    """Prepares article data for retrieval: tokenization, inverted index, and embedding generation."""

    def __init__(self, data_file: str, output_dir: str, user_dict: str) -> None:
        self.data_file = data_file
        # self.es = ElasticsearchOperations()
        self.processor = TextProcessor()
        self.output_dir = output_dir
        self.stop_words = set()
        if user_dict:
            jieba.load_userdict(user_dict)
        # The stop-word list path comes from the JIEBA_STOP_DICT environment variable.
        with open(os.getenv("JIEBA_STOP_DICT"), "r", encoding="utf-8") as f:
            for line in f:
                self.stop_words.add(line.strip())

        self.json_data = None
        self.load_data()
    def get_doc_id(self, title: str) -> str:
        """Derive a stable document id from an article title (MD5 hex digest)."""
        md = hashlib.md5(title.encode())
        return md.hexdigest()

    def get_url(self, title: str) -> str:
        """Return the URL of the article whose title matches exactly."""
        if self.json_data:
            title = title.strip()
            for item in self.json_data:
                if title == item["meta_data"]["ArticleTitle"]:
                    return item["url"]
        return "未找到相关内容"  # sentinel text: "no matching content found"

    def get(self, title: str) -> str:
        """Return the full article text for an exact title match."""
        if self.json_data:
            title = title.strip()
            for item in self.json_data:
                if title == item["meta_data"]["ArticleTitle"]:
                    return item["article_text"]
        return "未找到相关内容"  # sentinel text: "no matching content found"

    def get_article(self, title: str):
        """Return a dict with the article's metadata, text, and URL, or None if not found."""
        if self.json_data:
            title = title.strip()
            for item in self.json_data:
                meta = item["meta_data"]
                if title == meta["ArticleTitle"]:
                    return {
                        "site_name": meta["SiteName"],
                        "site_domain": meta["SiteDomain"],
                        "title": meta["ArticleTitle"],
                        "author": meta["author"],
                        "pub_date": meta["PubDate"],
                        "article_text": item["article_text"],
                        "url": item["url"],
                    }
        return None

    def load_data(self):
        """Load the article list from the JSON data file."""
        with open(self.data_file, "r", encoding="utf-8") as f:
            self.json_data = json.load(f)

    def code_t(self, text):
        """Round-trip text through UTF-8 encode/decode to normalize it."""
        t = codecs.encode(text, "utf-8")
        return codecs.decode(t, "utf-8")

    def cut_word(self, text: str):
        """Tokenize text with jieba and filter out stop words."""
        cut_result = jieba.cut(text)
        return [word for word in cut_result if word not in self.stop_words]

    def title_reverse_index(self):
        """Tokenize article titles, then build an inverted index from each word to its titles."""
        if not self.json_data:
            return
        index = 1
        words_vector = {}
        total = len(self.json_data)
        for item in self.json_data:
            print(f"\r {index}/{total}", end="")
            title = item["meta_data"]["ArticleTitle"]
            for word in jieba.cut(title):
                if word in words_vector:
                    if title not in words_vector[word]["articles"]:
                        words_vector[word]["articles"].append(title)
                    continue
                words_vector[word] = {"articles": [title]}
            index += 1
        with open(f"{self.output_dir}/title_reverse_index.json", "w", encoding="utf-8") as f:
            f.write(json.dumps(words_vector, ensure_ascii=False))

    def cut_title_vector(self, seperate=False):
        """Tokenize article titles and generate an embedding vector for each word."""
        words_vector = json.loads(load_file(f"{self.output_dir}/words_vector.json"))
        if self.json_data:
            index = 1
            total = len(self.json_data)
            for item in self.json_data:
                print(f"\r {index}/{total}", end="")
                title = item["meta_data"]["ArticleTitle"]
                for word in jieba.cut(title):
                    if word in words_vector:
                        continue
                    words_vector[word] = self.processor.generate_embeddings([word])[0].tolist()
                index += 1
        print("\nwriting words vector files")

        if not seperate:
            # Write the whole vocabulary into a single file.
            with open(f"{self.output_dir}/words_vector.json", "w", encoding="utf-8") as f:
                f.write(json.dumps(words_vector, ensure_ascii=False))
            return

        # Otherwise split the vocabulary across files of `size` entries each.
        cached_json = {}
        count = 0
        size = 100
        index = 0
        for k, v in words_vector.items():
            count += 1
            cached_json[k] = v
            if count == size:
                with open(f"{self.output_dir}/words_vector_{index}.json", "w", encoding="utf-8") as f:
                    f.write(json.dumps(cached_json, ensure_ascii=False))
                cached_json = {}
                count = 0
                index += 1
        # Flush any leftover entries that did not fill a complete batch.
        if cached_json:
            with open(f"{self.output_dir}/words_vector_{index}.json", "w", encoding="utf-8") as f:
                f.write(json.dumps(cached_json, ensure_ascii=False))

    def process_data(self):
        """Process every article: tokenize, then embed keywords, title, and content chunks."""
        def default_converter(o):
            # json.dumps cannot serialize numpy float32 values on its own.
            if isinstance(o, np.float32):
                return float(o)
            raise TypeError

        if not self.json_data:
            return
        total = len(self.json_data)
        print(f">>> total {total} documents ")
        index = 1
        for item in self.json_data:
            print(f">>> process {index}/{total}")
            meta = item["meta_data"]
            keywords = meta["keywords"]
            ArticleTitle = meta["ArticleTitle"]
            # Other metadata fields (author, description, SiteName, SiteDomain, SiteIDCode,
            # ColumnName, ColumnType, PubDate, ContentSource) remain available in `meta`
            # but are not embedded here.
            doc_id = self.get_doc_id(ArticleTitle)

            # Skip documents that already have an embedding file.
            if os.path.exists(f"{self.output_dir}/{doc_id}.json"):
                print(f"{doc_id} existed, skip")
                index += 1
                continue

            article_text = item["article_text"]
            print(f"{ArticleTitle}:{len(article_text)}")

            # Embed keywords, title, and every content chunk in a single batch:
            # index 0 is the keywords vector, index 1 the title vector, 2+ the chunk vectors.
            txt = [self.code_t(keywords), self.code_t(ArticleTitle)]
            chuncs = self.processor.chunk_text(self.code_t(article_text))
            txt = txt + chuncs
            print(">>> start embedding...")
            embeded_text = self.processor.generate_embeddings(txt)

            data = {
                "title": ArticleTitle,
                "keywords": self.code_t(keywords),
                "title_cut": list(jieba.cut(ArticleTitle)),
                "keywords_cut": list(jieba.cut(keywords)),
                "keywords_vector": embeded_text[0].tolist(),
                "title_vector": embeded_text[1].tolist(),
                "chuncs": chuncs,
                "chuncs_vector": embeded_text[2:].tolist(),
            }

            print(">>> write embedding...")
            with open(f"{self.output_dir}/{doc_id}.json", "w", encoding="utf-8") as f:
                f.write(json.dumps(data, ensure_ascii=False, default=default_converter))

            print(f"{doc_id} done, {index}/{total}")
            index += 1

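
# Usage sketch (illustrative, not part of the original module): how ChuncHelper might be
# driven end to end. The paths below are hypothetical; point them at the real project files.
# The data file is expected to be a JSON list of objects with "meta_data", "article_text",
# and "url" keys, matching what load_data() and process_data() read.
if __name__ == "__main__":
    # Assumes a .env file (loaded by load_dotenv above) that sets JIEBA_STOP_DICT
    # to a stop-word list file.
    helper = ChuncHelper(
        data_file="data/articles.json",  # hypothetical input path
        output_dir="output",             # hypothetical output directory
        user_dict="dict/user_dict.txt",  # hypothetical custom jieba dictionary
    )
    helper.title_reverse_index()   # writes title_reverse_index.json (word -> titles)
    helper.process_data()          # writes one <doc_id>.json per article with its vectors
    print(helper.get_url("<exact article title>"))  # exact-title lookup, returns the URL
    # Note: cut_title_vector() additionally expects an existing words_vector.json
    # in output_dir before it is called.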
|