Просмотр исходного кода

Merge remote-tracking branch 'origin/master'

hasan лет назад: 4
Родитель
Сommit
befddb5d15

+ 15 - 0
pom.xml

@@ -67,6 +67,21 @@
67 67
             <version>RELEASE</version>
68 68
             <scope>compile</scope>
69 69
         </dependency>
70
+
71
+        <!-- mqtt支持 start -->
72
+        <dependency>
73
+            <groupId>org.springframework.boot</groupId>
74
+            <artifactId>spring-boot-starter-integration</artifactId>
75
+        </dependency>
76
+        <dependency>
77
+            <groupId>org.springframework.integration</groupId>
78
+            <artifactId>spring-integration-stream</artifactId>
79
+        </dependency>
80
+        <dependency>
81
+            <groupId>org.springframework.integration</groupId>
82
+            <artifactId>spring-integration-mqtt</artifactId>
83
+        </dependency>
84
+        <!-- mqtt支持 end -->
70 85
     </dependencies>
71 86
 
72 87
     <build>

+ 5 - 3
src/main/java/com/sjls/nstthh/NstthhApplication.java

@@ -5,6 +5,8 @@ import com.sjls.nstthh.usb.NRJavaSerialService;
5 5
 import org.springframework.boot.SpringApplication;
6 6
 import org.springframework.boot.autoconfigure.SpringBootApplication;
7 7
 
8
+import java.util.Arrays;
9
+
8 10
 
9 11
 @SpringBootApplication
10 12
 
@@ -17,14 +19,14 @@ public class NstthhApplication {
17 19
 //        for (String s: NRSerialPort.getAvailableSerialPorts()){
18 20
 //            System.out.println("Availible port = " + s);
19 21
 //        }
20
-
21
-        try {
22
+        //System.err.println("来了没有");
23
+        /*try {
22 24
             NRJavaSerialService nRJavaSerialService = new NRJavaSerialService();
23 25
             nRJavaSerialService.connect("/dev/ttyUSB0");// /dev/ttyUSB0
24 26
 
25 27
         } catch (Exception e) {
26 28
             e.printStackTrace();
27
-        }
29
+        }*/
28 30
 
29 31
     }
30 32
 

+ 11 - 0
src/main/java/com/sjls/nstthh/controller/HardwareAdmin.java

@@ -1,8 +1,11 @@
1 1
 package com.sjls.nstthh.controller;
2 2
 
3
+import com.sjls.nstthh.mqtt.MsgWriter;
3 4
 import org.springframework.web.bind.annotation.RequestMapping;
4 5
 import org.springframework.web.bind.annotation.RestController;
5 6
 
7
+import javax.annotation.Resource;
8
+
6 9
 /**
7 10
  * smp admin
8 11
  */
@@ -10,10 +13,18 @@ import org.springframework.web.bind.annotation.RestController;
10 13
 @RestController
11 14
 public class HardwareAdmin {
12 15
 
16
+    @Resource
17
+    private MsgWriter msgWriter;
18
+
13 19
     @RequestMapping("/hello")
14 20
     public String index() {
15 21
         return "Hello World";
16 22
     }
17 23
 
24
+    @RequestMapping(value = "/testmqtt")
25
+    public void test_mqtt(String msg) {
26
+        msgWriter.sendToMqtt(msg);
27
+    }
28
+
18 29
 }
19 30
 

+ 164 - 0
src/main/java/com/sjls/nstthh/mqtt/MqttSenderConfig.java

@@ -0,0 +1,164 @@
1
+package com.sjls.nstthh.mqtt;
2
+
3
+import lombok.extern.slf4j.Slf4j;
4
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
5
+import org.slf4j.Logger;
6
+import org.slf4j.LoggerFactory;
7
+import org.springframework.beans.factory.annotation.Value;
8
+import org.springframework.context.annotation.Bean;
9
+import org.springframework.context.annotation.Configuration;
10
+import org.springframework.integration.annotation.IntegrationComponentScan;
11
+import org.springframework.integration.annotation.ServiceActivator;
12
+import org.springframework.integration.channel.DirectChannel;
13
+import org.springframework.integration.core.MessageProducer;
14
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
15
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
16
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
17
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
18
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
19
+import org.springframework.messaging.Message;
20
+import org.springframework.messaging.MessageChannel;
21
+import org.springframework.messaging.MessageHandler;
22
+
23
+@Configuration
24
+@IntegrationComponentScan
25
+@Slf4j
26
+public class MqttSenderConfig {
27
+
28
+    private static final Logger LOGGER = LoggerFactory.getLogger(MqttSenderConfig.class);
29
+
30
+    @Value("${spring.mqtt.username}")
31
+    private String username;
32
+
33
+    @Value("${spring.mqtt.password}")
34
+    private String password;
35
+
36
+    @Value("${spring.mqtt.url}")
37
+    private String hostUrl;
38
+
39
+    @Value("${spring.mqtt.publishclientid}")
40
+    private String publishclientid;
41
+
42
+    @Value("${spring.mqtt.subsribeclientid}")
43
+    private String subsribeclientid;
44
+
45
+    @Value("${spring.mqtt.default_topic}")
46
+    private String defaultTopic;
47
+
48
+    /**
49
+     * 订阅的bean名称
50
+     */
51
+    public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
52
+    /**
53
+     * 发布的bean名称
54
+     */
55
+    public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
56
+
57
+//    @Autowired
58
+//    private MqttCallbackHandler mqttCallbackHandler;
59
+
60
+    /**
61
+     * MQTT连接器选项
62
+     */
63
+    @Bean
64
+    public <hostUrl> MqttConnectOptions getMqttConnectOptions() {
65
+        MqttConnectOptions options = new MqttConnectOptions();
66
+        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
67
+        // 这里设置为true表示每次连接到服务器都以新的身份连接
68
+        options.setCleanSession(false);
69
+        // 设置连接的用户名
70
+        options.setUserName(username);
71
+        // 设置连接的密码
72
+        options.setPassword(password.toCharArray());
73
+        options.setServerURIs(new String[]{hostUrl});
74
+        // 设置超时时间 单位为秒
75
+        options.setConnectionTimeout(10);
76
+        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
77
+        options.setKeepAliveInterval(20);
78
+        // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
79
+        //options.setWill("willTopic", WILL_DATA, 2, false);
80
+        return options;
81
+    }
82
+
83
+
84
+    /**
85
+     * MQTT客户端
86
+     */
87
+    @Bean
88
+    public MqttPahoClientFactory mqttClientFactory() {
89
+        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
90
+        factory.setConnectionOptions(getMqttConnectOptions());
91
+        return factory;
92
+    }
93
+
94
+    /**
95
+     * MQTT信息通道(生产者)
96
+     */
97
+    @Bean(name = CHANNEL_NAME_OUT)
98
+    public MessageChannel mqttOutboundChannel() {
99
+        return new DirectChannel();
100
+    }
101
+
102
+    /**
103
+     * MQTT消息处理器(生产者)
104
+     */
105
+    @Bean
106
+    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
107
+    public MessageHandler mqttOutbound() {
108
+        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
109
+                publishclientid, mqttClientFactory());
110
+        messageHandler.setAsync(true);
111
+        //if (null == topic) topic = defaultTopic;
112
+        messageHandler.setDefaultTopic(defaultTopic);
113
+        return messageHandler;
114
+    }
115
+
116
+    /**
117
+     * MQTT消息订阅绑定(消费者)
118
+     */
119
+    @Bean
120
+    public MessageProducer inbound() {
121
+        // 可以同时消费(订阅)多个Topic
122
+        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
123
+                subsribeclientid, mqttClientFactory(), new String[]{defaultTopic});
124
+        adapter.setCompletionTimeout(5000);
125
+        adapter.setConverter(new DefaultPahoMessageConverter());
126
+        adapter.setQos(2);
127
+        // 设置订阅通道
128
+        adapter.setOutputChannel(mqttInboundChannel());
129
+        return adapter;
130
+    }
131
+
132
+    /**
133
+     * MQTT信息通道(消费者)
134
+     */
135
+    @Bean(name = CHANNEL_NAME_IN)
136
+    public MessageChannel mqttInboundChannel() {
137
+        return new DirectChannel();
138
+    }
139
+
140
+    /**
141
+     * MQTT消息处理器(消费者)
142
+     */
143
+    @Bean
144
+    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
145
+    public MessageHandler handler() {
146
+        return new MessageHandler() {
147
+            @Override
148
+            public void handleMessage(Message<?> message)  {
149
+                try {
150
+                    String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
151
+                    String qos = message.getHeaders().get("mqtt_receivedQos").toString();
152
+                    String payload = message.getPayload().toString();
153
+                    //mqttCallbackHandler.handle(topic,payload);
154
+                    LOGGER.info("主题:"+topic);
155
+                    LOGGER.info("内容:"+payload);
156
+                    LOGGER.info("级别:"+qos);
157
+                    //message.get
158
+                } catch (Exception e) {
159
+                    LOGGER.error(e.getMessage(), e);
160
+                }
161
+            }
162
+        };
163
+    }
164
+}

