Преглед на файлове

消息服务添加即时插件

rengb преди 3 години
родител
ревизия
7a3d30a85c

+ 6 - 0
common/pom.xml

@@ -94,6 +94,12 @@
             <version>${mybatis-plus.version}</version>
             <optional>true</optional>
         </dependency>
+        <!--SpringBoot配置处理器-->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-configuration-processor</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>

+ 4 - 1
gateway-service/src/main/resources/bootstrap.yml

@@ -85,4 +85,7 @@ secure:
       - "/actuator/**"
       - "/user-auth/oauth/token"
       - "/user-auth/rsa/publicKey"
-      - "/security-center/userManage/login"
+      - "/security-center/userManage/login"
+      - "/message-service/pageSample/sample"
+      - "/message-service/backSample/sendToDefaultTopic"
+      - "/message-service/backSample/sendToTopic"

+ 8 - 0
message-service/pom.xml

@@ -26,6 +26,14 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-amqp</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.integration</groupId>
+            <artifactId>spring-integration-mqtt</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-thymeleaf</artifactId>
+        </dependency>
     </dependencies>
 
     <build>

+ 34 - 0
message-service/src/main/java/com/lantone/message/config/MqttConfig.java

@@ -0,0 +1,34 @@
+package com.lantone.message.config;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+/**
+ * @Description: MQTT相关配置
+ * @author: rengb
+ * @time: 2021/1/5 18:27
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+@Component
+@ConfigurationProperties(prefix = "rabbitmq.mqtt")
+public class MqttConfig {
+
+    /**
+     * RabbitMQ的MQTT默认topic
+     */
+    private String defaultTopic;
+
+    /**
+     * RabbitMQ的MQTT连接地址
+     */
+    private String url;
+
+    /**
+     * RabbitMQ的MQTT前端页面连接地址
+     */
+    private String frontUrl;
+
+}

+ 59 - 0
message-service/src/main/java/com/lantone/message/config/MqttInboundConfig.java

@@ -0,0 +1,59 @@
+package com.lantone.message.config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.MessagingException;
+
+/**
+ * @Description: (默认主题)即时消息订阅者相关配置
+ * @author: rengb
+ * @time: 2021/1/5 18:27
+ */
+@Slf4j
+@Configuration
+public class MqttInboundConfig {
+
+    @Autowired
+    private MqttConfig mqttConfig;
+
+    @Bean
+    public MessageChannel mqttInputChannel() {
+        return new DirectChannel();
+    }
+
+    @Bean
+    public MessageProducer inbound() {
+        MqttPahoMessageDrivenChannelAdapter adapter =
+                new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient",
+                        mqttConfig.getDefaultTopic());
+        adapter.setCompletionTimeout(5000);
+        adapter.setConverter(new DefaultPahoMessageConverter());
+        //设置消息质量:0->至多一次;1->至少一次;2->只有一次(框架暂不支持)
+        adapter.setQos(1);
+        adapter.setOutputChannel(mqttInputChannel());
+        return adapter;
+    }
+
+    @Bean
+    @ServiceActivator(inputChannel = "mqttInputChannel")
+    public MessageHandler handler() {
+        return new MessageHandler() {
+            @Override
+            public void handleMessage(Message<?> message) throws MessagingException {
+                //处理订阅消息  此处可作为业务处理的入口
+                log.info("handleMessage : {}", message.getPayload());
+            }
+        };
+    }
+
+}

+ 60 - 0
message-service/src/main/java/com/lantone/message/config/MqttOutboundConfig.java

