Parcourir la source

厦门五院异步接口更新

wangsy il y a 11 mois
Parent
commit
f06c88c7f8

+ 85 - 51
src/main/java/com/diagbot/facade/data/StructuralDataFacade.java

@@ -2,6 +2,8 @@ package com.diagbot.facade.data;
 
 import java.util.*;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
@@ -138,6 +140,8 @@ public class StructuralDataFacade {
     @Autowired
     private MedTaskQueueFacade medTaskQueueFacade;
 
+    private final ExecutorService executor = Executors.newFixedThreadPool(50); // 假设需要10个线程
+
     private FJTZDBConnHarp fjtzdbConnHarp = new FJTZDBConnHarp();
     private BlockingQueue<StructuralDataFacade> taskQueue = new LinkedBlockingQueue<>();
     private BlockingQueue<StructuralDataFacade> taskQueueAnalyzeApi = new LinkedBlockingQueue<>();
@@ -209,24 +213,54 @@ public class StructuralDataFacade {
     }
 
     public void startAsyncProcessing(StructuralDataVo structuralDataVos) {
-        Thread workerThread = new Thread(() -> {
-            while (true) {
-                try {
-                    StructuralDataFacade take = taskQueue.take();
-                    if (take != null) {
-                        take.sendStructuralData(structuralDataVos, aMedAbnormalInfoFacade, behospitalInfoFacade);
-                        if (structuralDataVos.getWorkId() != null) {
-                            UpdateWrapper<MedTaskQueue> medTaskQueueUpdate = new UpdateWrapper<>();
-                            medTaskQueueUpdate.eq("hospital_id", structuralDataVos.getHospitalId()).eq("work_id", structuralDataVos.getWorkId()).set("is_execute", IsDeleteEnum.Y.getKey()).set("gmt_modified", new Date());
-                            medTaskQueueFacade.update(medTaskQueueUpdate);
+//        Thread workerThread = new Thread(() -> {
+//            while (true) {
+//                try {
+//                    StructuralDataFacade take = taskQueue.take();
+//                    if (take != null) {
+//                        take.sendStructuralData(structuralDataVos, aMedAbnormalInfoFacade, behospitalInfoFacade);
+//                        if (structuralDataVos.getWorkId() != null) {
+//                            UpdateWrapper<MedTaskQueue> medTaskQueueUpdate = new UpdateWrapper<>();
+//                            medTaskQueueUpdate.eq("hospital_id", structuralDataVos.getHospitalId()).eq("work_id", structuralDataVos.getWorkId()).set("is_execute", IsDeleteEnum.Y.getKey()).set("gmt_modified", new Date());
+//                            medTaskQueueFacade.update(medTaskQueueUpdate);
+//                        }
+//                    }
+//                } catch (InterruptedException e) {
+//                    Thread.currentThread().interrupt();
+//                }
+//            }
+//        });
+//        workerThread.start();
+
+//        Runnable worker = () -> {
+        executor.submit(() -> {
+            try {
+                while (!Thread.currentThread().isInterrupted()) {
+                    try {
+                        // 等待队列中有任务
+                        StructuralDataFacade task = taskQueue.take();
+                        if (task != null) {
+//                            task.execute(); // 假设SomeTaskClass有一个execute方法来执行任务
+                            task.sendStructuralData(structuralDataVos, aMedAbnormalInfoFacade, behospitalInfoFacade);
+                            if (structuralDataVos.getWorkId() != null) {
+                                UpdateWrapper<MedTaskQueue> medTaskQueueUpdate = new UpdateWrapper<>();
+                                medTaskQueueUpdate.eq("hospital_id", structuralDataVos.getHospitalId()).eq("work_id", structuralDataVos.getWorkId()).set("is_execute", IsDeleteEnum.Y.getKey()).set("gmt_modified", new Date());
+                                medTaskQueueFacade.update(medTaskQueueUpdate);
+                            }
                         }
+                    } catch (InterruptedException e) {
+                        // 处理中断,可能是需要关闭线程池
+                        Thread.currentThread().interrupt();
+                        break; // 退出循环
                     }
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
                 }
+            } finally {
+                // 在这里可以执行一些清理工作
             }
         });
-        workerThread.start();
+
+        // 注意:通常不会在这里关闭executor,除非你确定不再需要处理任何任务
+        // 在应用程序关闭时,应该优雅地关闭executor
     }
 
     public boolean addToQueue(StructuralDataVo structuralDataVos) {
@@ -235,47 +269,47 @@ public class StructuralDataFacade {
                 , "一般手术记录", "术前讨论、术前小结", "手术知情同意书", "术后首次病程及谈话记录", "疑难病例讨论记录", "手术安全核查表"
                 , "抢救记录", "死亡记录", "病危通知书", "转入记录", "转出记录", "阶段小结", "病重通知书", "会诊结果单");
 
-        if (modeList.contains(structuralDataVos.getModeName())) {
-            //如果任务列表中存在该病历,则返回
-            int count = medTaskQueueFacade.count(new QueryWrapper<MedTaskQueue>()
-                    .eq("behospital_code", structuralDataVos.getBehospitalCode())
-                    .eq("hospital_id", structuralDataVos.getHospitalId())
-                    .eq("is_deleted", IsDeleteEnum.N.getKey())
-                    .eq("is_analyze", "1")
-                    .eq("is_execute", IsDeleteEnum.N.getKey())
-            );
-            System.out.println("单文书质控查询到数量为+++++++++++++++++++++++++++++:" + count);
-            if (count > 0) {
-                log.info("该病历信息已存在于任务队列中。。。。");
-                return false;
-            }
+        if (!modeList.contains(structuralDataVos.getModeName())) {
+            return false;
+        }
+        //如果任务列表中存在该病历,则返回
+        int count = medTaskQueueFacade.count(new QueryWrapper<MedTaskQueue>()
+                .eq("behospital_code", structuralDataVos.getBehospitalCode())
+                .eq("hospital_id", structuralDataVos.getHospitalId())
+                .eq("is_deleted", IsDeleteEnum.N.getKey())
+                .eq("is_analyze", "1")
+                .eq("is_execute", IsDeleteEnum.N.getKey())
+        );
+        if (count > 0) {
+            log.info("该病历信息已存在于任务队列中。。。。");
+            return false;
+        }
 
-            StructuralDataFacade task = new StructuralDataFacade(aMedAbnormalInfoFacade, behospitalInfoFacade);
-            long timestamp = System.currentTimeMillis(); //当前时间戳
-            //加入任务
-            if (taskQueue.offer(task)) {
-                MedTaskQueue medTaskQueue = new MedTaskQueue();
-                if (StringUtil.isBlank(structuralDataVos.getWorkId())) {
-                    medTaskQueue.setWorkId(String.valueOf(timestamp));
-                    structuralDataVos.setWorkId(String.valueOf(timestamp));
-                } else {
-                    medTaskQueue.setWorkId(structuralDataVos.getWorkId());
-                }
-                medTaskQueue.setHospitalId(structuralDataVos.getHospitalId());
-                medTaskQueue.setBehospitalCode(structuralDataVos.getBehospitalCode());
-                medTaskQueue.setPatientId(structuralDataVos.getPatientId());
-                medTaskQueue.setIsAnalyze("1");
-                medTaskQueue.setModeName(structuralDataVos.getModeName());
-                medTaskQueue.setParamIn(JSON.toJSONString(structuralDataVos));
-                medTaskQueue.setGmtCreate(new Date());
-                medTaskQueueFacade.save(medTaskQueue);
-                return true;
-            } else {
-                return false;
-            }
+        StructuralDataFacade task = new StructuralDataFacade(aMedAbnormalInfoFacade, behospitalInfoFacade);
+        long timestamp = System.currentTimeMillis(); //当前时间戳
+        MedTaskQueue medTaskQueue = new MedTaskQueue();
+        if (StringUtil.isBlank(structuralDataVos.getWorkId())) {
+            medTaskQueue.setWorkId(String.valueOf(timestamp));
+            structuralDataVos.setWorkId(String.valueOf(timestamp));
         } else {
-            return false;
+            medTaskQueue.setWorkId(structuralDataVos.getWorkId());
         }
+        medTaskQueue.setHospitalId(structuralDataVos.getHospitalId());
+        medTaskQueue.setBehospitalCode(structuralDataVos.getBehospitalCode());
+        medTaskQueue.setPatientId(structuralDataVos.getPatientId());
+        medTaskQueue.setIsAnalyze("1");
+        medTaskQueue.setModeName(structuralDataVos.getModeName());
+        medTaskQueue.setParamIn(JSON.toJSONString(structuralDataVos));
+        medTaskQueue.setGmtCreate(new Date());
+        // 尝试保存任务到数据库
+        boolean saved = medTaskQueueFacade.save(medTaskQueue);
+        //加入任务
+        if (saved) {
+            // 如果成功保存到数据库,尝试将任务添加到队列(这里假设taskQueue是线程安全的)
+            return taskQueue.offer(task);
+        }
+        return false;
+
     }
 
     public RespDTO<Map<String, Object>> sendStructuralDataTest(StructuralDataVo structuralDataVos, HashSet<String> modeIds) {

+ 16 - 4
src/main/java/com/diagbot/web/DataController.java

@@ -1,6 +1,7 @@
 package com.diagbot.web;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import javax.validation.Valid;
 
@@ -146,15 +147,26 @@ public class DataController {
 //    @SysLogger("sendStructuralData")
 //    public RespDTO<Map<String, Object>> sendStructuralData(@Valid @RequestBody StructuralDataVo structuralDataVos) {
 //        return structuralDataFacade.sendStructuralData(structuralDataVos);
+//    }
+
+//    @ApiOperation(value = "单份病例保存并返回评分{厦门}")
+//    @PostMapping("/sendStructuralData")
+//    @SysLogger("sendStructuralData")
+//    public RespDTO<Boolean> sendStructuralData(@Valid @RequestBody StructuralDataVo structuralDataVos) {
+//        structuralDataFacade.startAsyncProcessing(structuralDataVos);
+//        Boolean data = structuralDataFacade.addToQueue(structuralDataVos);
+//        return RespDTO.onSuc(data);
 //    }
 
     @ApiOperation(value = "单份病例保存并返回评分{厦门}")
     @PostMapping("/sendStructuralData")
     @SysLogger("sendStructuralData")
-    public RespDTO<Boolean> sendStructuralData(@Valid @RequestBody StructuralDataVo structuralDataVos) {
-        structuralDataFacade.startAsyncProcessing(structuralDataVos);
-        Boolean data = structuralDataFacade.addToQueue(structuralDataVos);
-        return RespDTO.onSuc(data);
+    public CompletableFuture<RespDTO<Boolean>> sendStructuralData(@Valid @RequestBody StructuralDataVo structuralDataVos) {
+        return CompletableFuture.runAsync(() -> {
+            structuralDataFacade.startAsyncProcessing(structuralDataVos);
+            structuralDataFacade.addToQueue(structuralDataVos);
+            // 注意:这里可能不需要直接返回addToQueue的结果,因为操作是异步的
+        }).thenApply(v -> RespDTO.onSuc(true)); // 如果不需要具体的成功数据,可以返回null
     }
 
     @ApiOperation(value = "{历史病例导入单个病人}")

+ 2 - 2
src/main/resources/application-dev.yml

@@ -59,7 +59,7 @@ spring:
     druid:
       driver-class-name: com.mysql.cj.jdbc.Driver
       platform: mysql
-      url: jdbc:mysql://192.168.2.31/qc?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&characterSetResults=utf8&useSSL=false&allowMultiQueries=true
+      url: jdbc:mysql://173.18.12.191/qc?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&characterSetResults=utf8&useSSL=false&allowMultiQueries=true
       username: root
       password: dsYun8!@#
       # 连接池的配置信息
@@ -111,7 +111,7 @@ spring:
     database:
       cache: 8 # cache索引
       token: 8 # Token索引
-    host: 192.168.2.31  #Redis服务器地址
+    host: 173.18.12.191  #Redis服务器地址
     port: 6379 # Redis服务器连接端口(本地环境端口6378,其他环境端口是6379)
     password:  # Redis服务器连接密码(默认为空)
     lettuce: