123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
'''
This script generates community-detection analysis reports from the dumped graph data.
'''
- import sys,os
- current_path = os.getcwd()
- sys.path.append(current_path)
- import networkx as nx
- import leidenalg
- import igraph as ig
- #import matplotlib.pyplot as plt
- import json
- from datetime import datetime
- from collections import Counter
# Resolution for the community report (constant currently disabled).
# Original note: the larger the number, the fewer communities; the smaller
# the number, the more communities.
# NOTE(review): the leidenalg documentation describes the opposite direction
# (higher resolution -> more communities) -- confirm before re-enabling.
# RESOLUTION = 0.07

# Whether the community report also lists each node's additional attributes.
REPORT_INCLUDE_DETAILS = False

# # Cache path of the graph data; the data is produced by dump_graph_data.py
# CACHED_DATA_PATH = f"{current_path}\\web\\cached_data"

# Output directory of the final per-community reports (Windows-style separators).
REPORT_PATH = f"{current_path}\\web\\cached_data\\report"

# Internal edge-density threshold: generate_report() only writes a community's
# report file when nx.density() of its subgraph is at least this value.
DENSITY = 0.52
- # def load_entity_data():
- # print("load entity data")
- # with open(f"{CACHED_DATA_PATH}\\entities_med.json", "r", encoding="utf-8") as f:
- # entities = json.load(f)
- # return entities
- # def load_relation_data(g):
- # for i in range(30):
- # if os.path.exists(f"{CACHED_DATA_PATH}\\relationship_med_{i}.json"):
- # print("load entity data", f"{CACHED_DATA_PATH}\\relationship_med_{i}.json")
- # with open(f"{CACHED_DATA_PATH}\\relationship_med_{i}.json", "r", encoding="utf-8") as f:
- # relations = json.load(f)
- # for item in relations:
- # g.add_edge(item[0], item[1], weight=1, **item[2])
-
-
-
- # def generate_enterprise_network():
- # G = nx.Graph()
- # ent_data = load_entity_data()
- # print("load entities completed")
- # for data in ent_data:
- # G.add_node(data[0], **data[1])
- # print("load entities into graph completed")
- # rel_data = load_relation_data(G)
- # print("load relation completed")
- # return G
- # def detect_communities(G):
- # """使用Leiden算法进行社区检测"""
- # # 转换networkx图到igraph格式
-
- # print("convert to igraph")
- # ig_graph = ig.Graph.from_networkx(G)
-
- # # 执行Leiden算法
- # partition = leidenalg.find_partition(
- # ig_graph,
- # leidenalg.CPMVertexPartition,
- # resolution_parameter=RESOLUTION,
- # n_iterations=2
- # )
-
- # # 将社区标签添加到原始图
- # for i, node in enumerate(G.nodes()):
- # G.nodes[node]['community'] = partition.membership[i]
-
- # print("convert to igraph finished")
- # return G, partition
def generate_report(G, partition):
    """Build the summary report and write one Markdown file per dense community.

    Args:
        G: Graph whose ``nodes(data=True)`` yields ``(node_id, attrs)`` pairs;
            every ``attrs`` dict must contain a ``'community'`` key. Missing
            ``'type'`` / ``'description'`` attributes are filled with
            placeholders directly in the graph's attribute dicts.
        partition: Object exposing ``resolution_parameter``, ``membership``
            and ``quality()``; only used for the report header.

    Returns:
        The summary report as a single Markdown string. It contains the header
        and the (empty) section headings only -- the per-community sections
        are written to ``REPORT_PATH`` files, not appended to this string.

    Raises:
        KeyError: If a node has no ``'community'`` attribute.

    Note:
        Neighbor lookups use the module-level ``graph_helper`` object, which
        this file only creates inside its ``__main__`` block.
    """
    # Report header
    report = [
        "# 疾病图谱关系社区分析报告\n",
        f"**生成时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n",
        "**检测算法**: Leiden Algorithm\n",
        "**算法参数**:\n",
        f"- 分辨率参数: {partition.resolution_parameter:.3f}\n",
        f"**社区数量**: {len(set(partition.membership))}\n",
        f"**模块度(Q)**: {partition.quality():.4f}\n",
    ]
    print("generate_report header finished")

    report.append("\n## 社区结构分析\n")
    print("generate_report community structure started")
    communities = _group_nodes_by_community(G)
    print("generate_report community structure finished")

    for comm_id, members in communities.items():
        print("community ", comm_id, "size: ", len(members))
        com_report, density = _build_community_section(G, comm_id, members)
        if density < DENSITY:
            # NOTE: sparse communities are not written to disk, so this marker
            # is currently never persisted anywhere.
            com_report.append("**社区内部连接相对稀疏**\n")
        else:
            _write_community_file(comm_id, com_report)

    # Visualization section (heading only)
    report.append("\n## 可视化分析\n")

    return "\n".join(report)


