ソースを参照

添加简单的rocketmq例子,以后可能会更改

lvzhikai 3 年 前
コミット
006e675ec5
共有1 個のファイルを変更した113 個の追加0 個の削除を含む
  1. 113 0
      src/main/java/com/chinaitop/depot/intelligent/rocketMQ/MQConsumerService.java

+ 113 - 0
src/main/java/com/chinaitop/depot/intelligent/rocketMQ/MQConsumerService.java

@@ -0,0 +1,113 @@
1
+package com.chinaitop.depot.intelligent.rocketMQ;
2
+
3
+import com.alibaba.fastjson.JSON;
4
+import com.chinaitop.depot.intelligent.basicdata.service.ResolutionToSendAlarmService;
5
+import com.chinaitop.depot.intelligent.basicdata.service.WarningThresholdService;
6
+import com.chinaitop.depot.intelligent.grainsituation.mapper.TTestdataLayerMapper;
7
+import com.chinaitop.depot.intelligent.grainsituation.mapper.TWarningThresholdHistoryMapper;
8
+import com.chinaitop.depot.intelligent.grainsituation.model.TTestdata;
9
+import com.chinaitop.depot.intelligent.grainsituation.model.TTestdataLayer;
10
+import com.chinaitop.depot.intelligent.grainsituation.model.TWarningThresholdHistory;
11
+import com.chinaitop.depot.intelligent.grainsituation.service.TWarningThresholdHistoryService;
12
+import com.chinaitop.depot.intelligent.grainsituation.service.TemperatureRecordService;
13
+import com.chinaitop.depot.intelligent.utils.UuidUtils;
14
+import com.chinaitop.depot.intelligent.utils.WarningMessage;
15
+import lombok.extern.slf4j.Slf4j;
16
+import org.apache.catalina.User;
17
+import org.apache.rocketmq.common.message.MessageExt;
18
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
19
+import org.apache.rocketmq.spring.core.RocketMQListener;
20
+import org.springframework.stereotype.Component;
21
+import org.springframework.stereotype.Service;
22
+
23
+import javax.annotation.Resource;
24
+import java.util.ArrayList;
25
+import java.util.List;
26
+import java.util.stream.Collectors;
27
+
28
+@Slf4j
29
+@Component
30
+public class MQConsumerService {
31
+
32
+    @Resource
33
+    private TemperatureRecordService temperatureRecordService;
34
+    @Resource
35
+    private TTestdataLayerMapper tTestdataLayerMapper;
36
+    @Resource
37
+    private TWarningThresholdHistoryMapper tWarningThresholdHistoryMapper;
38
+    @Resource
39
+    private ResolutionToSendAlarmService resolutionToSendAlarmService;
40
+    @Resource
41
+    private TWarningThresholdHistoryService tWarningThresholdHistoryService;
42
+
43
+    // topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
44
+    // selectorExpression的意思指的就是tag,默认为“*”,不设置的话会监听所有消息
45
+    @Service
46
+    @RocketMQMessageListener(topic = "smart_grp_yn", selectorExpression = "t_testdata_c", consumerGroup = "t_testdata_c")
47
+    public class ConsumerSend implements RocketMQListener<ArrayList<TTestdata>> {
48
+        // 监听到消息就会执行此方法
49
+        @Override
50
+        public void onMessage(ArrayList<TTestdata> tTestdataList) {
51
+            log.info("监听到消息:tTestdataList={}", JSON.toJSONString(tTestdataList));
52
+            temperatureRecordService.insertBatch(tTestdataList);
53
+        }
54
+    }
55
+
56
+    // topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
57
+    // selectorExpression的意思指的就是tag,默认为“*”,不设置的话会监听所有消息
58
+    @Service
59
+    @RocketMQMessageListener(topic = "smart_grp_yn", selectorExpression = "t_testdata_layer_c", consumerGroup = "t_testdata_layer_c")
60
+    public class ConsumerSend1 implements RocketMQListener<ArrayList<TTestdataLayer>> {
61
+        // 监听到消息就会执行此方法
62
+        @Override
63
+        public void onMessage(ArrayList<TTestdataLayer> tTestdataLayerArrayList) {
64
+            log.info("监听到消息:tTestdataLayerArrayList={}", JSON.toJSONString(tTestdataLayerArrayList));
65
+            tTestdataLayerMapper.insertBatch(tTestdataLayerArrayList);
66
+        }
67
+    }
68
+
69
+    // topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
70
+    // selectorExpression的意思指的就是tag,默认为“*”,不设置的话会监听所有消息
71
+    @Service
72
+    @RocketMQMessageListener(topic = "smart_grp_yn", selectorExpression = "t_warning_threshold_history_c", consumerGroup = "t_warning_threshold_history_c")
73
+    public class ConsumerSend3 implements RocketMQListener<ArrayList<TWarningThresholdHistory>> {
74
+        // 监听到消息就会执行此方法
75
+        @Override
76
+        public void onMessage(ArrayList<TWarningThresholdHistory> tWarningThresholdHistorieList) {
77
+            log.info("监听到消息:tWarningThresholdHistorieList={}", JSON.toJSONString(tWarningThresholdHistorieList));
78
+            tWarningThresholdHistoryMapper.insertBatch(tWarningThresholdHistorieList);
79
+            List<TWarningThresholdHistory> collect = tWarningThresholdHistorieList.stream().filter(TWarningThresholdHistory -> TWarningThresholdHistory.getwWarning().equals("0")).collect(Collectors.toList());
80
+            boolean alarmDetails = resolutionToSendAlarmService.getAlarmDetails(collect);
81
+            if (!alarmDetails) {
82
+                collect.forEach(obj -> {
83
+                    obj.setIsSend("2");
84
+                    tWarningThresholdHistoryService.updateById(obj);
85
+                });
86
+            }
87
+        }
88
+    }
89
+
90
+    // 注意:这个ConsumerSend2和上面ConsumerSend在没有添加tag做区分时,不能共存,
91
+    // 不然生产者发送一条消息,这两个都会去消费,如果类型不同会有一个报错,所以实际运用中最好加上tag,写这只是让你看知道就行
92
+//    @Service
93
+//    @RocketMQMessageListener(topic = "smart_grp_yn", consumerGroup = "Con_Group_Two")
94
+//    public class ConsumerSend2 implements RocketMQListener<String> {
95
+//        @Override
96
+//        public void onMessage(String str) {
97
+//            log.info("监听到消息:555str={}", str);
98
+//        }
99
+//    }
100
+
101
+    // MessageExt:是一个消息接收通配符,不管发送的是String还是对象,都可接收,当然也可以像上面明确指定类型(我建议还是指定类型较方便)
102
+//    @Service
103
+//    @RocketMQMessageListener(topic = "smart_grp_yn", selectorExpression = "tag2", consumerGroup = "Con_Group_Three")
104
+//    public class Consumer implements RocketMQListener<MessageExt> {
105
+//        @Override
106
+//        public void onMessage(MessageExt messageExt) {
107
+//            byte[] body = messageExt.getBody();
108
+//            String msg = new String(body);
109
+//            log.info("监听到消息:666msg={}", msg);
110
+//        }
111
+//    }
112
+
113
+}