Browse Source

mq 上行

hanqingsong 3 years ago
parent
commit
b4c86da1da

+ 13 - 11
depot-intelligent/pom.xml

@@ -16,22 +16,24 @@
16
     <parent>
16
     <parent>
17
         <groupId>org.springframework.boot</groupId>
17
         <groupId>org.springframework.boot</groupId>
18
         <artifactId>spring-boot-starter-parent</artifactId>
18
         <artifactId>spring-boot-starter-parent</artifactId>
19
-        <version>2.0.2.RELEASE</version>
19
+        <version>2.3.12.RELEASE</version>
20
         <relativePath/> <!-- lookup parent from repository -->
20
         <relativePath/> <!-- lookup parent from repository -->
21
     </parent>
21
     </parent>
22
-    <distributionManagement>
22
+    <!--<distributionManagement>
23
         <repository>
23
         <repository>
24
             <id>nexus</id>
24
             <id>nexus</id>
25
-            <!--名称自己定义-->
25
+            &lt;!&ndash;名称自己定义&ndash;&gt;
26
             <name>release</name>
26
             <name>release</name>
27
             <url>http://10.10.1.25:8081/repository/host_repository/</url>
27
             <url>http://10.10.1.25:8081/repository/host_repository/</url>
28
         </repository>
28
         </repository>
29
-    </distributionManagement>
29
+    </distributionManagement>-->
30
     <properties>
30
     <properties>
31
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
31
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
32
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
32
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
33
+        <skipTests>true</skipTests>
33
         <java.version>1.8</java.version>
34
         <java.version>1.8</java.version>
34
-        <spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
35
+        <spring-cloud.version>Hoxton.SR12</spring-cloud.version>
36
+        <rocketmq-starter.version>2.2.1</rocketmq-starter.version>
35
     </properties>
37
     </properties>
36
 
38
 
37
     <dependencies>
39
     <dependencies>
@@ -141,12 +143,6 @@
141
             <artifactId>quartz</artifactId>
143
             <artifactId>quartz</artifactId>
142
             <version>2.3.0</version>
144
             <version>2.3.0</version>
143
         </dependency>
145
         </dependency>
144
-
145
-        <dependency>
146
-            <groupId>org.springframework.boot</groupId>
147
-            <artifactId>spring-boot-starter-test</artifactId>
148
-        </dependency>
149
-
150
         <dependency>
146
         <dependency>
151
             <groupId>org.smslib</groupId>
147
             <groupId>org.smslib</groupId>
152
             <artifactId>smslib</artifactId>
148
             <artifactId>smslib</artifactId>
@@ -176,6 +172,12 @@
176
             <groupId>org.springframework.session</groupId>
172
             <groupId>org.springframework.session</groupId>
177
             <artifactId>spring-session-data-redis</artifactId>
173
             <artifactId>spring-session-data-redis</artifactId>
178
         </dependency>
174
         </dependency>
175
+        <!-- rocketmq -->
176
+        <dependency>
177
+            <groupId>org.apache.rocketmq</groupId>
178
+            <artifactId>rocketmq-spring-boot-starter</artifactId>
179
+            <version>${rocketmq-starter.version}</version>
180
+        </dependency>
179
     </dependencies>
181
     </dependencies>
180
 
182
 
181
     <dependencyManagement>
183
     <dependencyManagement>

+ 9 - 7
depot-intelligent/src/main/java/com/chinaitop/depot/intelligent/pushs/service/impl/GrainPushsServiceImpl.java

@@ -12,6 +12,8 @@ import com.chinaitop.depot.intelligent.grainsituation.model.TWarningThresholdHis
12
 import com.chinaitop.depot.intelligent.grainsituation.service.TWarningThresholdHistoryService;
12
 import com.chinaitop.depot.intelligent.grainsituation.service.TWarningThresholdHistoryService;
13
 import com.chinaitop.depot.intelligent.grainsituation.service.TemperatureRecordService;
13
 import com.chinaitop.depot.intelligent.grainsituation.service.TemperatureRecordService;
14
 import com.chinaitop.depot.intelligent.pushs.service.GrainPushsService;