@@ -0,0 +1,60 @@
+package com.lantone.message.config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+
+/**
+ * @Description: (服务端)即时消息发布者相关配置
+ * @author: rengb
+ * @time: 2021/1/5 18:27
+ */
+@Slf4j
+@Configuration
+public class MqttOutboundConfig {
+
+    @Autowired
+    private MqttConfig mqttConfig;
+
+    @Bean
+    public MqttPahoClientFactory mqttClientFactory() {
+        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setServerURIs(new String[] { mqttConfig.getUrl() });
+        factory.setConnectionOptions(options);
+        return factory;
+    }
+
+    @Bean
+    @ServiceActivator(inputChannel = "mqttOutboundChannel")
+    public MessageHandler mqttOutbound() {
+        MqttPahoMessageHandler messageHandler =
+                new MqttPahoMessageHandler("publisherClient", mqttClientFactory()) {
+                    @Override
+                    public void handleMessage(Message<?> message) {
+                        super.handleMessage(message);
+                        //处理发布消息  此处可作为业务处理的入口
+                        log.info("handleMessage : {}", message.getPayload());
+                    }
+                };
+        messageHandler.setAsync(true);
+        messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
+        return messageHandler;
+    }
+
+    @Bean
+    public MessageChannel mqttOutboundChannel() {
+        return new DirectChannel();
+    }
+
+}

+ 32 - 0
message-service/src/main/java/com/lantone/message/gateway/MqttGateway.java

@@ -0,0 +1,32 @@
+package com.lantone.message.gateway;
+
+import org.springframework.integration.annotation.MessagingGateway;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.stereotype.Component;
+
+/**
+ * @Description: MQTT网关,通过接口将数据传递到集成流
+ * @author: rengb
+ * @time: 2021/1/5 18:27
+ */
+@Component
+@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
+public interface MqttGateway {
+
+    /**
+     * 发送消息到默认topic
+     */
+    void sendToMqtt(String payload);
+
+    /**
+     * 发送消息到指定topic
+     */
+    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
+
+    /**
+     * 发送消息到指定topic并设置QOS
+     */
+    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
+
+}

+ 39 - 0
message-service/src/main/java/com/lantone/message/web/BackSampleController.java

@@ -0,0 +1,39 @@
+package com.lantone.message.web;
+
+import com.lantone.common.api.CommonResult;
+import com.lantone.message.gateway.MqttGateway;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * @Description: 即时通讯后端示例
+ * @author: rengb
+ * @time: 2021/1/5 18:27
+ */
+@RestController
+@Api(value = "即时通讯后端示例", tags = { "即时通讯后端示例" })
+@RequestMapping("/backSample")
+public class BackSampleController {
+
+    @Autowired
+    private MqttGateway mqttGateway;
+
+    @PostMapping("/sendToDefaultTopic")
+    @ApiOperation("向默认主题发送消息")
+    public CommonResult sendToDefaultTopic(String payload) {
+        mqttGateway.sendToMqtt(payload);
+        return CommonResult.success(null);
+    }
+
+    @PostMapping("/sendToTopic")
+    @ApiOperation("向指定主题发送消息")
+    public CommonResult sendToTopic(String payload, String topic) {
+        mqttGateway.sendToMqtt(payload, topic);
+        return CommonResult.success(null);
+    }
+
+}

+ 30 - 0
message-service/src/main/java/com/lantone/message/web/PageSampleController.java

@@ -0,0 +1,30 @@
+package com.lantone.message.web;
+
+import com.lantone.message.config.MqttConfig;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Controller;
+import org.springframework.ui.ModelMap;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import springfox.documentation.annotations.ApiIgnore;
+
+/**
+ * @Description: 即时通讯页面示例
+ * @author: rengb
+ * @time: 2021/1/5 18:27
+ */
+@ApiIgnore
+@Controller
+@RequestMapping("/pageSample")
+public class PageSampleController {
+
+    @Autowired
+    private MqttConfig mqttConfig;
+
+    @GetMapping("/sample")
+    public String index(ModelMap model) {
+        model.addAttribute("mqtt_url", mqttConfig.getFrontUrl());
+        return "sample";
+    }
+
+}

+ 2 - 0
message-service/src/main/java/com/lantone/message/web/RedisRefreshController.java