+ 16 - 0
src/main/java/com/sjls/nstthh/mqtt/MsgWriter.java

@@ -0,0 +1,16 @@
1
+package com.sjls.nstthh.mqtt;
2
+
3
+import org.springframework.integration.annotation.MessagingGateway;
4
+import org.springframework.integration.mqtt.support.MqttHeaders;
5
+import org.springframework.messaging.handler.annotation.Header;
6
+import org.springframework.stereotype.Component;
7
+
8
+@Component
9
+@MessagingGateway(defaultRequestChannel = MqttSenderConfig.CHANNEL_NAME_OUT)
10
+public interface MsgWriter {
11
+
12
+    void sendToMqtt(String data);
13
+    void sendToMqtt(String payload,@Header(MqttHeaders.TOPIC) String topic);
14
+    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
15
+
16
+}

+ 17 - 1
src/main/resources/application.yml

@@ -23,4 +23,20 @@ spring:
23 23
   h2:
24 24
     console:
25 25
       path: /h2-console
26
-      enabled: true
26
+      enabled: true
27
+
28
+  mqtt:
29
+    #MQTT-服务端账号
30
+    username: admin
31
+    #MQTT-服务端密码
32
+    password: public
33
+    #MQTT-服务端地址
34
+    url: tcp://192.168.50.169:1883
35
+    #MQTT-发布者clientid
36
+    publishclientid: client_publish
37
+    #MQTT-接收者clientid
38
+    subsribeclientid: client_subsribe
39
+    #MQTT-默认主题
40
+    default_topic: test
41
+
42
+