瀏覽代碼

websocket推送

lipenghui 5 天之前
父節點
當前提交
c0282e59a8

+ 127 - 59
yanfan-framework/src/main/java/com/yanfan/framework/websocket/DeviceWebSocketHandler.java

@@ -9,97 +9,122 @@ import org.springframework.web.socket.*;
 import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
 
 import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 
 /**
- * ✅ 修复版核心处理器:解决deviceCode=null + 全链路容错 + 日志完善
- * 支持 ws://ip:port/ws/{deviceCode} 动态路径,保证设备编码100%能获取
+ * ✅ 多连接核心处理器(单设备多连接+多设备并发连接)
+ * 核心特性:
+ * 1. 支持 ws://ip:port/ws/{deviceCode} 动态路径
+ * 2. 单设备可建立多个客户端连接,互不冲突、互不覆盖
+ * 3. 线程安全的连接管理,适配高并发场景
+ * 4. 支持「单连接推送、设备下所有连接推送、全局广播」三种推送方式
  */
 @Slf4j
 @Component
 public class DeviceWebSocketHandler implements WebSocketHandler {
 
-    // 线程安全存储:设备编码 → WebSocket会话(核心映射)
-    public static final Map<String, WebSocketSession> DEVICE_SESSION_MAP = new ConcurrentHashMap<>(16);
+    // ========== ✅ 核心升级:双层Map存储【设备编码 → 该设备下的所有连接会话集合】 ==========
+    // ConcurrentHashMap:保证设备维度的线程安全
+    // CopyOnWriteArraySet:保证单设备下多会话的线程安全,支持并发增删、遍历
+    public static final Map<String, Set<WebSocketSession>> DEVICE_SESSIONS_MAP = new ConcurrentHashMap<>();
 
-    // ========== ✅ 握手拦截器【必生效】:解析路径中的deviceCode ==========
+    // ========== 握手拦截器:解析设备编码 + 会话唯一标识 ==========
     public static class DeviceCodeHandshakeInterceptor extends HttpSessionHandshakeInterceptor {
         @Override
         public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                                        WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
-            // ✅ 优化路径解析逻辑:兼容任意格式路径,100%提取deviceCode
+            // 1. 解析路径中的设备编码
             String uriPath = request.getURI().getPath().trim();
             if (!uriPath.startsWith("/ws/")) {
                 log.error("WebSocket连接失败:非法路径,必须以/ws/开头");
                 return false;
             }
-            // 切割路径,精准获取最后一段的设备编码
             String[] pathSegments = uriPath.split("/");
             String deviceCode = pathSegments[pathSegments.length - 1].trim();
-
-            // ✅ 前置校验:设备编码不能为空
             if (StringUtils.isBlank(deviceCode)) {
                 log.error("WebSocket连接失败:设备编码不能为空");
                 return false;
             }
-            // ✅ 存入会话属性,后续全程可获取(拦截器生效后,此处必存值)
+
+            // 2. 生成【会话唯一ID】:区分同一设备下的不同连接,便于日志排查
+            String sessionId = "WS-" + UUID.randomUUID().toString().replace("-", "").substring(0, 16);
+
+            // 3. 存入会话属性:设备编码 + 会话ID,全程可获取
             attributes.put("deviceCode", deviceCode);
-            log.info("✅ 设备[{}]开始WebSocket握手,路径解析成功", deviceCode);
+            attributes.put("sessionId", sessionId);
+            log.info("【握手】设备[{}] 会话[{}] 开始WebSocket连接", deviceCode, sessionId);
             return super.beforeHandshake(request, response, wsHandler, attributes);
         }
     }
 
