Browse Source

厦门五院异步接口优化

wangsy 10 tháng trước cách đây
mục cha
commit
0a307061e8

+ 58 - 32
src/main/java/com/diagbot/facade/data/StructuralDataFacade.java

@@ -845,24 +845,49 @@ public class StructuralDataFacade {
     }
 
     public void startAsyncProcessingAnalyzeApi(AnalyzeRunVO analyzeRunVO) {
-        Thread workerThread = new Thread(() -> {
-            while (true) {
-                try {
-                    StructuralDataFacade take = taskQueueAnalyzeApi.take();
-                    if (take != null) {
-                        take.sendAnalyzers(analyzeRunVO, aMedAbnormalInfoFacade, behospitalInfoFacade);
-                        if (analyzeRunVO.getWorkId() != null) {
-                            UpdateWrapper<MedTaskQueue> medTaskQueueUpdate = new UpdateWrapper<>();
-                            medTaskQueueUpdate.eq("hospital_id", analyzeRunVO.getHospitalId()).eq("work_id", analyzeRunVO.getWorkId()).set("is_execute", IsDeleteEnum.Y.getKey()).set("gmt_modified", new Date());
-                            medTaskQueueFacade.update(medTaskQueueUpdate);
+//        Thread workerThread = new Thread(() -> {
+//            while (true) {
+//                try {
+//                    StructuralDataFacade take = taskQueueAnalyzeApi.take();
+//                    if (take != null) {
+//                        take.sendAnalyzers(analyzeRunVO, aMedAbnormalInfoFacade, behospitalInfoFacade);
+//                        if (analyzeRunVO.getWorkId() != null) {
+//                            UpdateWrapper<MedTaskQueue> medTaskQueueUpdate = new UpdateWrapper<>();
+//                            medTaskQueueUpdate.eq("hospital_id", analyzeRunVO.getHospitalId()).eq("work_id", analyzeRunVO.getWorkId()).set("is_execute", IsDeleteEnum.Y.getKey()).set("gmt_modified", new Date());
+//                            medTaskQueueFacade.update(medTaskQueueUpdate);
+//                        }
+//                    }
+//                } catch (InterruptedException e) {
+//                    Thread.currentThread().interrupt();
+//                }
+//            }
+//        });
+//        workerThread.start();
+        executor.submit(() -> {
+            try {
+                while (!Thread.currentThread().isInterrupted()) {
+                    try {
+                        // 等待队列中有任务
+                        StructuralDataFacade take = taskQueueAnalyzeApi.take();
+                        if (take != null) {
+//                            task.execute(); // 假设SomeTaskClass有一个execute方法来执行任务
+                            take.sendAnalyzers(analyzeRunVO, aMedAbnormalInfoFacade, behospitalInfoFacade);
+                            if (analyzeRunVO.getWorkId() != null) {
+                                UpdateWrapper<MedTaskQueue> medTaskQueueUpdate = new UpdateWrapper<>();
+                                medTaskQueueUpdate.eq("hospital_id", analyzeRunVO.getHospitalId()).eq("work_id", analyzeRunVO.getWorkId()).set("is_execute", IsDeleteEnum.Y.getKey()).set("gmt_modified", new Date());
+                                medTaskQueueFacade.update(medTaskQueueUpdate);
+                            }
                         }
+                    } catch (InterruptedException e) {
+                        // 处理中断,可能是需要关闭线程池
+                        Thread.currentThread().interrupt();
+                        break; // 退出循环
                     }
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
                 }
+            } finally {
+                // 在这里可以执行一些清理工作
             }
         });
-        workerThread.start();
     }
 
     public boolean addToQueueAnalyzeApi(AnalyzeRunVO analyzeRunVO) {
@@ -879,7 +904,6 @@ public class StructuralDataFacade {
                 .eq("is_analyze", "2")
                 .eq("is_execute", IsDeleteEnum.N.getKey())
         );
-        System.out.println("AnalyzeApi查询到数量为+++++++++++++++++++++++++++++:" + count);
         if (count > 0) {
             log.info("该病历信息已存在于任务队列中。。。。");
             return false;
@@ -888,26 +912,28 @@ public class StructuralDataFacade {
         StructuralDataFacade task = new StructuralDataFacade(aMedAbnormalInfoFacade, behospitalInfoFacade);
         long timestamp = System.currentTimeMillis(); //当前时间戳
         //加入任务
-        if (taskQueueAnalyzeApi.offer(task)) {
-            MedTaskQueue medTaskQueue = new MedTaskQueue();
-            if (StringUtil.isBlank(analyzeRunVO.getWorkId())) {
-                medTaskQueue.setWorkId(String.valueOf(timestamp));
-                analyzeRunVO.setWorkId(String.valueOf(timestamp));
-            } else {
-                medTaskQueue.setWorkId(analyzeRunVO.getWorkId());
-            }
-            medTaskQueue.setHospitalId(analyzeRunVO.getHospitalId());
-            medTaskQueue.setBehospitalCode(analyzeRunVO.getBehospitalCode());
-            medTaskQueue.setPatientId(analyzeRunVO.getPatientId());
-            medTaskQueue.setIsAnalyze("2");
-            medTaskQueue.setParamIn(JSON.toJSONString(analyzeRunVO));
-            medTaskQueue.setGmtCreate(new Date());
-            medTaskQueueFacade.save(medTaskQueue);
-
-            return true;
+        MedTaskQueue medTaskQueue = new MedTaskQueue();
+        if (StringUtil.isBlank(analyzeRunVO.getWorkId())) {
+            medTaskQueue.setWorkId(String.valueOf(timestamp));
+            analyzeRunVO.setWorkId(String.valueOf(timestamp));
         } else {
-            return false;
+            medTaskQueue.setWorkId(analyzeRunVO.getWorkId());
         }
+        medTaskQueue.setHospitalId(analyzeRunVO.getHospitalId());
+        medTaskQueue.setBehospitalCode(analyzeRunVO.getBehospitalCode());
+        medTaskQueue.setPatientId(analyzeRunVO.getPatientId());
+        medTaskQueue.setIsAnalyze("2");
+        medTaskQueue.setParamIn(JSON.toJSONString(analyzeRunVO));
+        medTaskQueue.setGmtCreate(new Date());
+        medTaskQueueFacade.save(medTaskQueue);
+        // 尝试保存任务到数据库
+        boolean saved = medTaskQueueFacade.save(medTaskQueue);
+        //加入任务
+        if (saved) {
+            // 如果成功保存到数据库,尝试将任务添加到队列(这里假设taskQueue是线程安全的)
+            return taskQueue.offer(task);
+        }
+        return false;
     }
 
     public StructuralDataFacade(AMedAbnormalInfoFacade aMedAbnormalInfoFacade, BehospitalInfoFacade behospitalInfoFacade) {

+ 10 - 4
src/main/java/com/diagbot/web/BehospitalInfoController.java

@@ -1,6 +1,7 @@
 package com.diagbot.web;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import javax.servlet.http.HttpServletResponse;
 import javax.validation.Valid;
@@ -142,10 +143,15 @@ public class BehospitalInfoController {
     @SysLogger("analyze_api")
     @PostMapping("/analyze_api")
     @ApiOperation(value = "评分-对外api接口[by:zhoutg]")
-    public RespDTO<Boolean> analyzeApi(@RequestBody AnalyzeRunVO analyzeRunVO) {
-        structuralDataFacade.startAsyncProcessingAnalyzeApi(analyzeRunVO);
-        Boolean data = structuralDataFacade.addToQueueAnalyzeApi(analyzeRunVO);
-        return RespDTO.onSuc(data);
+    public CompletableFuture<RespDTO<Boolean>> analyzeApi(@RequestBody AnalyzeRunVO analyzeRunVO) {
+//        structuralDataFacade.startAsyncProcessingAnalyzeApi(analyzeRunVO);
+//        Boolean data = structuralDataFacade.addToQueueAnalyzeApi(analyzeRunVO);
+//        return RespDTO.onSuc(data);
+        return CompletableFuture.runAsync(() -> {
+            structuralDataFacade.startAsyncProcessingAnalyzeApi(analyzeRunVO);
+            structuralDataFacade.addToQueueAnalyzeApi(analyzeRunVO);
+            // 注意:这里可能不需要直接返回addToQueue的结果,因为操作是异步的
+        }).thenApply(v -> RespDTO.onSuc(true)); // 如果不需要具体的成功数据,可以返回null
     }
 
     @ApiOperation(value = "新增质控条目[by:zhoutg]")