14
 import com.chinaitop.depot.intelligent.pushs.service.GrainPushsService;
15
+import com.chinaitop.depot.intelligent.rocketMQ.producerMQ.model.MqMsg;
16
+import com.chinaitop.depot.intelligent.rocketMQ.producerMQ.service.RocketMqService;
15
 import com.chinaitop.depot.intelligent.utils.JsonToObjectUtils;
17
 import com.chinaitop.depot.intelligent.utils.JsonToObjectUtils;
16
 import com.chinaitop.depot.intelligent.utils.UuidUtils;
18
 import com.chinaitop.depot.intelligent.utils.UuidUtils;
17
 import com.chinaitop.depot.intelligent.utils.VerificationSourceUtils;
19
 import com.chinaitop.depot.intelligent.utils.VerificationSourceUtils;
@@ -38,27 +40,22 @@ public class GrainPushsServiceImpl implements GrainPushsService {
38
 
40
 
39
     @Resource
41
     @Resource
40
     private TemperatureRecordService temperatureRecordService;
42
     private TemperatureRecordService temperatureRecordService;
41
-
42
     @Resource
43
     @Resource
43
     private TTestdataLayerMapper tTestdataLayerMapper;
44
     private TTestdataLayerMapper tTestdataLayerMapper;
44
-
45
     @Resource
45
     @Resource
46
     private WarningThresholdService warningThresholdService;
46
     private WarningThresholdService warningThresholdService;
47
-
48
     @Resource
47
     @Resource
49
     private TWarningThresholdHistoryMapper tWarningThresholdHistoryMapper;
48
     private TWarningThresholdHistoryMapper tWarningThresholdHistoryMapper;
50
-
51
     @Resource
49
     @Resource
52
     private UuidUtils uuidUtils;
50
     private UuidUtils uuidUtils;
53
-
54
     @Resource
51
     @Resource
55
     private WarningMessage warningMessage;
52
     private WarningMessage warningMessage;
56
-
57
     @Resource
53
     @Resource
58
     private ResolutionToSendAlarmService resolutionToSendAlarmService;
54
     private ResolutionToSendAlarmService resolutionToSendAlarmService;
59
-
60
     @Resource
55
     @Resource
61
     private TWarningThresholdHistoryService tWarningThresholdHistoryService;
56
     private TWarningThresholdHistoryService tWarningThresholdHistoryService;
57
+    @Resource
58
+    private RocketMqService rocketMqService;
62
 
59
 
63
     @Transactional(rollbackFor = Exception.class)
60
     @Transactional(rollbackFor = Exception.class)
64
     @Override
61
     @Override
@@ -282,18 +279,23 @@ public class GrainPushsServiceImpl implements GrainPushsService {
282
 
279
 
283
                 if (tTestdataList.size() > 0) {
280
                 if (tTestdataList.size() > 0) {
284
                     temperatureRecordService.insertBatch(tTestdataList);
281
                     temperatureRecordService.insertBatch(tTestdataList);
282
+                    // 消息 CRUD 增加(Create)、* 不需要检索(Retrieve)、更新(Update)和删除(Delete) 小写区分增删改查
283
+                    rocketMqService.asyncSend(new MqMsg("smart_grp_yn", "t_testdata_c", null, tTestdataList));
285
                 }
284
                 }
286
                 if (tTestdataLayerArrayList.size() > 0) {
285
                 if (tTestdataLayerArrayList.size() > 0) {
287
                     tTestdataLayerMapper.insertBatch(tTestdataLayerArrayList);
286
                     tTestdataLayerMapper.insertBatch(tTestdataLayerArrayList);
287
+                    rocketMqService.asyncSend(new MqMsg("smart_grp_yn", "t_testdata_layer_c", null, tTestdataLayerArrayList));
288
                 }
288
                 }
289
                 if (tWarningThresholdHistorieList.size() > 0) {
289
                 if (tWarningThresholdHistorieList.size() > 0) {
290
                     tWarningThresholdHistoryMapper.insertBatch(tWarningThresholdHistorieList);
290
                     tWarningThresholdHistoryMapper.insertBatch(tWarningThresholdHistorieList);
291
+                    rocketMqService.asyncSend(new MqMsg("smart_grp_yn", "t_warning_threshold_history_c", null, tWarningThresholdHistorieList));
291
                     List<TWarningThresholdHistory> collect = tWarningThresholdHistorieList.stream().filter(TWarningThresholdHistory -> TWarningThresholdHistory.getwWarning().equals("0")).collect(Collectors.toList());
292
                     List<TWarningThresholdHistory> collect = tWarningThresholdHistorieList.stream().filter(TWarningThresholdHistory -> TWarningThresholdHistory.getwWarning().equals("0")).collect(Collectors.toList());
292
                     boolean alarmDetails = resolutionToSendAlarmService.getAlarmDetails(collect);
293
                     boolean alarmDetails = resolutionToSendAlarmService.getAlarmDetails(collect);
293
                     if (!alarmDetails) {
294
                     if (!alarmDetails) {
294
                         collect.forEach(obj -> {
295
                         collect.forEach(obj -> {
295
                             obj.setIsSend("2");
296
                             obj.setIsSend("2");
296
                             tWarningThresholdHistoryService.updateById(obj);
297
                             tWarningThresholdHistoryService.updateById(obj);
298
+                            rocketMqService.asyncSend(new MqMsg("smart_grp_yn", "t_warning_threshold_history_u", null, tWarningThresholdHistorieList));
297
                         });
299
                         });
298
                     }
300
                     }
299
                 }
301
                 }

+ 42 - 0
depot-intelligent/src/main/java/com/chinaitop/depot/intelligent/rocketMQ/producerMQ/model/MqMsg.java

@@ -0,0 +1,42 @@
1
+package com.chinaitop.depot.intelligent.rocketMQ.producerMQ.model;
2
+
3
+import lombok.Data;
4
+import javax.validation.constraints.NotNull;
5
+import java.io.Serializable;
6
+
7
+/**
8
+ * @author qingsong.han
9
+ * @description:
10
+ * @create 2022-03-23 13:59
11
+ */
12
+@Data
13
+public class MqMsg implements Serializable {
14
+    private static final long serialVersionUID = -2023812638050201776L;
15
+
16
+    public MqMsg(@NotNull String topic, String tags, String transactionName, Object content) {
17
+        this.topic = topic;
18
+        this.tags = tags;
19
+        this.transactionName = transactionName;
20
+        this.content = content;
21
+    }
22
+
23
+    /**
24
+     * 一级消息:消息topic(topic为消息的主题)
25
+     */
26
+    @NotNull
27
+
28
+    private String topic;
29
+    /**
30
+     * 二级消息:消息topic对应的tags
31
+     */
32
+    private String tags;
33
+    /**
34
+     * 事务名称 为空默认 rocketmq_transaction_default_global_name
35
+     */
36
+    private String transactionName;
37
+    /**
38
+     * 消息内容
39
+     */
40
+    private Object content;
41
+
42
+}

+ 21 - 0
depot-intelligent/src/main/java/com/chinaitop/depot/intelligent/rocketMQ/producerMQ/service/RocketMqService.java

@@ -0,0 +1,21 @@
1
+package com.chinaitop.depot.intelligent.rocketMQ.producerMQ.service;
2
+
3
+import com.chinaitop.depot.intelligent.rocketMQ.producerMQ.model.MqMsg;
4
+
5
+/**
6
+ * @author qingsong.han
7
+ * @description: send Rocket MQ
8
+ * @create 2022-03-23 13:45
9
+ */
10
+public interface RocketMqService {
11
+
12
+    /**
13
+     * 异步发送消息,异步返回消息结果 smart_grp_sx
14
+     *
15
+     * 当发送的消息很重要,且对响应时间非常敏感的时候采用async方式
16
+     *
17
+     * @param mqMsg 发送消息实体类
18
+     */
19
+    void asyncSend(MqMsg mqMsg);
20
+
21
+}

+ 53 - 0
depot-intelligent/src/main/java/com/chinaitop/depot/intelligent/rocketMQ/producerMQ/service/impl/RocketMqServiceImpl.java

@@ -0,0 +1,53 @@
1
+package com.chinaitop.depot.intelligent.rocketMQ.producerMQ.service.impl;
2
+
3
+import com.chinaitop.depot.intelligent.rocketMQ.producerMQ.model.MqMsg;
4
+import com.chinaitop.depot.intelligent.rocketMQ.producerMQ.service.RocketMqService;
5
+import lombok.extern.slf4j.Slf4j;
6
+import org.apache.rocketmq.client.producer.SendCallback;
7
+import org.apache.rocketmq.client.producer.SendResult;
8
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
9
+import org.springframework.stereotype.Service;
10
+import org.springframework.util.StringUtils;
11
+
12
+import javax.annotation.Resource;
13
+
14
+/**
15
+ * @author qingsong.han
16
+ * @description:
17
+ * @create 2022-03-23 14:09
18
+ */
19
+@Slf4j
20
+@Service
21
+public class RocketMqServiceImpl implements RocketMqService {
22
+    @Resource
23
+    private RocketMQTemplate rocketMQTemplate;
24
+
25
+    /**
26
+     * topic: smart_grp_yn
27
+     * tags:
28
+     *  测温: yn_cw
29
+     *
30
+     * @param mqMsg 发送消息实体类
31
+     */
32
+    @Override
33
+    public void asyncSend(MqMsg mqMsg) {
34
+        log.info("asyncSend发送消息到mqMsg: {}", mqMsg);
35
+        rocketMQTemplate.asyncSend(
36
+                StringUtils.isEmpty(mqMsg.getTags())
37
+                        ? mqMsg.getTopic()
38
+                        : String.format("%s:%s", mqMsg.getTopic(), mqMsg.getTags()),
39
+                mqMsg.getContent(),
40
+                new SendCallback() {
41
+                    @Override
42
+                    public void onSuccess(SendResult sendResult) {
43
+                        // 成功不做日志记录或处理
44
+                    }
45
+
46
+                    @Override
47
+                    public void onException(Throwable throwable) {
48
+                        log.info("mqMsg: {}消息发送失败", mqMsg);
49
+                    }
50
+                }
51
+        );
52
+    }
53
+}

BIN
depot-intelligent/src/main/resources/RXTXcomm-2.0.1.jar


+ 14 - 2
depot-intelligent/src/main/resources/bootstrap.yml

@@ -1,5 +1,5 @@
1
 server:
1
 server:
2
-  port: 9028
2
+  port: 9029
3
   tomcat:
3
   tomcat:
4
     uri-encoding: utf-8
4
     uri-encoding: utf-8
5
 
5
 
@@ -20,7 +20,7 @@ spring:
20
   # 数据库配置
20
   # 数据库配置
21
   datasource:
21
   datasource:
22
     driver-class-name: com.mysql.jdbc.Driver
22
     driver-class-name: com.mysql.jdbc.Driver
23
-    url: jdbc:mysql://localhost:3306/depot?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC
23
+    url: jdbc:mysql://localhost:3306/depot_yunnan?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC
24
     username: root
24
     username: root
25
     password: 123456
25
     password: 123456
26
   # 缓存配置
26
   # 缓存配置
@@ -65,3 +65,15 @@ gas-org-ids: 386,177,273,280,300,298,294
65
 # 数量检测版本区分version3
65
 # 数量检测版本区分version3
66
 num_version3: :20013
66
 num_version3: :20013
67
 
67
 
68
+rocketmq:
69
+  # master
70
+  name-server: 172.16.0.8:9876,172.16.0.10:9876
71
+  producer:
72
+    group: depot_sxp_group
73
+    send-message-timeout: 2000
74
+    retry-times-when-send-async-failed: 5
75
+    retry-times-when-send-failed: 5
76
+# sql 打印
77
+#logging:
78
+#  level:
79
+#   com.chinaitop.depot.intelligent.grainsituation.mapper: debug

BIN
depot-intelligent/src/main/resources/smslib-3.5.4.jar