-    // ========== WebSocket生命周期方法 ==========
+    // ========== WebSocket连接生命周期方法 ==========
     /**
-     * 连接建立成功(拦截器已存入deviceCode,此处100%能获取)
+     * 1. 连接建立成功:核心动作 → 将新会话加入【对应设备的会话集合】
      */
     @Override
     public void afterConnectionEstablished(WebSocketSession session) throws Exception {
-        // ✅ 从会话属性获取deviceCode(拦截器已校验非空,此处直接取值)
         String deviceCode = (String) session.getAttributes().get("deviceCode");
-        log.info("✅ 设备[{}]WebSocket连接建立成功", deviceCode);
+        String sessionId = (String) session.getAttributes().get("sessionId");
 
-        // 存储会话,重连时覆盖旧会话(保证设备会话唯一性)
-        DEVICE_SESSION_MAP.put(deviceCode, session);
-        log.info("当前在线设备数:{},在线设备:{}", DEVICE_SESSION_MAP.size(), DEVICE_SESSION_MAP.keySet());
+        // ✅ 核心:设备编码不存在则初始化集合,存在则直接添加会话
+        DEVICE_SESSIONS_MAP.computeIfAbsent(deviceCode, k -> new CopyOnWriteArraySet<>()).add(session);
 
-        // 连接成功,主动向前端推送回执
-        sendMessageToDevice(deviceCode, "【成功】设备" + deviceCode + "已建立WebSocket连接");
+        // 连接成功日志 + 统计信息
+        int deviceConnCount = DEVICE_SESSIONS_MAP.get(deviceCode).size(); // 该设备当前连接数
+        int totalDeviceCount = DEVICE_SESSIONS_MAP.size(); // 在线设备总数
+        int totalConnCount = DEVICE_SESSIONS_MAP.values().stream().mapToInt(Set::size).sum(); // 全局总连接数
+        log.info("✅ 连接成功 | 设备[{}] 会话[{}] | 设备连接数:{} | 在线设备数:{} | 全局总连接数:{}",
+                deviceCode, sessionId, deviceConnCount, totalDeviceCount, totalConnCount);
+
+        // 主动推送连接成功回执给当前客户端
+        sendMessageToSingleSession(session, "【连接成功】设备:" + deviceCode + " | 会话ID:" + sessionId);
     }
 
     /**
-     * 接收前端发送的消息(前端→后端)
+     * 2. 接收前端消息:支持单连接接收,自动识别设备+会话
      */
     @Override
     public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
         String deviceCode = (String) session.getAttributes().get("deviceCode");
+        String sessionId = (String) session.getAttributes().get("sessionId");
         String receiveMsg = message.getPayload().toString().trim();
-        log.info("📥 收到设备[{}]消息:{}", deviceCode, receiveMsg);
 
-        // 业务示例:收到消息后,给前端回复回执
-        sendMessageToDevice(deviceCode, "【已接收】你的消息:" + receiveMsg);
+        log.info("📥 接收消息 | 设备[{}] 会话[{}] | 内容:{}", deviceCode, sessionId, receiveMsg);
+
+        // 业务示例:收到消息后,给【当前连接】回复回执
+        sendMessageToSingleSession(session, "【已接收】你的消息:" + receiveMsg);
+
+        // 扩展示例:收到消息后,推送给【该设备下所有连接】
+        // sendMessageToDeviceAllSession(deviceCode, "【设备广播】收到会话["+sessionId+"]消息:" + receiveMsg);
     }
 
     /**
-     * 连接异常处理
+     * 3. 连接异常:自动清理无效会话,避免内存泄漏
      */
     @Override
     public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
         String deviceCode = (String) session.getAttributes().get("deviceCode");
-        log.error("❌ 设备[{}]WebSocket连接异常", deviceCode, exception);
-        // 异常时清理会话
-        clearInvalidSession(deviceCode, session);
+        String sessionId = (String) session.getAttributes().get("sessionId");
+        log.error("❌ 连接异常 | 设备[{}] 会话[{}]", deviceCode, sessionId, exception);
+
+        // 清理无效会话
+        removeInvalidSession(deviceCode, sessionId, session);
     }
 
     /**
-     * 连接关闭处理
+     * 4. 连接关闭:主动清理会话,更新连接统计
      */
     @Override
     public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
         String deviceCode = (String) session.getAttributes().get("deviceCode");
-        log.info("🔌 设备[{}]WebSocket连接关闭,状态:{}", deviceCode, closeStatus);
-        // 关闭时清理会话
-        clearInvalidSession(deviceCode, session);
+        String sessionId = (String) session.getAttributes().get("sessionId");
+        log.info("🔌 连接关闭 | 设备[{}] 会话[{}] | 关闭状态:{}", deviceCode, sessionId, closeStatus);
+
+        // 清理会话
+        removeInvalidSession(deviceCode, sessionId, session);
     }
 
     @Override
@@ -107,58 +132,101 @@ public class DeviceWebSocketHandler implements WebSocketHandler {
         return false;
     }
 
-    // ========== 核心工具方法 ==========
+    // ========== ✅ 核心推送方法(3种,满足所有业务场景) ==========
     /**
-     * ✅ 根据设备编码,精准推送消息给指定前端(后端→前端)
+     * 方法1:推送给【指定单个会话】(精准到某一个客户端连接)
+     * @param session 目标会话
+     * @param content 推送内容
      */