def _group_nodes_by_community(G):
    """Return {community id: [member dict, ...]} in node iteration order."""
    communities = {}
    for node_id, attrs in G.nodes(data=True):
        comm = attrs['community']
        # setdefault stores the placeholder in the graph's own attribute dict.
        attrs.setdefault('type', '未知')
        attrs.setdefault('description', '未见描述')
        communities.setdefault(comm, []).append({'name': node_id, **attrs})
    return communities


def _build_community_section(G, comm_id, members):
    """Return (Markdown lines, internal density) for a single community."""
    lines = [f"### 第{comm_id+1}号社区报告 "]

    # Distribution of node types inside the community
    type_dist = Counter(m['type'] for m in members)
    lines.append("**类型分布**:")
    for node_type, count in type_dist.most_common():
        lines.append(f"- {node_type}: {count} 个 ({count/len(members):.0%})")

    lines.append("\n```json")
    summary = []
    for member in members:
        summary.append({"name": member['name'], "type": member['type']})
        if not REPORT_INCLUDE_DETAILS:
            continue
        # Detail lines are emitted inside the json fence, before the JSON dump.
        for key, value in member.items():
            if key not in ('name', 'type', 'description', 'community'):
                lines.append(f"\t- {value}")
    lines.append(json.dumps(summary, ensure_ascii=False, indent=4))
    lines.append("```")

    lines.append("\n**成员节点关系**:\n")
    # A set keeps the per-neighbor membership checks O(1).
    member_names = {m['name'] for m in members}
    for member in members:
        # Second argument is presumably the hop distance -- verify in GraphHelper.
        entities, relations = graph_helper.neighbor_search(member['name'], 1)
        lines.append(f"- {member['name']} ({member['type']})")
        lines.append("\t- 相关节点")
        for entity in entities:
            if entity['name'] in member_names:
                lines.append(f"\t\t- {entity['name']} ({entity['type']})")
        lines.append("\t- 相关关系")
        for relation in relations:
            if relation['src_name'] in member_names or relation['dest_name'] in member_names:
                lines.append(
                    f"\t\t- {relation['src_name']}-({relation['type']})->{relation['dest_name']}"
                )

    # Internal connection density of the community
    density = nx.density(G.subgraph([m['name'] for m in members]))
    lines.append(f"\n**内部连接密度**: {density:.2f}\n")
    return lines, density


def _write_community_file(comm_id, lines):
    """Write one community's Markdown file under REPORT_PATH and print its size."""
    content = "\n".join(lines)
    # The backslash is escaped explicitly; a bare "\c" is an invalid escape sequence.
    report_file = f"{REPORT_PATH}\\community_{comm_id}.md"
    os.makedirs(os.path.dirname(report_file) or ".", exist_ok=True)
    with open(report_file, "w", encoding="utf-8") as f:
        f.write(content)
    # Size of the text that was actually written (UTF-8, before any newline translation).
    print(f"社区 {comm_id+1} 报告文件大小:{len(content.encode('utf-8'))} 字节")
if __name__ == "__main__":
    # Script entry point: load the graph, detect communities, then write the
    # summary report to the current working directory.
    try:
        from libs.graph_helper import GraphHelper

        # generate_report() reads `graph_helper` as a module-level name, so it
        # has to be bound here at module scope.
        graph_helper = GraphHelper()
        G = graph_helper.graph
        print("graph loaded")

        # Run community detection
        G, partition = graph_helper.detect_communities()

        # Build the analysis report and save it
        report = generate_report(G, partition)
        output_name = 'community_report.md'
        with open(output_name, 'w', encoding='utf-8') as f:
            f.write(report)
        report_bytes = report.encode('utf-8')
        print(f"报告文件大小:{len(report_bytes)} 字节")  # sanity check of the generated file

        print(f"社区分析报告已生成:{output_name}")

    except Exception as e:
        # Print the error message, then propagate the exception unchanged.
        print(f"运行时错误:{e}")
        raise e
|