Przeglądaj źródła

webSocket集群及统计在线用户数

shengyang000 5 lat temu
rodzic
commit
3d32495073

+ 17 - 0
src/main/java/com/unissoft/controller/QuartzController.java

@@ -21,6 +21,9 @@ public class QuartzController {
21 21
     @Autowired
22 22
     private QuartzService quartzService;
23 23
 
24
+//	@Autowired
25
+//    private PublishService publishService;
26
+
24 27
 	@ApiOperation(value = "新增任务", notes = "")
25 28
 	@PostMapping("/authority_button/add")
26 29
 	public ResultView add(QuartzModel quartz){
@@ -94,4 +97,18 @@ public class QuartzController {
94 97
 		return ResultView.error();
95 98
 	}
96 99
 
100
+//	@ApiOperation(value = "测试Redis发布", notes = "")
101
+//	@GetMapping("/authority_button/testRedisPublish")
102
+//	public ResultView testRedisPublish() {
103
+//		MessageVO vo = new MessageVO();
104
+//		vo.setTitle("aaa");
105
+//		vo.setContent("测试");
106
+//		vo.setFromUserId(1);
107
+//		vo.setToUserId(5);
108
+//		vo.setType(10);
109
+//
110
+//		publishService.publish(GrainConstant.MQ_MSG_CHANNEL, vo);
111
+//		return ResultView.success();
112
+//	}
113
+
97 114
 }

+ 5 - 20
src/main/java/com/unissoft/controller/SysMessageController.java

@@ -3,15 +3,15 @@ package com.unissoft.controller;
3 3
 
4 4
 import com.unissoft.dto.MessageVO;
5 5
 import com.unissoft.enums.MessageTypeEnum;
6
+import com.unissoft.model.SysUserPO;
6 7
 import com.unissoft.result.ResultView;
7 8
 import com.unissoft.service.SysMessageService;
9
+import com.unissoft.utils.UserUtils;
8 10
 import io.swagger.annotations.ApiOperation;
9 11
 import lombok.extern.slf4j.Slf4j;
10 12
 import org.springframework.beans.factory.annotation.Autowired;
11 13
 import org.springframework.web.bind.annotation.*;
12 14
 
13
-import java.util.Collection;
14
-
15 15
 /**
16 16
  * <p>
17 17
  * 消息推送表 前端控制器
@@ -26,15 +26,13 @@ import java.util.Collection;
26 26
 public class SysMessageController {
27 27
 
28 28
     @Autowired
29
-    private WebSocketServer webSocketServer;
30
-
31
-    @Autowired
32 29
     private SysMessageService sysMessageService;
33 30
 
34
-
35 31
     @ApiOperation(value = "向用户发送信息", notes = "")
36 32
     @PostMapping("/authority/sendMessage")
37 33
     public ResultView sendMessage(@RequestBody MessageVO messageVO) {
34
+        SysUserPO userPO = UserUtils.getCurrUserInfo();
35
+        messageVO.setFromUserId(userPO.getuId());
38 36
         if(null == messageVO.getType()) {
39 37
             messageVO.setType(MessageTypeEnum.ATTENTION.getCode());//默认类型
40 38
         }
@@ -42,23 +40,10 @@ public class SysMessageController {
42 40
         return ResultView.success();
43 41
     }
44 42
 
45
-    /**
46
-     * @TODO 后期需改造成 获取所有用户ID,并消息持久化
47
-     * @param messageVO
48
-     * @return
49
-     */
50
-    @ApiOperation(value = "向所有用户发送信息", notes = "")
51
-    @PostMapping("/authority/sendAllUsersMessage")
52
-    public ResultView sendAllUsersMessage(@RequestBody MessageVO messageVO) {
53
-        Collection<Integer> userIds = webSocketServer.getOnlineUserIds();
54
-        messageVO.setToUserIds(userIds);
55
-        return sendMessage(messageVO);
56
-    }
57
-
58 43
     @ApiOperation(value = "根据用户id删除webSocket连接", notes = "")
59 44
     @GetMapping("/authority/removeSession/{id}")
60 45
     public ResultView removeSession(@PathVariable int id) {
61
-        webSocketServer.removeSession(id);
46
+        sysMessageService.removeWebSocketSession(id);
62 47
         return ResultView.success();
63 48
     }
64 49
 

+ 64 - 83
src/main/java/com/unissoft/controller/WebSocketServer.java

@@ -1,29 +1,29 @@
1 1
 package com.unissoft.controller;
2 2
 
3
-import com.alibaba.fastjson.JSON;
4 3
 import com.unissoft.config.MyApplicationContextAware;
4
+import com.unissoft.constant.GrainConstant;
5 5
 import com.unissoft.dto.MessageVO;
6
+import com.unissoft.enums.OnlineStatusEnum;
7
+import com.unissoft.redismq.PublishService;
6 8
 import com.unissoft.service.SysMessageService;
9
+import com.unissoft.utils.JsonUtil;
10
+import com.unissoft.vo.UserOnlineStatusVO;
7 11
 import lombok.extern.slf4j.Slf4j;
12
+import org.springframework.data.redis.core.RedisTemplate;
8 13
 import org.springframework.stereotype.Component;
9 14
 
10 15
 import javax.websocket.*;
11 16
 import javax.websocket.server.PathParam;
12 17
 import javax.websocket.server.ServerEndpoint;
13 18
 import java.io.IOException;
14
-import java.util.Collection;
15 19
 import java.util.List;
16 20
 import java.util.concurrent.ConcurrentHashMap;
17
-import java.util.concurrent.atomic.AtomicInteger;
18 21
 
19 22
 @Slf4j
20 23
 @ServerEndpoint("/webSocket/{userId}")
21 24
 @Component
22 25
 public class WebSocketServer {
23 26
 
24
-    /** 静态变量,用来记录当前在线连接数(把它设计成线程安全) */
25
-    private static AtomicInteger onlineNum = new AtomicInteger(0);
26
-
27 27
     /** 用来存放每个客户端对应的WebSocket的Session对象 */
28 28
     private static ConcurrentHashMap<Integer, Session> sessionPools = new ConcurrentHashMap<>();
29 29
 
@@ -34,41 +34,43 @@ public class WebSocketServer {
34 34
      */
35 35
     @OnOpen
36 36
     public void onOpen(Session session, @PathParam(value = "userId") Integer userId) {
37
-
38
-        log.error("------------------用户ID:" + userId);
39
-
40 37
         if(sessionPools.containsKey(userId)) {
41 38
             sessionPools.remove(userId);
42
-            sessionPools.put(userId, session);
43
-        } else {
44
-            sessionPools.put(userId, session);
45
-            addOnlineCount();
46
-            log.info("有新连接加入:{},当前在线人数为:{}", "userId为" + userId, onlineNum.get());
47 39
         }
40
+        sessionPools.put(userId, session);
41
+        addOnlineUser(userId);
42
+        publishStatisticsOnlineStatus();//发布在线统计状态
48 43
 
49 44
         //登录后,查询是否有待推送的通知,如果有立马推送
50 45
         new Thread() {
51 46
             @Override
52 47
             public void run() {
53
-                pushMessage2User(userId);
48
+                pushMessage2User(userId, session);
54 49
             }
55 50
         }.start();
56 51
     }
57 52
 
58
-    private void pushMessage2User(Integer userId) {
53
+    private void pushMessage2User(Integer userId, Session session) {
59 54
         SysMessageService sysMessageService = MyApplicationContextAware.getBean(SysMessageService.class);
60 55
 
61 56
         List<MessageVO> list = sysMessageService.getUnpushedUserMessage(userId);
62
-        if(null != list && ! list.isEmpty()) {
57
+        if(null != list && list.isEmpty() == false) {
63 58
             for(MessageVO vo : list) {
64
-                String content = com.alibaba.fastjson.JSONObject.toJSONString(vo);
65
-                sendUserMessage(userId, content);
59
+                String content = JsonUtil.parseObjToJson(vo);
60
+                sendMessage(session, content);
66 61
             }
67 62
             //设置为已推送状态
68 63
             sysMessageService.batchSetPushedStatus(list);
69 64
         }
70 65
     }
71 66
 
67
+    private void publishStatisticsOnlineStatus() {
68
+        PublishService publishService = MyApplicationContextAware.getBean(PublishService.class);
69
+        UserOnlineStatusVO onlineStatusVO = new UserOnlineStatusVO();
70
+        onlineStatusVO.setOnlineStatus(OnlineStatusEnum.STATISTICS.getCode());
71
+        publishService.publish(GrainConstant.MQ_USER_STATUS_CHANNEL, onlineStatusVO);
72
+    }
73
+
72 74
 
73 75
     /**
74 76
      * 关闭连接时调用
@@ -77,7 +79,14 @@ public class WebSocketServer {
77 79
     @OnClose
78 80
     public void onClose(@PathParam(value = "userId") Integer userId) {
79 81
         removeSession(userId);
80
-        log.info("有一连接关闭:{},当前在线人数为:{}", "userId为" + userId, onlineNum.get());
82
+        publishStatisticsOnlineStatus();//发布在线统计状态
83
+    }
84
+
85
+    public static void removeSession(Integer userId) {
86
+        if(sessionPools.containsKey(userId)) {
87
+            sessionPools.remove(userId);
88
+        }
89
+        subOnlineUser(userId);
81 90
     }
82 91
 
83 92
     /**
@@ -86,16 +95,11 @@ public class WebSocketServer {
86 95
      */
87 96
     @OnMessage
88 97
     public void onMessage(String message, @PathParam(value = "userId") Integer userId) {
89
-        log.info("服务端收到客户端[{}]的消息:{}", userId, message);
90
-
91
-        MessageVO messageVO = JSON.parseObject(message, MessageVO.class);
98
+        MessageVO messageVO = JsonUtil.parseJsonToObj(message, MessageVO.class);
92 99
         if(null != messageVO) {
93
-            if(messageVO.isBroadcast()) {//广播
94
-                sendAllUsersMessage(messageVO.getContent());
95
-            } else {
96
-                Collection<Integer> toUserIds = messageVO.getToUserIds();
97
-                sendBatchUsersMessage(toUserIds, messageVO.getContent());
98
-            }
100
+            messageVO.setFromUserId(userId);
101
+            SysMessageService sysMessageService = MyApplicationContextAware.getBean(SysMessageService.class);
102
+            sysMessageService.sendMessage(messageVO);
99 103
         }
100 104
     }
101 105
 
@@ -109,17 +113,6 @@ public class WebSocketServer {
109 113
         log.error("发生错误,错误原因:" + error.getMessage());
110 114
     }
111 115
 
112
-    /**
113
-     * 删除连接
114
-     * @param userId
115
-     */
116
-    public void removeSession(Integer userId) {
117
-        if(sessionPools.containsKey(userId)) {
118
-            sessionPools.remove(userId);
119
-            subOnlineCount();
120
-        }
121
-    }
122
-
123 116
 
124 117
     /**
125 118
      * 向指定用户发送信息
@@ -131,72 +124,60 @@ public class WebSocketServer {
131 124
         if(null == session) {//用户不在线,返回false
132 125
             return false;
133 126
         }
134
-        try {
135
-            sendMessage(session, content);
136
-            return true;
137
-        } catch (Exception e) {//推送失败,返回false
138
-            log.error(e.getMessage());
139
-            return false;
140
-        }
127
+        return sendMessage(session, content);
141 128
     }
142 129
 
143
-    /**
144
-     * 向所有在线用户发送信息
145
-     * @param content
146
-     */
147
-    public void sendAllUsersMessage(String content) {
148
-        if(! sessionPools.isEmpty()) {
149
-            for(Session session : sessionPools.values()) {
150
-                try {
151
-                    sendMessage(session, content);
152
-                } catch (Exception e) {
153
-                    log.error(e.getMessage());
154
-                }
155
-            }
130
+    public void noticeOnlineNum2All(long onlineNum) {
131
+        MessageVO messageVO = new MessageVO();
132
+        messageVO.setType(9999);//表示在线用户数信息
133
+        messageVO.setOnlineNum(onlineNum);
134
+        String content = JsonUtil.parseObjToJson(messageVO);
135
+        for(Session toSession : sessionPools.values()) {
136
+            sendMessage(toSession, content);
156 137
         }
157 138
     }
158 139
 
159
-    /**
160
-     * 向某批用户在线发送信息
161
-     * @param toUserIds
162
-     * @param content
163
-     */
164
-    public void sendBatchUsersMessage(Collection<Integer> toUserIds, String content) {
165
-        if(null != toUserIds && ! toUserIds.isEmpty()) {
166
-            for(Integer toUserId : toUserIds) {
167
-                sendUserMessage(toUserId, content);
168
-            }
169
-        }
170
-    }
171 140
 
172 141
     /**
173 142
      * 向某客户端发送消息
174 143
      * @param toSession   接收客户端session
175
-     * @param content   信息内
144
+     * @param content   信息内容
176 145
      * @throws IOException
177 146
      */
178
-    private synchronized void sendMessage(Session toSession, String content) throws IOException {
179
-        if (toSession != null) {
147
+    private synchronized boolean sendMessage(Session toSession, String content) {
148
+        try {
180 149
             toSession.getBasicRemote().sendText(content);
150
+            return true;
151
+        } catch (IOException e) {//推送失败,返回false
152
+            log.error(e.getMessage());
153
+            return false;
181 154
         }
182 155
     }
183 156
 
184 157
 
185 158
     /**
186
-     * 在线用户量加1
159
+     * 在线用户添加
187 160
      */
188
-    public static void addOnlineCount() {
189
-        onlineNum.incrementAndGet();
161
+    public static void addOnlineUser(Integer userId) {
162
+        RedisTemplate redisTemplate = MyApplicationContextAware.getBean(RedisTemplate.class);
163
+        redisTemplate.opsForSet().add(GrainConstant.ONLINE_USERS, userId);
190 164
     }
191 165
 
192 166
     /**
193
-     * 在线用户量减1
167
+     * 在线用户删除
194 168
      */
195
-    public static void subOnlineCount() {
196
-        onlineNum.decrementAndGet();
169
+    public static void subOnlineUser(Integer userId) {
170
+        RedisTemplate redisTemplate = MyApplicationContextAware.getBean(RedisTemplate.class);
171
+        redisTemplate.opsForSet().remove(GrainConstant.ONLINE_USERS, userId);
197 172
     }
198 173
 
199
-    public static Collection<Integer> getOnlineUserIds() {
200
-        return sessionPools.keySet();
174
+    /**
175
+     * 是否在该服务上存在连接的用户
176
+     * @param userId
177
+     * @return
178
+     */
179
+    public static boolean containsUser(Integer userId) {
180
+        return sessionPools.containsKey(userId);
201 181
     }
182
+
202 183
 }

+ 18 - 0
src/main/java/com/unissoft/enums/OnlineStatusEnum.java

@@ -0,0 +1,18 @@
1
+package com.unissoft.enums;
2
+
3
+import lombok.Getter;
4
+
5
+@Getter
6
+public enum OnlineStatusEnum {
7
+    STATISTICS(1, "统计"),
8
+    OFFLINE(0, "下线"),
9
+    ;
10
+
11
+    private Integer code;
12
+    private String desc;
13
+
14
+    OnlineStatusEnum(Integer code, String desc) {
15
+        this.code = code;
16
+        this.desc = desc;
17
+    }
18
+}

+ 6 - 4
src/main/java/com/unissoft/job/ToBeDoneJob.java

@@ -17,7 +17,10 @@ import org.springframework.beans.factory.annotation.Autowired;
17 17
 import org.springframework.stereotype.Component;
18 18
 
19 19
 import java.io.Serializable;
20
-import java.util.*;
20
+import java.util.Date;
21
+import java.util.HashMap;
22
+import java.util.List;
23
+import java.util.Map;
21 24
 
22 25
 @DisallowConcurrentExecution
23 26
 @Component
@@ -72,9 +75,8 @@ public class ToBeDoneJob implements Job, Serializable {
72 75
                 String content = vo.getDepotName() + "-" + vo.getHouseName() + "-" + "待处理";
73 76
                 messageVO.setContent(content);
74 77
                 //
75
-                Set<Integer> userIdSet = new HashSet<>();
76
-                userIdSet.add(vo.getUserId());
77
-                messageVO.setToUserIds(userIdSet);
78
+                messageVO.setFromUserId(-1);//-1 代表系统推送
79
+                messageVO.setToUserId(vo.getUserId());
78 80
                 messageVO.setType(MessageTypeEnum.ATTENTION.getCode());
79 81
                 sysMessageService.sendMessage(messageVO);
80 82
             }

+ 7 - 0
src/main/java/com/unissoft/mapper/SysMessageMapper.java

@@ -1,9 +1,13 @@
1 1
 package com.unissoft.mapper;
2 2
 
3 3
 import com.baomidou.mybatisplus.core.mapper.BaseMapper;
4
+import com.unissoft.dto.MessageVO;
4 5
 import com.unissoft.model.SysMessage;
6
+import org.apache.ibatis.annotations.Param;
5 7
 import org.springframework.stereotype.Repository;
6 8
 
9
+import java.util.List;
10
+
7 11
 /**
8 12
  * <p>
9 13
  * 消息推送表 Mapper 接口
@@ -15,4 +19,7 @@ import org.springframework.stereotype.Repository;
15 19
 @Repository
16 20
 public interface SysMessageMapper extends BaseMapper<SysMessage> {
17 21
 
22
+    void batchSetPushedStatus(List<MessageVO> list);
23
+
24
+    void setPushedStatus(@Param("id") Integer id);
18 25
 }

+ 20 - 2
src/main/java/com/unissoft/mapper/SysMessageMapper.xml

@@ -8,7 +8,8 @@
8 8
         <result column="type" property="type" />
9 9
         <result column="title" property="title" />
10 10
         <result column="content" property="content" />
11
-        <result column="user_id" property="userId" />
11
+        <result column="from_user_id" property="fromUserId" />
12
+        <result column="to_user_id" property="toUserId" />
12 13
         <result column="push_status" property="pushStatus" />
13 14
         <result column="url" property="url" />
14 15
         <result column="create_by" property="createBy" />
@@ -21,7 +22,24 @@
21 22
 
22 23
     <!-- 通用查询结果列 -->
23 24
     <sql id="Base_Column_List">
24
-        id, type, title, content, user_id, push_status, url, create_by, create_time, update_by, update_time, delete_by, delete_time
25
+        id, type, title, content, from_user_id, to_user_id, push_status, url,
26
+        create_by, create_time, update_by, update_time, delete_by, delete_time
25 27
     </sql>
26 28
 
29
+    <update id="batchSetPushedStatus" parameterType="list">
30
+        update sys_message
31
+        set push_status = 1
32
+        <where>
33
+            <foreach item="item" collection="list"  index="index" separator=" or " open="(" close=")">
34
+                id = #{item.id}
35
+            </foreach>
36
+        </where>
37
+    </update>
38
+
39
+    <update id="setPushedStatus" parameterType="Integer">
40
+        update sys_message
41
+        set push_status = 1
42
+        where id = #{id}
43
+    </update>
44
+
27 45
 </mapper>

+ 2 - 0
src/main/java/com/unissoft/mapper/SysResourceRoleMapper.java

@@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
4 4
 import com.unissoft.dto.ResourceRoleInfoDto;
5 5
 import com.unissoft.model.SysResourceRole;
6 6
 import org.apache.ibatis.annotations.Param;
7
+import org.springframework.stereotype.Repository;
7 8
 
8 9
 import java.util.List;
9 10
 
@@ -15,6 +16,7 @@ import java.util.List;
15 16
  * @author wsm
16 17
  * @since 2020-11-25
17 18
  */
19
+@Repository
18 20
 public interface SysResourceRoleMapper extends BaseMapper<SysResourceRole> {
19 21
 
20 22
     /**

+ 7 - 3
src/main/java/com/unissoft/model/SysMessage.java

@@ -37,9 +37,13 @@ public class SysMessage extends BaseModel implements Serializable {
37 37
     @TableField("content")
38 38
     private String content;
39 39
 
40
-    @ApiModelProperty(value = "用户id")
41
-    @TableField("user_id")
42
-    private Integer userId;
40
+    @ApiModelProperty(value = "发送用户ID")
41
+    @TableField("from_user_id")
42
+    private Integer fromUserId;
43
+
44
+    @ApiModelProperty(value = "接收用户ID")
45
+    @TableField("to_user_id")
46
+    private Integer toUserId;
43 47
 
44 48
     @ApiModelProperty(value = "推送状态,0:未推送,  1:已推送")
45 49
     @TableField("push_status")

+ 58 - 0
src/main/java/com/unissoft/redismq/MsgListenerHandle.java

@@ -0,0 +1,58 @@
1
+package com.unissoft.redismq;
2
+
3
+import com.unissoft.controller.WebSocketServer;
4
+import com.unissoft.dto.MessageVO;
5
+import com.unissoft.service.SysMessageService;
6
+import com.unissoft.utils.JsonUtil;
7
+import lombok.extern.slf4j.Slf4j;
8
+import org.springframework.beans.factory.annotation.Autowired;
9
+import org.springframework.data.redis.connection.Message;
10
+import org.springframework.data.redis.core.RedisTemplate;
11
+import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
12
+import org.springframework.stereotype.Component;
13
+
14
+/**
15
+ * mqMsgChannel 订阅频道处理类
16
+ */
17
+@Slf4j
18
+@Component
19
+public class MsgListenerHandle extends MessageListenerAdapter {
20
+
21
+    @Autowired
22
+    private RedisTemplate<String, String> redisTemplate;
23
+
24
+    @Autowired
25
+    private WebSocketServer webSocketServer;
26
+
27
+    @Autowired
28
+    private SysMessageService sysMessageService;
29
+
30
+    /**
31
+     * 收到监听消息
32
+     * @param message
33
+     * @param bytes
34
+     */
35
+    @Override
36
+    public void onMessage(Message message, byte[] bytes) {
37
+        byte[] body = message.getBody();
38
+        byte[] channel = message.getChannel();
39
+        String messageStr;
40
+        String topic;
41
+        try {
42
+            messageStr = redisTemplate.getStringSerializer().deserialize(body);
43
+            topic = redisTemplate.getStringSerializer().deserialize(channel);
44
+            log.info("收到主题:" + topic + ", 信息内容:" + messageStr);
45
+        } catch (Exception e) {
46
+            log.error(e.getMessage(), e);
47
+            return;
48
+        }
49
+
50
+        MessageVO messageVO = JsonUtil.parseJsonToObj(messageStr, MessageVO.class);
51
+        if(WebSocketServer.containsUser(messageVO.getToUserId())) {
52
+            boolean pushedStatus = webSocketServer.sendUserMessage(messageVO.getToUserId(), messageStr);
53
+            if(pushedStatus) {
54
+                sysMessageService.setPushedStatus(messageVO.getId());
55
+            }
56
+        }
57
+    }
58
+}

+ 22 - 0
src/main/java/com/unissoft/redismq/PublishService.java

@@ -0,0 +1,22 @@
1
+package com.unissoft.redismq;
2
+
3
+import com.unissoft.utils.JsonUtil;
4
+import org.springframework.beans.factory.annotation.Autowired;
5
+import org.springframework.data.redis.core.StringRedisTemplate;
6
+import org.springframework.stereotype.Component;
7
+
8
+@Component
9
+public class PublishService {
10
+	
11
+	@Autowired
12
+	StringRedisTemplate redisTemplate;
13
+
14
+	/**
15
+	 * 发布方法
16
+	 * @param channel 消息发布订阅 主题
17
+	 * @param message 消息信息
18
+	 */
19
+	public void publish(String channel, Object message) {
20
+		redisTemplate.convertAndSend(channel, JsonUtil.parseObjToJson(message));
21
+	}
22
+}

+ 34 - 0
src/main/java/com/unissoft/redismq/RedisListenerBean.java

@@ -0,0 +1,34 @@
1
+package com.unissoft.redismq;
2
+
3
+import com.unissoft.constant.GrainConstant;
4
+import org.springframework.context.annotation.Bean;
5
+import org.springframework.context.annotation.Configuration;
6
+import org.springframework.data.redis.connection.RedisConnectionFactory;
7
+import org.springframework.data.redis.listener.PatternTopic;
8
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
9
+
10
+@Configuration
11
+public class RedisListenerBean {
12
+
13
+    /**
14
+     * redis消息监听器容器
15
+     * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
16
+     * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
17
+     * @param connectionFactory
18
+     * @param msgListenerHandle
19
+     * @param userStatusListenerHandle
20
+     * @return
21
+     */
22
+    @Bean
23
+    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
24
+                                                   MsgListenerHandle msgListenerHandle,
25
+                                                   UserStatusListenerHandle userStatusListenerHandle){
26
+        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
27
+        container.setConnectionFactory(connectionFactory);
28
+        //订阅频道,可订阅多个
29
+        container.addMessageListener(msgListenerHandle, new PatternTopic(GrainConstant.MQ_MSG_CHANNEL));
30
+        container.addMessageListener(userStatusListenerHandle, new PatternTopic(GrainConstant.MQ_USER_STATUS_CHANNEL));
31
+        return container;
32
+    }
33
+
34
+}

+ 60 - 0
src/main/java/com/unissoft/redismq/UserStatusListenerHandle.java

@@ -0,0 +1,60 @@
1
+package com.unissoft.redismq;
2
+
3
+import com.unissoft.constant.GrainConstant;
4
+import com.unissoft.controller.WebSocketServer;
5
+import com.unissoft.enums.OnlineStatusEnum;
6
+import com.unissoft.utils.JsonUtil;
7
+import com.unissoft.vo.UserOnlineStatusVO;
8
+import lombok.extern.slf4j.Slf4j;
9
+import org.springframework.beans.factory.annotation.Autowired;
10
+import org.springframework.data.redis.connection.Message;
11
+import org.springframework.data.redis.core.RedisTemplate;
12
+import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
13
+import org.springframework.stereotype.Component;
14
+
15
+/**
16
+ * mqUserStatusChannel 订阅频道处理类
17
+ */
18
+@Slf4j
19
+@Component
20
+public class UserStatusListenerHandle extends MessageListenerAdapter {
21
+
22
+    @Autowired
23
+    private RedisTemplate<String, String> redisTemplate;
24
+
25
+    @Autowired
26
+    private WebSocketServer webSocketServer;
27
+
28
+    /**
29
+     * 收到监听消息
30
+     * @param message
31
+     * @param bytes
32
+     */
33
+    @Override
34
+    public void onMessage(Message message, byte[] bytes) {
35
+        byte[] body = message.getBody();
36
+        byte[] channel = message.getChannel();
37
+        String rawMsg;
38
+        String topic;
39
+        try {
40
+            rawMsg = redisTemplate.getStringSerializer().deserialize(body);
41
+            topic = redisTemplate.getStringSerializer().deserialize(channel);
42
+            log.info("收到主题:" + topic + ", 信息内容:" + rawMsg);
43
+        } catch (Exception e) {
44
+            log.error(e.getMessage(), e);
45
+            return;
46
+        }
47
+        //
48
+        UserOnlineStatusVO onlineStatusVO = JsonUtil.parseJsonToObj(rawMsg, UserOnlineStatusVO.class);
49
+        if(onlineStatusVO.getOnlineStatus() == OnlineStatusEnum.OFFLINE.getCode()) {
50
+            if(WebSocketServer.containsUser(onlineStatusVO.getUserId())) {
51
+                WebSocketServer.removeSession(onlineStatusVO.getUserId());
52
+            }
53
+        } else if(onlineStatusVO.getOnlineStatus() == OnlineStatusEnum.STATISTICS.getCode()) {//统计在线人数
54
+            long onlineNum = redisTemplate.opsForSet().size(GrainConstant.ONLINE_USERS);//在线人数
55
+            //通知所有用户 在线人数
56
+            webSocketServer.noticeOnlineNum2All(onlineNum);
57
+        }
58
+
59
+    }
60
+}

+ 6 - 0
src/main/java/com/unissoft/service/SysMessageService.java

@@ -5,6 +5,7 @@ import com.unissoft.dto.MessageVO;
5 5
 import com.unissoft.model.SysMessage;
6 6
 
7 7
 import java.util.List;
8
+import java.util.Set;
8 9
 
9 10
 /**
10 11
  * <p>
@@ -18,8 +19,13 @@ public interface SysMessageService extends IService<SysMessage> {
18 19
 
19 20
     void sendMessage(MessageVO messageVO);
20 21
 
22
+    void batchSendMessage(MessageVO messageVO, Set<Integer> userIds);
23
+
24
+    void setPushedStatus(Integer id);
25
+
21 26
     List<MessageVO> getUnpushedUserMessage(Integer userId);
22 27
 
23 28
     void batchSetPushedStatus(List<MessageVO> voList);
24 29
 
30
+    void removeWebSocketSession(int id);
25 31
 }

+ 3 - 3
src/main/java/com/unissoft/service/impl/SysAnnouncementServiceImpl.java

@@ -194,15 +194,15 @@ public class SysAnnouncementServiceImpl extends ServiceImpl<SysAnnouncementMappe
194 194
      */
195 195
     private void saveAnnouncementUsersAndSendMessage(SysAnnouncement model, SysUserPO userPO) {
196 196
         Map<Integer, String> users = getNoticeUserInfo(userPO.getDepotId(), model.getRecvType(), model.getRecvDetails());
197
-        if(null != users && ! users.isEmpty()) {
197
+        if(null != users && users.isEmpty() == false) {
198 198
             saveAnnouncementUsers(model.getId(), users);
199 199
 
200 200
             MessageVO messageVO = new MessageVO();
201 201
             messageVO.setTitle(model.getTitle());
202 202
             messageVO.setContent(model.getTitle());
203 203
             messageVO.setType(MessageTypeEnum.ATTENTION.getCode());
204
-            messageVO.setToUserIds(users.keySet());
205
-            sysMessageService.sendMessage(messageVO);
204
+            messageVO.setFromUserId(userPO.getuId());
205
+            sysMessageService.batchSendMessage(messageVO, users.keySet());
206 206
         }
207 207
     }
208 208
 

+ 44 - 33
src/main/java/com/unissoft/service/impl/SysMessageServiceImpl.java

@@ -1,21 +1,24 @@
1 1
 package com.unissoft.service.impl;
2 2
 
3
-import com.alibaba.fastjson.JSONObject;
4 3
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
5 4
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
6
-import com.unissoft.controller.WebSocketServer;
5
+import com.unissoft.constant.GrainConstant;
7 6
 import com.unissoft.dto.MessageVO;
7
+import com.unissoft.enums.OnlineStatusEnum;
8 8
 import com.unissoft.mapper.SysMessageMapper;
9 9
 import com.unissoft.model.SysMessage;
10
+import com.unissoft.redismq.PublishService;
10 11
 import com.unissoft.service.SysMessageService;
12
+import com.unissoft.vo.UserOnlineStatusVO;
13
+import org.springframework.beans.BeanUtils;
11 14
 import org.springframework.beans.factory.annotation.Autowired;
12 15
 import org.springframework.stereotype.Service;
13 16
 import org.springframework.transaction.annotation.Transactional;
14 17
 
15 18
 import java.util.ArrayList;
16
-import java.util.Collection;
17 19
 import java.util.Date;
18 20
 import java.util.List;
21
+import java.util.Set;
19 22
 
20 23
 /**
21 24
  * <p>
@@ -32,33 +35,36 @@ public class SysMessageServiceImpl extends ServiceImpl<SysMessageMapper, SysMess
32 35
     private SysMessageMapper sysMessageMapper;
33 36
 
34 37
     @Autowired
35
-    private WebSocketServer webSocketServer;
38
+    private PublishService publishService;
36 39
 
37 40
     @Override
38 41
     @Transactional
39 42
     public void sendMessage(MessageVO messageVO) {
40
-        Collection<Integer> toUserIds = messageVO.getToUserIds();
41
-        if(null != toUserIds && toUserIds.isEmpty() == false) {
42
-            messageVO.setToUserIds(null);
43
+        if(null != messageVO.getToUserId()) {
43 44
             SysMessage message = new SysMessage();
44
-            message.setType(messageVO.getType());
45
-            message.setTitle(messageVO.getTitle());
46
-            message.setContent(messageVO.getContent());
47
-
45
+            BeanUtils.copyProperties(messageVO, message);
46
+            message.setPushStatus(false);
48 47
             Date now = new Date();
49 48
             message.setCreateTime(now);
50 49
             message.setUpdateTime(now);
51
-            for(Integer toUserId : toUserIds) {
52
-                messageVO.setUserId(toUserId);
53
-                String content = JSONObject.toJSONString(messageVO);
54
-                boolean pushStatus = webSocketServer.sendUserMessage(toUserId, content);
55
-                message.setUserId(toUserId);
56
-                message.setPushStatus(pushStatus);
57
-                sysMessageMapper.insert(message);
58
-            }
50
+            //
51
+            sysMessageMapper.insert(message);
52
+            messageVO.setId(message.getId());
53
+            //
54
+            publishService.publish(GrainConstant.MQ_MSG_CHANNEL, messageVO);
59 55
         }
60 56
     }
61 57
 
58
+    @Override
59
+    @Transactional
60
+    public void batchSendMessage(MessageVO messageVO, Set<Integer> userIds) {
61
+        if(null != userIds && userIds.isEmpty() == false) {
62
+            for(Integer userId : userIds) {
63
+                messageVO.setToUserId(userId);
64
+                sendMessage(messageVO);
65
+            }
66
+        }
67
+    }
62 68
 
63 69
     /**
64 70
      * 获取未推送的用户消息
@@ -69,7 +75,7 @@ public class SysMessageServiceImpl extends ServiceImpl<SysMessageMapper, SysMess
69 75
     public List<MessageVO> getUnpushedUserMessage(Integer userId) {
70 76
         List<MessageVO> result = null;
71 77
         QueryWrapper<SysMessage> ew = new QueryWrapper<>();
72
-        ew.eq("user_id", userId);
78
+        ew.eq("to_user_id", userId);
73 79
         ew.eq("push_status", 0);
74 80
         List<SysMessage> list = sysMessageMapper.selectList(ew);
75 81
 
@@ -77,11 +83,7 @@ public class SysMessageServiceImpl extends ServiceImpl<SysMessageMapper, SysMess
77 83
             result = new ArrayList<>();
78 84
             for(SysMessage sysMessage : list) {
79 85
                 MessageVO messageVO = new MessageVO();
80
-                messageVO.setId(sysMessage.getId());
81
-                messageVO.setUserId(userId);
82
-                messageVO.setTitle(sysMessage.getTitle());
83
-                messageVO.setContent(sysMessage.getContent());
84
-                messageVO.setType(sysMessage.getType());
86
+                BeanUtils.copyProperties(sysMessage, messageVO);
85 87
                 result.add(messageVO);
86 88
             }
87 89
         }
@@ -89,13 +91,22 @@ public class SysMessageServiceImpl extends ServiceImpl<SysMessageMapper, SysMess
89 91
     }
90 92
 
91 93
     @Override
92
-    public void batchSetPushedStatus(List<MessageVO> voList) {
93
-        List<SysMessage> list = new ArrayList<>();
94
-        for(MessageVO vo : voList) {
95
-            SysMessage sysMessage = getById(vo.getId());
96
-            sysMessage.setPushStatus(true);
97
-            list.add(sysMessage);
98
-        }
99
-        updateBatchById(list);
94
+    public void batchSetPushedStatus(List<MessageVO> list) {
95
+        sysMessageMapper.batchSetPushedStatus(list);
100 96
     }
97
+
98
+    @Override
99
+    public void setPushedStatus(Integer id) {
100
+        sysMessageMapper.setPushedStatus(id);
101
+    }
102
+
103
+    @Override
104
+    public void removeWebSocketSession(int id) {
105
+        UserOnlineStatusVO onlineStatusVO = new UserOnlineStatusVO();
106
+        onlineStatusVO.setUserId(id);
107
+        onlineStatusVO.setOnlineStatus(OnlineStatusEnum.OFFLINE.getCode());
108
+        publishService.publish(GrainConstant.MQ_USER_STATUS_CHANNEL, onlineStatusVO);
109
+    }
110
+
111
+
101 112
 }

+ 3 - 10
src/main/java/com/unissoft/service/impl/SysNoticeUserServiceImpl.java

@@ -5,7 +5,6 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
5 5
 import com.google.common.collect.Maps;
6 6
 import com.unissoft.MyConstant;
7 7
 import com.unissoft.constant.GrainConstant;
8
-import com.unissoft.controller.WebSocketServer;
9 8
 import com.unissoft.dto.MessageVO;
10 9
 import com.unissoft.dto.SendNoticeVO;
11 10
 import com.unissoft.dto.SysNoticeVO;
@@ -41,8 +40,6 @@ import java.util.Map;
41 40
 @Service
42 41
 public class SysNoticeUserServiceImpl extends ServiceImpl<SysNoticeUserMapper, SysNoticeUser> implements SysNoticeUserService {
43 42
 
44
-
45
-
46 43
     @Autowired
47 44
     private SysNoticeUserMapper noticeUserMapper;
48 45
 
@@ -52,9 +49,6 @@ public class SysNoticeUserServiceImpl extends ServiceImpl<SysNoticeUserMapper, S
52 49
     @Autowired
53 50
     private SysMessageService sysMessageService;
54 51
 
55
-    @Autowired
56
-    private WebSocketServer webSocketServer;
57
-
58 52
     @Override
59 53
     public Page<SysNoticeUserVO> getPage(PageParam pageParam, Integer userId) {
60 54
         Integer pageIndex = pageParam.getPageIndex();
@@ -142,17 +136,16 @@ public class SysNoticeUserServiceImpl extends ServiceImpl<SysNoticeUserMapper, S
142 136
                     save(sysNoticeUser);
143 137
                 }
144 138
                 //
145
-                MessageVO messageVO = getMessageVO(users, sysNotice);
146
-                sysMessageService.sendMessage(messageVO);
139
+                MessageVO messageVO = getMessageVO(sysNotice);
140
+                sysMessageService.batchSendMessage(messageVO, users.keySet());
147 141
             }
148 142
         }
149 143
 
150 144
     }
151 145
 
152
-    private MessageVO getMessageVO(Map<Integer, String> users, SysNotice sysNotice) {
146
+    private MessageVO getMessageVO(SysNotice sysNotice) {
153 147
 
154 148
         MessageVO messageVO = new MessageVO();
155
-        messageVO.setToUserIds(users.keySet());
156 149
         messageVO.setTitle(sysNotice.getTitle());
157 150
         messageVO.setContent(sysNotice.getContent());
158 151
 

+ 13 - 0
src/main/java/com/unissoft/vo/UserOnlineStatusVO.java

@@ -0,0 +1,13 @@
1
+package com.unissoft.vo;
2
+
3
+import lombok.Data;
4
+import lombok.EqualsAndHashCode;
5
+
6
+import java.io.Serializable;
7
+
8
+@Data
9
+@EqualsAndHashCode(callSuper = false)
10
+public class UserOnlineStatusVO implements Serializable {
11
+    private Integer userId;
12
+    private Integer onlineStatus;
13
+}