|
@@ -0,0 +1,221 @@
|
|
|
+package com.qizhen.healsphere.util;
|
|
|
+
|
|
|
+import com.qizhen.healsphere.config.Neo4jUtil;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.neo4j.driver.v1.Session;
|
|
|
+import org.neo4j.driver.v1.StatementResult;
|
|
|
+import org.neo4j.driver.v1.Values;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.jdbc.core.JdbcTemplate;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+import org.springframework.transaction.annotation.Transactional;
|
|
|
+
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+public class DataMigrationUtil {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private JdbcTemplate jdbcTemplate;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private Neo4jUtil neo4jUtil;
|
|
|
+
|
|
|
+ private static final int BATCH_SIZE = 1000;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 执行完整的数据迁移
|
|
|
+ */
|
|
|
+ @Transactional
|
|
|
+ public void migrateAllData() {
|
|
|
+ try {
|
|
|
+ log.info("开始数据迁移...");
|
|
|
+
|
|
|
+ // 1. 迁移节点
|
|
|
+ migrateNodes();
|
|
|
+
|
|
|
+ // 2. 迁移关系
|
|
|
+ migrateRelationships();
|
|
|
+
|
|
|
+ // 3. 迁移属性
|
|
|
+ migrateProperties();
|
|
|
+
|
|
|
+ log.info("数据迁移完成");
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("数据迁移失败", e);
|
|
|
+ throw new RuntimeException("数据迁移失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 迁移节点数据
|
|
|
+ */
|
|
|
+ private void migrateNodes() {
|
|
|
+ log.info("开始迁移节点数据...");
|
|
|
+
|
|
|
+ int offset = 0;
|
|
|
+ while (true) {
|
|
|
+ String sql = "SELECT id, name, category " +
|
|
|
+ "FROM kg_nodes " +
|
|
|
+ "WHERE status = 0 and (version is null and version ='1.0')" + // 只迁移有效数据
|
|
|
+ "ORDER BY id " +
|
|
|
+ "LIMIT ? OFFSET ?";
|
|
|
+
|
|
|
+ List<Map<String, Object>> nodes = jdbcTemplate.queryForList(sql, BATCH_SIZE, offset);
|
|
|
+ if (nodes.isEmpty()) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ try (Session session = neo4jUtil.getSession()) {
|
|
|
+ for (Map<String, Object> node : nodes) {
|
|
|
+ Map<String, Object> properties = new HashMap<>();
|
|
|
+ properties.put("pg_id", node.get("id"));
|
|
|
+ properties.put("source_type", "postgres");
|
|
|
+
|
|
|
+ String cypher = "MERGE (n:`" + node.get("category") + "` {" +
|
|
|
+ "name: $name" +
|
|
|
+ "}) " +
|
|
|
+ "SET n += $properties " +
|
|
|
+ "RETURN n";
|
|
|
+
|
|
|
+ session.run(cypher, Values.parameters(
|
|
|
+ "name", node.get("name"),
|
|
|
+ "properties", properties
|
|
|
+ ));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ offset += BATCH_SIZE;
|
|
|
+ log.info("已迁移 {} 个节点", offset);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 迁移关系数据
|
|
|
+ */
|
|
|
+ private void migrateRelationships() {
|
|
|
+ log.info("开始迁移关系数据...");
|
|
|
+
|
|
|
+ int offset = 0;
|
|
|
+ while (true) {
|
|
|
+ String sql = "SELECT e.id, e.category as rel_category, e.src_id, e.dest_id, " +
|
|
|
+ "e.name, e.version, e.status, " +
|
|
|
+ "n1.category as start_category, n2.category as end_category " +
|
|
|
+ "FROM kg_edges e " +
|
|
|
+ "JOIN kg_nodes n1 ON e.src_id = n1.id " +
|
|
|
+ "JOIN kg_nodes n2 ON e.dest_id = n2.id " +
|
|
|
+ "WHERE e.status = 0 " + // 只迁移有效数据
|
|
|
+ "ORDER BY e.id " +
|
|
|
+ "LIMIT ? OFFSET ?";
|
|
|
+
|
|
|
+ List<Map<String, Object>> relationships = jdbcTemplate.queryForList(sql, BATCH_SIZE, offset);
|
|
|
+ if (relationships.isEmpty()) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ try (Session session = neo4jUtil.getSession()) {
|
|
|
+ for (Map<String, Object> rel : relationships) {
|
|
|
+ Map<String, Object> properties = new HashMap<>();
|
|
|
+ properties.put("pg_id", rel.get("id"));
|
|
|
+ properties.put("category", rel.get("rel_category"));
|
|
|
+ properties.put("version", rel.get("version"));
|
|
|
+ properties.put("status", rel.get("status"));
|
|
|
+
|
|
|
+ String cypher = "MATCH (start:`" + rel.get("start_category") + "` {pg_id: $srcId}), " +
|
|
|
+ "(end:`" + rel.get("end_category") + "` {pg_id: $destId}) " +
|
|
|
+ "MERGE (start)-[r:`" + rel.get("relationship_type") + "`]->(end) " +
|
|
|
+ "SET r += $properties " +
|
|
|
+ "RETURN r";
|
|
|
+
|
|
|
+ session.run(cypher, Values.parameters(
|
|
|
+ "srcId", rel.get("src_id"),
|
|
|
+ "destId", rel.get("dest_id"),
|
|
|
+ "properties", rel.get("properties")
|
|
|
+ ));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ offset += BATCH_SIZE;
|
|
|
+ log.info("已迁移 {} 个关系", offset);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 迁移属性数据
|
|
|
+ */
|
|
|
+ private void migrateProperties() {
|
|
|
+ log.info("开始迁移属性数据...");
|
|
|
+
|
|
|
+ int offset = 0;
|
|
|
+ while (true) {
|
|
|
+ String sql = "SELECT p.ref_id, p.name, p.value, n.category " +
|
|
|
+ "FROM kg_props p " +
|
|
|
+ "JOIN kg_nodes n ON p.ref_id = n.id " +
|
|
|
+ "ORDER BY p.id " +
|
|
|
+ "LIMIT ? OFFSET ?";
|
|
|
+
|
|
|
+ List<Map<String, Object>> properties = jdbcTemplate.queryForList(sql, BATCH_SIZE, offset);
|
|
|
+ if (properties.isEmpty()) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ try (Session session = neo4jUtil.getSession()) {
|
|
|
+ for (Map<String, Object> prop : properties) {
|
|
|
+ String cypher = "MATCH (n:`" + prop.get("category") + "` {pg_id: $refId}) " +
|
|
|
+ "SET n." + prop.get("name") + " = $value " +
|
|
|
+ "RETURN n";
|
|
|
+
|
|
|
+ session.run(cypher, Values.parameters(
|
|
|
+ "refId", prop.get("ref_id"),
|
|
|
+ "value", prop.get("value")
|
|
|
+ ));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ offset += BATCH_SIZE;
|
|
|
+ log.info("已迁移 {} 个属性", offset);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 验证迁移结果
|
|
|
+ */
|
|
|
+ public void validateMigration() {
|
|
|
+ log.info("开始验证数据迁移结果...");
|
|
|
+
|
|
|
+ // 验证节点数量
|
|
|
+ int pgNodeCount = jdbcTemplate.queryForObject(
|
|
|
+ "SELECT COUNT(*) FROM kg_nodes", Integer.class);
|
|
|
+
|
|
|
+ try (Session session = neo4jUtil.getSession()) {
|
|
|
+ StatementResult result = session.run("MATCH (n) RETURN COUNT(n) as count");
|
|
|
+ int neo4jNodeCount = result.single().get("count").asInt();
|
|
|
+
|
|
|
+ log.info("PostgreSQL节点数量: {}", pgNodeCount);
|
|
|
+ log.info("Neo4j节点数量: {}", neo4jNodeCount);
|
|
|
+
|
|
|
+ if (pgNodeCount != neo4jNodeCount) {
|
|
|
+ log.warn("节点数量不匹配!");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 验证关系数量
|
|
|
+ int pgRelCount = jdbcTemplate.queryForObject(
|
|
|
+ "SELECT COUNT(*) FROM kg_edges", Integer.class);
|
|
|
+
|
|
|
+ try (Session session = neo4jUtil.getSession()) {
|
|
|
+ StatementResult result = session.run("MATCH ()-[r]->() RETURN COUNT(r) as count");
|
|
|
+ int neo4jRelCount = result.single().get("count").asInt();
|
|
|
+
|
|
|
+ log.info("PostgreSQL关系数量: {}", pgRelCount);
|
|
|
+ log.info("Neo4j关系数量: {}", neo4jRelCount);
|
|
|
+
|
|
|
+ if (pgRelCount != neo4jRelCount) {
|
|
|
+ log.warn("关系数量不匹配!");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|