|
@@ -1,18 +1,16 @@
|
|
package com.qizhen.healsphere.util;
|
|
package com.qizhen.healsphere.util;
|
|
|
|
|
|
|
|
+import cn.hutool.core.util.StrUtil;
|
|
import com.qizhen.healsphere.config.Neo4jUtil;
|
|
import com.qizhen.healsphere.config.Neo4jUtil;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import org.neo4j.driver.v1.Session;
|
|
import org.neo4j.driver.v1.Session;
|
|
-import org.neo4j.driver.v1.StatementResult;
|
|
|
|
import org.neo4j.driver.v1.Values;
|
|
import org.neo4j.driver.v1.Values;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.jdbc.core.JdbcTemplate;
|
|
import org.springframework.jdbc.core.JdbcTemplate;
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Component;
|
|
-import org.springframework.transaction.annotation.Transactional;
|
|
|
|
|
|
|
|
-import java.util.HashMap;
|
|
|
|
-import java.util.List;
|
|
|
|
-import java.util.Map;
|
|
|
|
|
|
+import java.io.*;
|
|
|
|
+import java.util.*;
|
|
|
|
|
|
@Slf4j
|
|
@Slf4j
|
|
@Component
|
|
@Component
|
|
@@ -27,195 +25,247 @@ public class DataMigrationUtil {
|
|
private static final int BATCH_SIZE = 1000;
|
|
private static final int BATCH_SIZE = 1000;
|
|
|
|
|
|
/**
|
|
/**
|
|
- * 执行完整的数据迁移
|
|
|
|
|
|
+ * 基于关系的数据迁移
|
|
*/
|
|
*/
|
|
- @Transactional
|
|
|
|
- public void migrateAllData() {
|
|
|
|
- try {
|
|
|
|
- log.info("开始数据迁移...");
|
|
|
|
|
|
+ public void migrateByRelations() {
|
|
|
|
+ log.info("开始基于关系的数据迁移...");
|
|
|
|
+
|
|
|
|
+ int offset = 0;
|
|
|
|
+ while (true) {
|
|
|
|
+ // 分批获取关系
|
|
|
|
+ String relationSql =
|
|
|
|
+ "SELECT DISTINCT e.src_id, e.dest_id " +
|
|
|
|
+ "FROM kg_edges e " +
|
|
|
|
+ "WHERE e.status = 0 AND (e.version IS NULL OR e.version = '1.0') " +
|
|
|
|
+ "ORDER BY e.src_id, e.dest_id " +
|
|
|
|
+ "LIMIT ? OFFSET ?";
|
|
|
|
|
|
- // 1. 迁移节点
|
|
|
|
- migrateNodes();
|
|
|
|
|
|
+ List<Map<String, Object>> relations = jdbcTemplate.queryForList(relationSql, BATCH_SIZE, offset);
|
|
|
|
+ if (relations.isEmpty()) {
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
|
|
- // 2. 迁移关系
|
|
|
|
- migrateRelationships();
|
|
|
|
|
|
+ // 收集当前批次需要迁移的节点ID
|
|
|
|
+ Set<Long> nodeIds = new HashSet<>();
|
|
|
|
+ relations.forEach(rel -> {
|
|
|
|
+ nodeIds.add(((Number)rel.get("src_id")).longValue());
|
|
|
|
+ nodeIds.add(((Number)rel.get("dest_id")).longValue());
|
|
|
|
+ });
|
|
|
|
|
|
- // 3. 迁移属性
|
|
|
|
- migrateProperties();
|
|
|
|
|
|
+ // 迁移当前批次的节点及关系
|
|
|
|
+ migrateNodesWithProperties(nodeIds);
|
|
|
|
+ migrateRelationships(relations);
|
|
|
|
|
|
- log.info("数据迁移完成");
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- log.error("数据迁移失败", e);
|
|
|
|
- throw new RuntimeException("数据迁移失败", e);
|
|
|
|
|
|
+ offset += BATCH_SIZE;
|
|
|
|
+ log.info("已处理 {} 条关系记录", offset);
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ log.info("基于关系的数据迁移完成");
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * 迁移节点数据
|
|
|
|
|
|
+ * 分批迁移节点及其属性
|
|
*/
|
|
*/
|
|
- private void migrateNodes() {
|
|
|
|
- log.info("开始迁移节点数据...");
|
|
|
|
-
|
|
|
|
- int offset = 0;
|
|
|
|
- while (true) {
|
|
|
|
- String sql = "SELECT id, name, category " +
|
|
|
|
- "FROM kg_nodes " +
|
|
|
|
- "WHERE status = 0 and (version is null and version ='1.0')" + // 只迁移有效数据
|
|
|
|
- "ORDER BY id " +
|
|
|
|
- "LIMIT ? OFFSET ?";
|
|
|
|
|
|
+ private void migrateNodesWithProperties(Set<Long> nodeIds) {
|
|
|
|
+ if (nodeIds.isEmpty()) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 将节点ID列表分批处理
|
|
|
|
+ List<Long> idList = new ArrayList<>(nodeIds);
|
|
|
|
+ for (int i = 0; i < idList.size(); i += BATCH_SIZE) {
|
|
|
|
+ int endIndex = Math.min(i + BATCH_SIZE, idList.size());
|
|
|
|
+ List<Long> batchIds = idList.subList(i, endIndex);
|
|
|
|
|
|
- List<Map<String, Object>> nodes = jdbcTemplate.queryForList(sql, BATCH_SIZE, offset);
|
|
|
|
- if (nodes.isEmpty()) {
|
|
|
|
- break;
|
|
|
|
|
|
+ // 构建 IN 子句
|
|
|
|
+ StringBuilder inClause = new StringBuilder();
|
|
|
|
+ for (int j = 0; j < batchIds.size(); j++) {
|
|
|
|
+ inClause.append(j == 0 ? "?" : ",?");
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ String nodeQuery =
|
|
|
|
+ "SELECT n.id, n.name, n.category, n.status, n.version, " +
|
|
|
|
+ " p.prop_title as prop_name, p.prop_value as prop_value " +
|
|
|
|
+ "FROM kg_nodes n " +
|
|
|
|
+ "LEFT JOIN kg_props p ON n.id = p.ref_id " +
|
|
|
|
+ "WHERE n.id IN (" + inClause + ")";
|
|
|
|
|
|
try (Session session = neo4jUtil.getSession()) {
|
|
try (Session session = neo4jUtil.getSession()) {
|
|
- for (Map<String, Object> node : nodes) {
|
|
|
|
- Map<String, Object> properties = new HashMap<>();
|
|
|
|
- properties.put("pg_id", node.get("id"));
|
|
|
|
- properties.put("source_type", "postgres");
|
|
|
|
|
|
+ Map<Long, Map<String, Object>> nodeProperties = new HashMap<>();
|
|
|
|
+
|
|
|
|
+ // 处理节点属性
|
|
|
|
+ jdbcTemplate.queryForList(nodeQuery, batchIds.toArray()).forEach(row -> {
|
|
|
|
+ Long nodeId = ((Number) row.get("id")).longValue();
|
|
|
|
+ nodeProperties.computeIfAbsent(nodeId, k -> new HashMap<>());
|
|
|
|
+
|
|
|
|
+ Map<String, Object> properties = nodeProperties.get(nodeId);
|
|
|
|
+ if (properties.isEmpty()) {
|
|
|
|
+ properties.put("pg_id", nodeId);
|
|
|
|
+ properties.put("name", row.get("name"));
|
|
|
|
+ properties.put("category", row.get("category"));
|
|
|
|
+ properties.put("is_deleted", "N");
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ String propName = (String) row.get("prop_name");
|
|
|
|
+ Object propValue = row.get("prop_value");
|
|
|
|
+ if (propName != null && propValue != null) {
|
|
|
|
+ properties.put(propName, propValue);
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
|
|
- String cypher = "MERGE (n:`" + node.get("category") + "` {" +
|
|
|
|
- "name: $name" +
|
|
|
|
- "}) " +
|
|
|
|
- "SET n += $properties " +
|
|
|
|
- "RETURN n";
|
|
|
|
|
|
+ // 批量创建或更新节点
|
|
|
|
+ for (Map.Entry<Long, Map<String, Object>> entry : nodeProperties.entrySet()) {
|
|
|
|
+ String category = (String) entry.getValue().get("category");
|
|
|
|
+ String chineseCategory = CategoryMappingUtil.toChineseCategory(category);
|
|
|
|
+ if(StrUtil.equals(chineseCategory, "科室")) {
|
|
|
|
+ chineseCategory = "就诊科室";
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ String cypher =
|
|
|
|
+ "MERGE (n:`" + chineseCategory + "` {name: $name}) " +
|
|
|
|
+ "SET n += $properties " +
|
|
|
|
+ "RETURN n";
|
|
|
|
|
|
session.run(cypher, Values.parameters(
|
|
session.run(cypher, Values.parameters(
|
|
- "name", node.get("name"),
|
|
|
|
- "properties", properties
|
|
|
|
|
|
+ "name", entry.getValue().get("name"),
|
|
|
|
+ "properties", entry.getValue()
|
|
));
|
|
));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
- offset += BATCH_SIZE;
|
|
|
|
- log.info("已迁移 {} 个节点", offset);
|
|
|
|
|
|
+
|
|
|
|
+ log.info("已处理 {} 个节点", endIndex);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * 迁移关系数据
|
|
|
|
|
|
+ * 分批迁移关系
|
|
*/
|
|
*/
|
|
- private void migrateRelationships() {
|
|
|
|
- log.info("开始迁移关系数据...");
|
|
|
|
-
|
|
|
|
- int offset = 0;
|
|
|
|
- while (true) {
|
|
|
|
- String sql = "SELECT e.id, e.category as rel_category, e.src_id, e.dest_id, " +
|
|
|
|
- "e.name, e.version, e.status, " +
|
|
|
|
- "n1.category as start_category, n2.category as end_category " +
|
|
|
|
|
|
+ private void migrateRelationships(List<Map<String, Object>> relations) {
|
|
|
|
+ if (relations.isEmpty()) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 分批处理关系
|
|
|
|
+ for (int i = 0; i < relations.size(); i += BATCH_SIZE) {
|
|
|
|
+ int endIndex = Math.min(i + BATCH_SIZE, relations.size());
|
|
|
|
+ List<Map<String, Object>> batchRelations = relations.subList(i, endIndex);
|
|
|
|
+
|
|
|
|
+ try (Session session = neo4jUtil.getSession()) {
|
|
|
|
+ for (Map<String, Object> relation : batchRelations) {
|
|
|
|
+ String relationQuery =
|
|
|
|
+ "SELECT e.*, " +
|
|
|
|
+ " n1.category as start_category, n1.name as start_name, " +
|
|
|
|
+ " n2.category as end_category, n2.name as end_name " +
|
|
"FROM kg_edges e " +
|
|
"FROM kg_edges e " +
|
|
"JOIN kg_nodes n1 ON e.src_id = n1.id " +
|
|
"JOIN kg_nodes n1 ON e.src_id = n1.id " +
|
|
"JOIN kg_nodes n2 ON e.dest_id = n2.id " +
|
|
"JOIN kg_nodes n2 ON e.dest_id = n2.id " +
|
|
- "WHERE e.status = 0 " + // 只迁移有效数据
|
|
|
|
- "ORDER BY e.id " +
|
|
|
|
- "LIMIT ? OFFSET ?";
|
|
|
|
-
|
|
|
|
- List<Map<String, Object>> relationships = jdbcTemplate.queryForList(sql, BATCH_SIZE, offset);
|
|
|
|
- if (relationships.isEmpty()) {
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
|
|
+ "WHERE e.src_id = ? AND e.dest_id = ?";
|
|
|
|
|
|
- try (Session session = neo4jUtil.getSession()) {
|
|
|
|
- for (Map<String, Object> rel : relationships) {
|
|
|
|
- Map<String, Object> properties = new HashMap<>();
|
|
|
|
- properties.put("pg_id", rel.get("id"));
|
|
|
|
- properties.put("category", rel.get("rel_category"));
|
|
|
|
- properties.put("version", rel.get("version"));
|
|
|
|
- properties.put("status", rel.get("status"));
|
|
|
|
-
|
|
|
|
- String cypher = "MATCH (start:`" + rel.get("start_category") + "` {pg_id: $srcId}), " +
|
|
|
|
- "(end:`" + rel.get("end_category") + "` {pg_id: $destId}) " +
|
|
|
|
- "MERGE (start)-[r:`" + rel.get("relationship_type") + "`]->(end) " +
|
|
|
|
- "SET r += $properties " +
|
|
|
|
- "RETURN r";
|
|
|
|
|
|
+ Object[] params = new Object[]{
|
|
|
|
+ relation.get("src_id"),
|
|
|
|
+ relation.get("dest_id")
|
|
|
|
+ };
|
|
|
|
|
|
- session.run(cypher, Values.parameters(
|
|
|
|
- "srcId", rel.get("src_id"),
|
|
|
|
- "destId", rel.get("dest_id"),
|
|
|
|
- "properties", rel.get("properties")
|
|
|
|
- ));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ List<Map<String, Object>> relationDetails = jdbcTemplate.queryForList(relationQuery, params);
|
|
|
|
+
|
|
|
|
+ for (Map<String, Object> rel : relationDetails) {
|
|
|
|
+ String startCategory = CategoryMappingUtil.toChineseCategory((String) rel.get("start_category"));
|
|
|
|
+ String endCategory = CategoryMappingUtil.toChineseCategory((String) rel.get("end_category"));
|
|
|
|
|
|
- offset += BATCH_SIZE;
|
|
|
|
- log.info("已迁移 {} 个关系", offset);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ if(StrUtil.equals(startCategory, "科室")) {
|
|
|
|
+ startCategory = "就诊科室";
|
|
|
|
+ }
|
|
|
|
|
|
- /**
|
|
|
|
- * 迁移属性数据
|
|
|
|
- */
|
|
|
|
- private void migrateProperties() {
|
|
|
|
- log.info("开始迁移属性数据...");
|
|
|
|
-
|
|
|
|
- int offset = 0;
|
|
|
|
- while (true) {
|
|
|
|
- String sql = "SELECT p.ref_id, p.name, p.value, n.category " +
|
|
|
|
- "FROM kg_props p " +
|
|
|
|
- "JOIN kg_nodes n ON p.ref_id = n.id " +
|
|
|
|
- "ORDER BY p.id " +
|
|
|
|
- "LIMIT ? OFFSET ?";
|
|
|
|
-
|
|
|
|
- List<Map<String, Object>> properties = jdbcTemplate.queryForList(sql, BATCH_SIZE, offset);
|
|
|
|
- if (properties.isEmpty()) {
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
|
|
+ if(StrUtil.equals(endCategory, "科室")) {
|
|
|
|
+ endCategory = "就诊科室";
|
|
|
|
+ }
|
|
|
|
|
|
- try (Session session = neo4jUtil.getSession()) {
|
|
|
|
- for (Map<String, Object> prop : properties) {
|
|
|
|
- String cypher = "MATCH (n:`" + prop.get("category") + "` {pg_id: $refId}) " +
|
|
|
|
- "SET n." + prop.get("name") + " = $value " +
|
|
|
|
- "RETURN n";
|
|
|
|
|
|
+ Map<String, Object> properties = new HashMap<>();
|
|
|
|
+ properties.put("pg_id", rel.get("id"));
|
|
|
|
+// properties.put("name", rel.get("name"));
|
|
|
|
|
|
- session.run(cypher, Values.parameters(
|
|
|
|
- "refId", prop.get("ref_id"),
|
|
|
|
- "value", prop.get("value")
|
|
|
|
- ));
|
|
|
|
|
|
+ String relationType = (String) rel.get("category");
|
|
|
|
+ if(startCategory.equals("疾病") && endCategory.equals("疾病") && relationType.equals("acompany_with")) {
|
|
|
|
+ relationType = "疾病相关并发症";
|
|
|
|
+ }
|
|
|
|
+ if(startCategory.equals("疾病") && endCategory.equals("药品") && relationType.equals("common_drug")) {
|
|
|
|
+ relationType = "疾病相关治疗药物";
|
|
|
|
+ }
|
|
|
|
+ if(startCategory.equals("疾病") && endCategory.equals("症状") && relationType.equals("has_symptom")) {
|
|
|
|
+ relationType = "疾病相关症状";
|
|
|
|
+ }
|
|
|
|
+ if(startCategory.equals("疾病") && endCategory.equals("就诊科室") && relationType.equals("belongs_to")) {
|
|
|
|
+ relationType = "疾病相关就诊科室";
|
|
|
|
+ }
|
|
|
|
+ if(startCategory.equals("疾病") && endCategory.equals("传播媒介") && relationType.equals("disease_vectors")) {
|
|
|
|
+ relationType = "疾病相关传播媒介";
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ String cypher =
|
|
|
|
+ "MATCH (start:`" + startCategory + "` {name: $startName}), " +
|
|
|
|
+ " (end:`" + endCategory + "` {name: $endName}) " +
|
|
|
|
+ "MERGE (start)-[r:`" + relationType + "`]->(end) " +
|
|
|
|
+ "SET r += $properties " +
|
|
|
|
+ "RETURN r";
|
|
|
|
+
|
|
|
|
+ session.run(cypher, Values.parameters(
|
|
|
|
+ "startName", rel.get("start_name"),
|
|
|
|
+ "endName", rel.get("end_name"),
|
|
|
|
+ "properties", properties
|
|
|
|
+ ));
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
- offset += BATCH_SIZE;
|
|
|
|
- log.info("已迁移 {} 个属性", offset);
|
|
|
|
|
|
+
|
|
|
|
+ log.info("已处理 {} 条关系", endIndex);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * 验证迁移结果
|
|
|
|
- */
|
|
|
|
- public void validateMigration() {
|
|
|
|
- log.info("开始验证数据迁移结果...");
|
|
|
|
-
|
|
|
|
- // 验证节点数量
|
|
|
|
- int pgNodeCount = jdbcTemplate.queryForObject(
|
|
|
|
- "SELECT COUNT(*) FROM kg_nodes", Integer.class);
|
|
|
|
-
|
|
|
|
- try (Session session = neo4jUtil.getSession()) {
|
|
|
|
- StatementResult result = session.run("MATCH (n) RETURN COUNT(n) as count");
|
|
|
|
- int neo4jNodeCount = result.single().get("count").asInt();
|
|
|
|
-
|
|
|
|
- log.info("PostgreSQL节点数量: {}", pgNodeCount);
|
|
|
|
- log.info("Neo4j节点数量: {}", neo4jNodeCount);
|
|
|
|
-
|
|
|
|
- if (pgNodeCount != neo4jNodeCount) {
|
|
|
|
- log.warn("节点数量不匹配!");
|
|
|
|
|
|
+ public static void main(String[] args) {
|
|
|
|
+ // 读取ICD10编码映射
|
|
|
|
+ Map<String, String[]> icdMap = new HashMap<>();
|
|
|
|
+ try (BufferedReader br = new BufferedReader(new FileReader("src/main/resources/疾病icd10编码.csv"))) {
|
|
|
|
+ String line;
|
|
|
|
+ // 跳过表头
|
|
|
|
+ br.readLine();
|
|
|
|
+ while ((line = br.readLine()) != null) {
|
|
|
|
+ String[] parts = line.split(",");
|
|
|
|
+ if (parts.length >= 3) {
|
|
|
|
+ String diseaseName = parts[0].trim();
|
|
|
|
+ String[] codes = new String[]{
|
|
|
|
+ parts[1].trim(), // 医保编码
|
|
|
|
+ parts[2].trim() // 临床编码
|
|
|
|
+ };
|
|
|
|
+ icdMap.put(diseaseName, codes);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ e.printStackTrace();
|
|
|
|
+ return;
|
|
}
|
|
}
|
|
-
|
|
|
|
- // 验证关系数量
|
|
|
|
- int pgRelCount = jdbcTemplate.queryForObject(
|
|
|
|
- "SELECT COUNT(*) FROM kg_edges", Integer.class);
|
|
|
|
-
|
|
|
|
- try (Session session = neo4jUtil.getSession()) {
|
|
|
|
- StatementResult result = session.run("MATCH ()-[r]->() RETURN COUNT(r) as count");
|
|
|
|
- int neo4jRelCount = result.single().get("count").asInt();
|
|
|
|
-
|
|
|
|
- log.info("PostgreSQL关系数量: {}", pgRelCount);
|
|
|
|
- log.info("Neo4j关系数量: {}", neo4jRelCount);
|
|
|
|
-
|
|
|
|
- if (pgRelCount != neo4jRelCount) {
|
|
|
|
- log.warn("关系数量不匹配!");
|
|
|
|
|
|
+
|
|
|
|
+ // 读取neo4j疾病名称并写入新文件
|
|
|
|
+ try (BufferedReader br = new BufferedReader(new FileReader("src/main/resources/neo4j中所有疾病名称.csv"));
|
|
|
|
+ BufferedWriter bw = new BufferedWriter(new FileWriter("src/main/resources/疾病名称与ICD编码对照.csv"))) {
|
|
|
|
+
|
|
|
|
+ // 写入表头
|
|
|
|
+ bw.write("疾病名称,医保ICD10编码,临床ICD10编码\n");
|
|
|
|
+
|
|
|
|
+ String line;
|
|
|
|
+ while ((line = br.readLine()) != null) {
|
|
|
|
+ String diseaseName = line.trim();
|
|
|
|
+ String[] codes = icdMap.getOrDefault(diseaseName, new String[]{"", ""});
|
|
|
|
+
|
|
|
|
+ // 写入数据行
|
|
|
|
+ bw.write(String.format("%s,%s,%s\n",
|
|
|
|
+ diseaseName,
|
|
|
|
+ codes[0],
|
|
|
|
+ codes[1]
|
|
|
|
+ ));
|
|
}
|
|
}
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ e.printStackTrace();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|