# from utils.es import ElasticsearchOperations
import asyncio
import os
import time
from utils.file import load_file
from libs.text_processor import TextProcessor
import json
import jieba
import hashlib
import numpy as np
import codecs
from dotenv import load_dotenv

load_dotenv()


class ChuncHelper:
    def __init__(self, data_file: str, output_dir: str, user_dict: str) -> None:
        self.data_file = data_file
        # self.es = ElasticsearchOperations()
        self.processor = TextProcessor()
        self.output_dir = output_dir
        self.stop_words = set()
        if user_dict:
            jieba.load_userdict(user_dict)
        # Load the stop-word list pointed to by the JIEBA_STOP_DICT environment variable.
        with open(os.getenv("JIEBA_STOP_DICT"), "r", encoding="utf-8") as f:
            for line in f:
                self.stop_words.add(line.strip())
        self.json_data = None
        self.load_data()

    def get_doc_id(self, title: str) -> str:
        """Derive a stable document id from the article title (MD5 hex digest)."""
        md = hashlib.md5(title.encode())
        doc_id = md.hexdigest()
        return doc_id

    def get_url(self, title: str) -> str:
        if self.json_data:
            title = title.strip()
            for item in self.json_data:
                ArticleTitle = item["meta_data"]["ArticleTitle"]
                if title == ArticleTitle:
                    return item["url"]
        return "未找到相关内容"

    def get(self, title: str) -> str:
        if self.json_data:
            title = title.strip()
            for item in self.json_data:
                ArticleTitle = item["meta_data"]["ArticleTitle"]
                if title == ArticleTitle:
                    return item["article_text"]
        return "未找到相关内容"

    def get_article(self, title: str):
        result = {}
        if self.json_data:
            title = title.strip()
            for item in self.json_data:
                ArticleTitle = item["meta_data"]["ArticleTitle"]
                if title == ArticleTitle:
                    result["site_name"] = item["meta_data"]["SiteName"]
                    result["site_domain"] = item["meta_data"]["SiteDomain"]
                    result["title"] = item["meta_data"]["ArticleTitle"]
                    result["author"] = item["meta_data"]["author"]
                    result["pub_date"] = item["meta_data"]["PubDate"]
                    result["article_text"] = item["article_text"]
                    result["url"] = item["url"]
                    return result
        return None

    def load_data(self):
        with open(self.data_file, "r", encoding="utf-8") as f:
            lines = f.readlines()
            self.json_data = json.loads("".join(lines))

    def code_t(self, text):
        """Round-trip the text through a UTF-8 encode/decode to normalize it."""
        t = codecs.encode(text, "utf-8")
        return codecs.decode(t, "utf-8")

    def cut_word(self, text: str):
        """Segment text with jieba and drop stop words."""
        cut_result = jieba.cut(text)
        filtered_words = [word for word in cut_result if word not in self.stop_words]
        return filtered_words

    def title_reverse_index(self):
        """Segment every article title, then build an inverted index from each word to the titles containing it."""
        if self.json_data:
            index = 1
            words_vector = {}
            total = len(self.json_data)
            for item in self.json_data:
                print(f"\r {index}/{total}", end="")
                title = item["meta_data"]["ArticleTitle"]
                title_cut = jieba.cut(title)
                for word in title_cut:
                    if word in words_vector.keys():
                        if title not in words_vector[word]["articles"]:
                            words_vector[word]["articles"].append(title)
                        continue
                    words_vector[word] = {"articles": [title]}
                index = index + 1
            with open(f"{self.output_dir}/title_reverse_index.json", "w", encoding="utf-8") as f:
                f.write(json.dumps(words_vector, ensure_ascii=False))

    def cut_title_vector(self, seperate=False):
        """Segment every article title and generate an embedding vector for each word."""
        words_vector = json.loads(load_file(f"{self.output_dir}/words_vector.json"))
        if self.json_data:
            index = 1
            total = len(self.json_data)
            for item in self.json_data:
                print(f"\r {index}/{total}", end="")
                title = item["meta_data"]["ArticleTitle"]
                title_cut = jieba.cut(title)
                for word in title_cut:
                    if word in words_vector.keys():
                        continue
                    words_vector[word] = self.processor.generate_embeddings([word])[0].tolist()
                index = index + 1
            print("\nwriting words vector files")
            if seperate == False:
                # Write all word vectors into a single file.
                with open(f"{self.output_dir}/words_vector.json", "w", encoding="utf-8") as f:
                    f.write(json.dumps(words_vector, ensure_ascii=False))
                return
            # Otherwise split the word vectors into batches of 100 per file.
            cached_json = {}
            count = 0
            size = 100
            index = 0
            for k in words_vector.keys():
                count += 1
                cached_json[k] = words_vector[k]
                if count == size:
                    with open(f"{self.output_dir}/words_vector_{index}.json", "w", encoding="utf-8") as f:
                        f.write(json.dumps(cached_json, ensure_ascii=False))
                    cached_json = {}
                    count = 0
                    index += 1
            if len(cached_json) > 0:
                with open(f"{self.output_dir}/words_vector_{index}.json", "w", encoding="utf-8") as f:
                    f.write(json.dumps(cached_json, ensure_ascii=False))

    def process_data(self):
        """Process every article: generate the keyword vector, the title vector and per-chunk content vectors, then write one JSON file per document."""

        def default_converter(o):
            if isinstance(o, np.float32):
                return float(o)
            raise TypeError

        if self.json_data:
            total = len(self.json_data)
            print(f">>> total {total} documents ")
            index = 1
            data_collection = []
            for item in self.json_data:
                print(f">>> process {index}/{total}")
                data = {
                    "title": self.code_t(item["meta_data"]["ArticleTitle"]),
                    "keywords": self.code_t(item["meta_data"]["keywords"]),
                }
                author = item["meta_data"]["author"]
                description = item["meta_data"]["description"]
                keywords = item["meta_data"]["keywords"]
                SiteName = item["meta_data"]["SiteName"]
                SiteDomain = item["meta_data"]["SiteDomain"]
                SiteIDCode = item["meta_data"]["SiteIDCode"]
                ColumnName = item["meta_data"]["ColumnName"]
                ColumnType = item["meta_data"]["ColumnType"]
                ArticleTitle = item["meta_data"]["ArticleTitle"]
                doc_id = self.get_doc_id(ArticleTitle)
                if os.path.exists(f"{self.output_dir}/{doc_id}.json"):
                    # Skip documents that already have an embedding file.
                    print(f"{doc_id} existed, skip")
                    index = index + 1
                    continue
                PubDate = item["meta_data"]["PubDate"]
                ContentSource = item["meta_data"]["ContentSource"]
                article_text = item["article_text"]
                text_len = len(article_text)
                print(f"{ArticleTitle}:{text_len}")
                # txt = [self.code_t(keywords), self.code_t(ArticleTitle), self.code_t(article_text)]
                txt = [self.code_t(keywords), self.code_t(ArticleTitle)]
                chuncs = self.processor.chunk_text(self.code_t(article_text))
                txt = txt + chuncs
                print(">>> start embedding...")
                # Embed keywords, title and all content chunks in a single batch.
                embeded_text = self.processor.generate_embeddings(txt)
                # embeded_chuncs = self.processor.generate_embeddings(chuncs)
                title_cut = jieba.cut(ArticleTitle)
                keywords_cut = jieba.cut(keywords)
                data["title"] = ArticleTitle
                data["title_cut"] = list(title_cut)
                data["keywords_cut"] = list(keywords_cut)
                data["keywords_vector"] = embeded_text[0].tolist()
                data["title_vector"] = embeded_text[1].tolist()
                data["chuncs"] = chuncs
                # data["content_vector"] = embeded_text[2].tolist()
                data["chuncs_vector"] = embeded_text[2:].tolist()
                print(">>> write embedding...")
                with open(f"{self.output_dir}/{doc_id}.json", "w", encoding="utf-8") as f:
                    f.write(json.dumps(data, ensure_ascii=False))
                print(f"{doc_id} done, {index}/{total}")
                index = index + 1
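
# A minimal usage sketch, not part of the original module: the file paths below are
# illustrative assumptions, and JIEBA_STOP_DICT is expected to be set in the
# environment (or in the .env file loaded by load_dotenv above).
if __name__ == "__main__":
    helper = ChuncHelper(
        data_file="data/articles.json",   # hypothetical crawled-article JSON file
        output_dir="output",              # directory that receives the generated *.json files
        user_dict="data/user_dict.txt",   # hypothetical jieba user dictionary
    )
    # Generate per-document embedding files, then the title inverted index.
    helper.process_data()
    helper.title_reverse_index()
    # Look up an article URL by its exact ArticleTitle.
    print(helper.get_url("<an exact ArticleTitle from the data file>"))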