@@ -7,12 +7,14 @@ import io.swagger.annotations.ApiOperation;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RestController;
+import springfox.documentation.annotations.ApiIgnore;
 
 /**
  * @Description: redis刷新处理API
  * @author: rengb
  * @time: 2021/1/5 18:27
  */
+@ApiIgnore
 @RestController
 @Api(value = "redis刷新处理API", tags = { "redis刷新处理API" })
 public class RedisRefreshController {

+ 2 - 0
message-service/src/main/java/com/lantone/message/web/SyslogHandleController.java

@@ -11,12 +11,14 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RestController;
+import springfox.documentation.annotations.ApiIgnore;
 
 /**
  * @Description: 系统相关日志处理API
  * @author: rengb
  * @time: 2021/1/5 18:27
  */
+@ApiIgnore
 @RestController
 @Api(value = "系统相关日志处理API", tags = { "系统相关日志处理API" })
 public class SyslogHandleController {

+ 8 - 0
message-service/src/main/resources/bootstrap.yml

@@ -3,6 +3,8 @@ server:
 spring:
   application:
     name: message-service
+  thymeleaf:
+    cache: false
   redis:
     database: 12
     host: 192.168.2.236 # Redis服务器地址
@@ -67,6 +69,12 @@ management:
     health:
       show-details: always
 
+rabbitmq:
+  mqtt:
+    url: tcp://192.168.2.236:1883
+    front-url: ws://192.168.2.236:15675/ws
+    defaultTopic: defaultSampleTopic
+
 swagger.title: 消息管理服务
 swagger.des: 消息管理服务
 swagger.version: 0.0.1-SNAPSHOT

Файловите разлики са ограничени, защото са твърде много
+ 1 - 0
message-service/src/main/resources/public/mqtt.min.js


+ 70 - 0
message-service/src/main/resources/templates/sample.html

@@ -0,0 +1,70 @@
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="UTF-8">
+    <title>Title</title>
+</head>
+<body>
+<div>
+    <label>目标Topic:<input id="targetTopicInput" type="text"></label><br>
+    <label>发送消息:<input id="messageInput" type="text"></label><br>
+    <button onclick="sendMessage()">发送</button>
+    <button onclick="clearMessage()">清空</button>
+    <div id="messageDiv"></div>
+</div>
+</body>
+<script src="../mqtt.min.js"></script>
+<script>
+    //RabbitMQ的web-mqtt连接地址
+    const url = '[[${mqtt_url}]]';
+    //获取订阅的topic
+    const topic = getQueryString("topic");
+    //连接到消息队列
+    let client = mqtt.connect(url);
+    client.on('connect', function () {
+        //连接成功后订阅topic
+        client.subscribe(topic, function (err) {
+            if (!err) {
+                showMessage("订阅topic:" + topic + "成功!");
+            }
+        });
+    });
+    //获取订阅topic中的消息
+    client.on('message', function (topic, message) {
+        showMessage("收到消息:" + message.toString());
+    });
+
+    //发送消息
+    function sendMessage() {
+        let targetTopic = document.getElementById("targetTopicInput").value;
+        let message = document.getElementById("messageInput").value;
+        //向目标topic中发送消息
+        client.publish(targetTopic, message);
+        showMessage("发送消息给" + targetTopic + "的消息:" + message);
+    }
+
+    //从URL中获取参数
+    function getQueryString(name) {
+        let reg = new RegExp("(^|&)" + name + "=([^&]*)(&|$)", "i");
+        let r = window.location.search.substr(1).match(reg);
+        if (r != null) {
+            return decodeURIComponent(r[2]);
+        }
+        return null;
+    }
+
+    //在消息列表中展示消息
+    function showMessage(message) {
+        let messageDiv = document.getElementById("messageDiv");
+        let messageEle = document.createElement("div");
+        messageEle.innerText = message;
+        messageDiv.appendChild(messageEle);
+    }
+
+    //清空消息列表
+    function clearMessage() {
+        let messageDiv = document.getElementById("messageDiv");
+        messageDiv.innerHTML = "";
+    }
+</script>
+</html>