Jelajahi Sumber

postgresql数据迁移修改

yuchengwei 4 bulan lalu
induk
melakukan
52c904b175

+ 46 - 16
src/main/java/com/qizhen/healsphere/util/DataMigrationUtil.java

@@ -30,34 +30,44 @@ public class DataMigrationUtil {
     public void migrateByRelations() {
         log.info("开始基于关系的数据迁移...");
         
+        // 从104000条记录后继续
         int offset = 0;
+        log.info("从偏移量 {} 继续处理", offset);
+        
         while (true) {
-            // 分批获取关系
+            // 分批获取关系,使用OFFSET和LIMIT进行分页
             String relationSql = 
                 "SELECT DISTINCT e.src_id, e.dest_id " +
                 "FROM kg_edges e " +
                 "WHERE e.status = 0 AND (e.version IS NULL OR e.version = '1.0') " +
                 "ORDER BY e.src_id, e.dest_id " +
-                "LIMIT ? OFFSET ?";
+                "OFFSET ? " +
+                "LIMIT ?";
             
-            List<Map<String, Object>> relations = jdbcTemplate.queryForList(relationSql, BATCH_SIZE, offset);
+            List<Map<String, Object>> relations = jdbcTemplate.queryForList(relationSql, offset, BATCH_SIZE);
             if (relations.isEmpty()) {
                 break;
             }
             
-            // 收集当前批次需要迁移的节点ID
-            Set<Long> nodeIds = new HashSet<>();
-            relations.forEach(rel -> {
-                nodeIds.add(((Number)rel.get("src_id")).longValue());
-                nodeIds.add(((Number)rel.get("dest_id")).longValue());
-            });
-            
-            // 迁移当前批次的节点及关系
-            migrateNodesWithProperties(nodeIds);
-            migrateRelationships(relations);
-            
-            offset += BATCH_SIZE;
-            log.info("已处理 {} 条关系记录", offset);
+            try {
+                // 收集当前批次需要迁移的节点ID
+                Set<Long> nodeIds = new HashSet<>();
+                relations.forEach(rel -> {
+                    nodeIds.add(((Number)rel.get("src_id")).longValue());
+                    nodeIds.add(((Number)rel.get("dest_id")).longValue());
+                });
+                
+                // 迁移当前批次的节点及关系
+                migrateNodesWithProperties(nodeIds);
+                migrateRelationships(relations);
+                
+                offset += BATCH_SIZE;  // 固定增加批次大小
+                log.info("已处理 {} 条关系记录", offset);
+                
+            } catch (Exception e) {
+                log.error("处理偏移量 {} 的数据时发生错误: {}", offset, e.getMessage());
+                throw e;
+            }
         }
         
         log.info("基于关系的数据迁移完成");
@@ -202,6 +212,26 @@ public class DataMigrationUtil {
                             relationType = "疾病相关传播媒介";
                         }
 
+                        if(relationType.equals("drugs_of")) {
+                            relationType = "厂商相关生产药品";
+                        }
+
+                        if(relationType.equals("need_check")) {
+                            relationType = "疾病相关辅助检查";
+                        }
+                        if(relationType.equals("recommand_eat")) {
+                            relationType = "疾病相关推荐食品";
+                        }
+                        if(relationType.equals("recommand_drug")) {
+                            relationType = "疾病相关好评药品";
+                        }
+                        if(relationType.equals("do_eat")) {
+                            relationType = "疾病相关宜吃食品";
+                        }
+                        if(relationType.equals("no_eat")) {
+                            relationType = "疾病相关忌吃食品";
+                        }
+
                         String cypher = 
                             "MATCH (start:`" + startCategory + "` {name: $startName}), " +
                             "      (end:`" + endCategory + "` {name: $endName}) " +