"""Generate per-community Markdown analysis reports from dumped graph data.

The graph is loaded — and Leiden community detection performed — by
``libs.graph_helper.GraphHelper``; this script only formats the results:
one Markdown file per community under REPORT_PATH, plus an overall
summary report returned by :func:`generate_report`.
"""
import sys
import os

# Make the project root importable when the script is run directly.
current_path = os.getcwd()
sys.path.append(current_path)

import networkx as nx
import leidenalg   # noqa: F401 -- kept: used by the GraphHelper Leiden pipeline
import igraph as ig  # noqa: F401
import json
from datetime import datetime
from collections import Counter

# Whether to dump every extra node attribute into the community report.
REPORT_INCLUDE_DETAILS = False

# Output directory for the per-community reports
# (the underlying graph dump is produced upstream by dump_graph_data.py).
REPORT_PATH = f"{current_path}\\web\\cached_data\\report"

# Communities whose internal edge density falls below this threshold
# are flagged as "sparse" in their report.
DENSITY = 0.52


def _group_nodes_by_community(G):
    """Group graph nodes by their ``community`` attribute.

    Returns a dict mapping community id -> list of member dicts; each
    member dict contains ``name`` plus all node attributes, with
    ``type``/``description`` defaulted when missing (the defaults are
    also written back onto the graph's attribute dicts, matching the
    original in-place behaviour).
    """
    communities = {}
    for name, attrs in G.nodes(data=True):
        attrs.setdefault('type', '未知')
        attrs.setdefault('description', '未见描述')
        communities.setdefault(attrs['community'], []).append({'name': name, **attrs})
    return communities


def generate_report(G, partition):
    """Build the overall summary and write one Markdown report per community.

    Parameters
    ----------
    G : networkx.Graph
        Graph whose nodes carry a ``community`` attribute.
    partition : leidenalg vertex partition
        Must expose ``resolution_parameter``, ``membership`` and ``quality()``.

    Returns
    -------
    str
        The overall summary report. Per-community files are written to
        ``REPORT_PATH`` as a side effect.

    NOTE(review): relies on the module-level ``graph_helper`` created in
    the ``__main__`` block for one-hop neighbour lookups — confirm before
    importing this function from elsewhere.
    """
    report = []
    # Report header.
    report.append(f"# 疾病图谱关系社区分析报告\n")
    report.append(f"**生成时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
    report.append(f"**检测算法**: Leiden Algorithm\n")
    report.append(f"**算法参数**:\n")
    report.append(f"- 分辨率参数: {partition.resolution_parameter:.3f}\n")
    report.append(f"**社区数量**: {len(set(partition.membership))}\n")
    report.append(f"**模块度(Q)**: {partition.quality():.4f}\n")
    print("generate_report header finished")

    report.append("\n## 社区结构分析\n")
    print("generate_report community structure started")
    communities = _group_nodes_by_community(G)
    print("generate_report community structure finished")

    # Make sure the output directory exists before writing any report.
    os.makedirs(REPORT_PATH, exist_ok=True)

    for comm_id, members in communities.items():
        print("community ", comm_id, "size: ", len(members))
        com_report = []
        com_report.append(f"### 第{comm_id+1}号社区报告 ")

        # Node-type distribution within the community.
        type_dist = Counter(m['type'] for m in members)
        com_report.append(f"**类型分布**:")
        for industry, count in type_dist.most_common():
            com_report.append(f"- {industry}: {count} 个 ({count/len(members):.0%})")

        # Member list rendered as a JSON code block.
        com_report.append("\n```json")
        obj_list = []
        names = []
        for member in members:
            obj_list.append({"name": member['name'], "type": member['type']})
            names.append(member['name'])
            if not REPORT_INCLUDE_DETAILS:
                continue
            # Optionally list every extra attribute value of the node.
            for k in member.keys():
                if k not in ['name', 'type', 'description', 'community']:
                    value = member[k]
                    com_report.append(f"\t- {value}")
        com_report.append(json.dumps(obj_list, ensure_ascii=False, indent=4))
        com_report.append("```")

        # Intra-community relations via one-hop neighbourhood lookups,
        # filtered to neighbours that are themselves community members.
        com_report.append("\n**成员节点关系**:\n")
        for member in members:
            entities, relations = graph_helper.neighbor_search(member['name'], 1)
            com_report.append(f"- {member['name']} ({member['type']})")
            com_report.append(f"\t- 相关节点")
            for entity in entities:
                if entity['name'] in names:
                    com_report.append(f"\t\t- {entity['name']} ({entity['type']})")
            com_report.append(f"\t- 相关关系")
            for relation in relations:
                if relation['src_name'] in names or relation['dest_name'] in names:
                    com_report.append(f"\t\t- {relation['src_name']}-({relation['type']})->{relation['dest_name']}")

        # Internal edge density of the community's induced subgraph.
        subgraph = G.subgraph([m['name'] for m in members])
        density = nx.density(subgraph)
        com_report.append(f"\n**内部连接密度**: {density:.2f}\n")
        if density < DENSITY:
            com_report.append("**社区内部连接相对稀疏**\n")

        # BUG FIX: the original wrote the file only in the 'else' branch,
        # so sparse communities were built and then silently discarded;
        # the path also used the invalid escape '\c'. Every community now
        # gets its report written.
        with open(f"{REPORT_PATH}\\community_{comm_id}.md", "w", encoding="utf-8") as f:
            f.write("\n".join(com_report))
        # File-size print doubles as a generation sanity check.
        print(f"社区 {comm_id+1} 报告文件大小:{len(''.join(com_report).encode('utf-8'))} 字节")

    report.append("\n## 可视化分析\n")
    return "\n".join(report)


if __name__ == "__main__":
    try:
        from libs.graph_helper import GraphHelper

        graph_helper = GraphHelper()
        G = graph_helper.graph
        print("graph loaded")

        # Run community detection (GraphHelper wraps the Leiden pipeline).
        G, partition = graph_helper.detect_communities()

        # Generate and persist the overall analysis report.
        report = generate_report(G, partition)
        with open('community_report.md', 'w', encoding='utf-8') as f:
            f.write(report)
        print(f"报告文件大小:{len(report.encode('utf-8'))} 字节")
        print("社区分析报告已生成:community_report.md")
    except Exception as e:
        print(f"运行时错误:{str(e)}")
        raise  # bare raise preserves the original traceback