-    public boolean sendMessageToDevice(String deviceCode, String content) {
-        WebSocketSession session = DEVICE_SESSION_MAP.get(deviceCode);
+    public boolean sendMessageToSingleSession(WebSocketSession session, String content) {
         if (session == null || !session.isOpen()) {
-            log.error("❌ 设备[{}]推送失败:设备未在线/连接已断开", deviceCode);
+            log.error("❌ 单会话推送失败:会话已关闭/无效");
             return false;
         }
         try {
-            TextMessage textMessage = new TextMessage(content);
-            session.sendMessage(textMessage);
-            log.info("📤 设备[{}]推送消息成功:{}", deviceCode, content);
+            session.sendMessage(new TextMessage(content));
+            String deviceCode = (String) session.getAttributes().get("deviceCode");
+            String sessionId = (String) session.getAttributes().get("sessionId");
+            log.info("📤 单会话推送成功 | 设备[{}] 会话[{}] | 内容:{}", deviceCode, sessionId, content);
             return true;
         } catch (Exception e) {
-            log.error("❌ 设备[{}]推送消息异常", deviceCode, e);
-            clearInvalidSession(deviceCode, session);
+            log.error("❌ 单会话推送异常", e);
             return false;
         }
     }
 
     /**
-     * 广播消息:推送给所有在线设备
+     * 方法2:推送给【指定设备下的所有连接】(单设备多客户端全推送,核心多连接能力)
+     * @param deviceCode 目标设备编码
+     * @param content 推送内容
      */
-    public void broadcastMessage(String content) {
-        log.info("📢 开始广播消息:{},在线设备数:{}", content, DEVICE_SESSION_MAP.size());
-        TextMessage textMessage = new TextMessage(content);
-        DEVICE_SESSION_MAP.forEach((deviceCode, session) -> {
+    public boolean sendMessageToDeviceAllSession(String deviceCode, String content) {
+        Set<WebSocketSession> sessionSet = DEVICE_SESSIONS_MAP.get(deviceCode);
+        if (sessionSet == null || sessionSet.isEmpty()) {
+            log.error("❌ 设备推送失败 | 设备[{}]:无在线连接", deviceCode);
+            return false;
+        }
+
+        TextMessage message = new TextMessage(content);
+        sessionSet.forEach(session -> {
             if (session.isOpen()) {
                 try {
-                    session.sendMessage(textMessage);
+                    session.sendMessage(message);
                 } catch (Exception e) {
-                    log.error("❌ 设备[{}]广播失败", deviceCode, e);
-                    clearInvalidSession(deviceCode, session);
+                    String sessionId = (String) session.getAttributes().get("sessionId");
+                    log.error("❌ 设备推送异常 | 设备[{}] 会话[{}]", deviceCode, sessionId, e);
                 }
             }
         });
+        log.info("📤 设备全连接推送成功 | 设备[{}] | 推送连接数:{} | 内容:{}",
+                deviceCode, sessionSet.size(), content);
+        return true;
     }
 
     /**
-     * 私有工具:清理无效会话,释放资源
+     * 方法3:全局广播【所有设备的所有连接】(系统公告、全局通知场景)
+     * @param content 广播内容
      */
-    private void clearInvalidSession(String deviceCode, WebSocketSession session) {
-        if (session.isOpen()) {
-            try {
-                session.close();
-            } catch (Exception e) {
-                log.error("关闭会话异常", e);
+    public void broadcastMessageToAll(String content) {
+        int totalPushCount = 0;
+        TextMessage message = new TextMessage(content);
+
+        for (Map.Entry<String, Set<WebSocketSession>> entry : DEVICE_SESSIONS_MAP.entrySet()) {
+            String deviceCode = entry.getKey();
+            Set<WebSocketSession> sessionSet = entry.getValue();
+
+            for (WebSocketSession session : sessionSet) {
+                if (session.isOpen()) {
+                    try {
+                        session.sendMessage(message);
+                        totalPushCount++;
+                    } catch (Exception e) {
+                        String sessionId = (String) session.getAttributes().get("sessionId");
+                        log.error("❌ 全局广播异常 | 设备[{}] 会话[{}]", deviceCode, sessionId, e);
+                    }
+                }
             }
         }
-        DEVICE_SESSION_MAP.remove(deviceCode);
-        log.info("✅ 设备[{}]无效会话已清理,当前在线设备数:{}", deviceCode, DEVICE_SESSION_MAP.size());
+        log.info("📢 全局广播完成 | 推送总连接数:{} | 内容:{}", totalPushCount, content);
+    }
+
+    // ========== 私有工具方法:清理无效会话 + 统计更新 ==========
+    private void removeInvalidSession(String deviceCode, String sessionId, WebSocketSession session) {
+        if (deviceCode == null || DEVICE_SESSIONS_MAP.get(deviceCode) == null) return;
+
+        // 移除当前无效会话
+        DEVICE_SESSIONS_MAP.get(deviceCode).remove(session);
+
+        // 若设备下无任何连接,移除该设备的空集合(优化内存)
+        if (DEVICE_SESSIONS_MAP.get(deviceCode).isEmpty()) {
+            DEVICE_SESSIONS_MAP.remove(deviceCode);
+        }
+
+        // 更新统计信息
+        int deviceConnCount = DEVICE_SESSIONS_MAP.getOrDefault(deviceCode, new CopyOnWriteArraySet<>()).size();
+        int totalDeviceCount = DEVICE_SESSIONS_MAP.size();
+        int totalConnCount = DEVICE_SESSIONS_MAP.values().stream().mapToInt(Set::size).sum();
+        log.info("✅ 会话清理完成 | 设备[{}] 会话[{}] | 设备剩余连接数:{} | 在线设备数:{} | 全局总连接数:{}",
+                deviceCode, sessionId, deviceConnCount, totalDeviceCount, totalConnCount);
     }
 }

+ 8 - 5
yanfan-framework/src/main/java/com/yanfan/framework/websocket/WebSocketNativeConfig.java

@@ -1,5 +1,6 @@
 package com.yanfan.framework.websocket;
 
+import com.yanfan.framework.websocket.DeviceWebSocketHandler;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.web.socket.config.annotation.EnableWebSocket;
@@ -8,10 +9,11 @@ import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry
 import org.springframework.web.socket.server.standard.ServerEndpointExporter;
 
 /**
- * ✅ 修复版配置类:核心绑定握手拦截器,解决deviceCode=null问题
+ * WebSocket核心配置类(多连接支持)
+ * 开启原生WebSocket + 注册动态路径处理器 + 绑定握手拦截器
  */
 @Configuration
-@EnableWebSocket // 开启原生WebSocket支持
+@EnableWebSocket // 开启Spring原生WebSocket功能
 public class WebSocketNativeConfig implements WebSocketConfigurer {
 
     private final DeviceWebSocketHandler deviceWebSocketHandler;
@@ -23,12 +25,13 @@ public class WebSocketNativeConfig implements WebSocketConfigurer {
     @Override
     public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
         registry.addHandler(deviceWebSocketHandler, "/ws/{deviceCode}")
-                .addInterceptors(new DeviceWebSocketHandler.DeviceCodeHandshakeInterceptor()) // ✅ 关键:绑定拦截器(原代码缺失)
-                .setAllowedOrigins("*"); // 允许跨域(生产可指定域名)
+                // 绑定拦截器:解析设备编码 + 支持多连接核心
+                .addInterceptors(new DeviceWebSocketHandler.DeviceCodeHandshakeInterceptor())
+                .setAllowedOrigins("*"); // 允许跨域(生产可指定具体域名)
     }
 
     /**
-     * 原生WebSocket必备Bean:自动注册端点
+     * 原生WebSocket必备Bean:自动注册所有端点,必须注入
      */
     @Bean
     public ServerEndpointExporter serverEndpointExporter() {

+ 1 - 1
yanfan-open-api/src/main/java/com/yanfan/data/controller/DeviceController.java

@@ -688,6 +688,6 @@ public class DeviceController extends BaseController {
     @GetMapping("/init")
     @PermitAll
     public void select(){
-        deviceWebSocketHandler.sendMessageToDevice("YF6660355", JSON.toJSONString("eee"));
+        deviceWebSocketHandler.sendMessageToDeviceAllSession("YF6660355", JSON.toJSONString("eee"));
     }
 }

+ 2 - 1
yanfan-service/yanfan-iot-service/src/main/java/com/yanfan/iot/service/impl/DeviceServiceImpl.java

@@ -489,7 +489,8 @@ public class DeviceServiceImpl implements IDeviceService {
             Device device1 = deviceMapper.selectDeviceBySerialNumber(input.getDeviceNumber());
             if ("连续油管".equals(device1.getProductName())){
                 //如果是连油设备发送ws
-                deviceWebSocketHandler.sendMessageToDevice(input.getDeviceNumber(), JSON.toJSONString(deviceLogList));
+//                deviceWebSocketHandler.sendMessageToDevice(input.getDeviceNumber(), JSON.toJSONString(deviceLogList));
+                deviceWebSocketHandler.sendMessageToDeviceAllSession(input.getDeviceNumber(), JSON.toJSONString(deviceLogList));
             }
             logService.saveBatch(tdLogDto);
 //                }