lipenghui преди 3 дни
родител
ревизия
ec42b1d2b6

+ 6 - 0
yanfan-framework/pom.xml

@@ -107,6 +107,12 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-websocket</artifactId>
         </dependency>
+
+        <!-- Socket.IO客户端包(兼容2.0.13协议,实现Socket.IO通信) -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-websocket</artifactId>
+        </dependency>
     </dependencies>
 
 </project>

+ 0 - 37
yanfan-framework/src/main/java/com/yanfan/framework/config/StompWebSocketConfig.java

@@ -1,37 +0,0 @@
-package com.yanfan.framework.config;
-
-import org.springframework.context.annotation.Configuration;
-import org.springframework.messaging.simp.config.MessageBrokerRegistry;
-import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
-import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
-import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
-
-/**
- * STOMP WebSocket 配置:支持设备编码动态路径
- */
-@Configuration
-@EnableWebSocketMessageBroker // 启用 STOMP 消息代理
-public class StompWebSocketConfig implements WebSocketMessageBrokerConfigurer {
-
-    /**
-     * 配置消息代理(订阅主题前缀)
-     */
-    @Override
-    public void configureMessageBroker(MessageBrokerRegistry registry) {
-        // 1. 启用简单消息代理,前端订阅以下前缀的主题可接收推送
-        registry.enableSimpleBroker("/topic"); // 广播主题(按设备编码细分)
-        // 2. 客户端发送消息的前缀:前端发消息需以 /app 开头,映射到 @MessageMapping
-        registry.setApplicationDestinationPrefixes("/app");
-    }
-
-    /**
-     * 注册 STOMP 端点(前端连接的 WS 路径)
-     */
-    @Override
-    public void registerStompEndpoints(StompEndpointRegistry registry) {
-        // 核心:支持动态设备编码路径 {deviceCode},允许跨域,启用 SockJS 降级
-        registry.addEndpoint("/ws/stomp/{deviceCode}")
-                .setAllowedOriginPatterns("*") // 跨域(生产环境建议限定具体域名)
-                .withSockJS(); // 兼容不支持 WebSocket 的浏览器(自动切换 SockJS)
-    }
-}

+ 164 - 0
yanfan-framework/src/main/java/com/yanfan/framework/websocket/DeviceWebSocketHandler.java

@@ -0,0 +1,164 @@
+package com.yanfan.framework.websocket;
+
+import com.yanfan.common.utils.StringUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.server.ServerHttpRequest;
+import org.springframework.http.server.ServerHttpResponse;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.*;
+import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * ✅ 修复版核心处理器:解决deviceCode=null + 全链路容错 + 日志完善
+ * 支持 ws://ip:port/ws/{deviceCode} 动态路径,保证设备编码100%能获取
+ */
+@Slf4j
+@Component
+public class DeviceWebSocketHandler implements WebSocketHandler {
+
+    // 线程安全存储:设备编码 → WebSocket会话(核心映射)
+    public static final Map<String, WebSocketSession> DEVICE_SESSION_MAP = new ConcurrentHashMap<>(16);
+
+    // ========== ✅ 握手拦截器【必生效】:解析路径中的deviceCode ==========
+    public static class DeviceCodeHandshakeInterceptor extends HttpSessionHandshakeInterceptor {
+        @Override
+        public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
+                                       WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
+            // ✅ 优化路径解析逻辑:兼容任意格式路径,100%提取deviceCode
+            String uriPath = request.getURI().getPath().trim();
+            if (!uriPath.startsWith("/ws/")) {
+                log.error("WebSocket连接失败:非法路径,必须以/ws/开头");
+                return false;
+            }
+            // 切割路径,精准获取最后一段的设备编码
+            String[] pathSegments = uriPath.split("/");
+            String deviceCode = pathSegments[pathSegments.length - 1].trim();
+
+            // ✅ 前置校验:设备编码不能为空
+            if (StringUtils.isBlank(deviceCode)) {
+                log.error("WebSocket连接失败:设备编码不能为空");
+                return false;
+            }
+            // ✅ 存入会话属性,后续全程可获取(拦截器生效后,此处必存值)
+            attributes.put("deviceCode", deviceCode);
+            log.info("✅ 设备[{}]开始WebSocket握手,路径解析成功", deviceCode);
+            return super.beforeHandshake(request, response, wsHandler, attributes);
+        }
+    }
+
+    // ========== WebSocket生命周期方法 ==========
+    /**
+     * 连接建立成功(拦截器已存入deviceCode,此处100%能获取)
+     */
+    @Override
+    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
+        // ✅ 从会话属性获取deviceCode(拦截器已校验非空,此处直接取值)
+        String deviceCode = (String) session.getAttributes().get("deviceCode");
+        log.info("✅ 设备[{}]WebSocket连接建立成功", deviceCode);
+
+        // 存储会话,重连时覆盖旧会话(保证设备会话唯一性)
+        DEVICE_SESSION_MAP.put(deviceCode, session);
+        log.info("当前在线设备数:{},在线设备:{}", DEVICE_SESSION_MAP.size(), DEVICE_SESSION_MAP.keySet());
+
+        // 连接成功,主动向前端推送回执
+        sendMessageToDevice(deviceCode, "【成功】设备" + deviceCode + "已建立WebSocket连接");
+    }
+
+    /**
+     * 接收前端发送的消息(前端→后端)
+     */
+    @Override
+    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
+        String deviceCode = (String) session.getAttributes().get("deviceCode");
+        String receiveMsg = message.getPayload().toString().trim();
+        log.info("📥 收到设备[{}]消息:{}", deviceCode, receiveMsg);
+
+        // 业务示例:收到消息后,给前端回复回执
+        sendMessageToDevice(deviceCode, "【已接收】你的消息:" + receiveMsg);
+    }
+
+    /**
+     * 连接异常处理
+     */
+    @Override
+    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
+        String deviceCode = (String) session.getAttributes().get("deviceCode");
+        log.error("❌ 设备[{}]WebSocket连接异常", deviceCode, exception);
+        // 异常时清理会话
+        clearInvalidSession(deviceCode, session);
+    }
+
+    /**
+     * 连接关闭处理
+     */
+    @Override
+    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
+        String deviceCode = (String) session.getAttributes().get("deviceCode");
+        log.info("🔌 设备[{}]WebSocket连接关闭,状态:{}", deviceCode, closeStatus);
+        // 关闭时清理会话
+        clearInvalidSession(deviceCode, session);
+    }
+
+    @Override
+    public boolean supportsPartialMessages() {
+        return false;
+    }
+
+    // ========== 核心工具方法 ==========
+    /**
+     * ✅ 根据设备编码,精准推送消息给指定前端(后端→前端)
+     */
+    public boolean sendMessageToDevice(String deviceCode, String content) {
+        WebSocketSession session = DEVICE_SESSION_MAP.get(deviceCode);
+        if (session == null || !session.isOpen()) {
+            log.error("❌ 设备[{}]推送失败:设备未在线/连接已断开", deviceCode);
+            return false;
+        }
+        try {
+            TextMessage textMessage = new TextMessage(content);
+            session.sendMessage(textMessage);
+            log.info("📤 设备[{}]推送消息成功:{}", deviceCode, content);
+            return true;
+        } catch (Exception e) {
+            log.error("❌ 设备[{}]推送消息异常", deviceCode, e);
+            clearInvalidSession(deviceCode, session);
+            return false;
+        }
+    }
+
+    /**
+     * 广播消息:推送给所有在线设备
+     */
+    public void broadcastMessage(String content) {
+        log.info("📢 开始广播消息:{},在线设备数:{}", content, DEVICE_SESSION_MAP.size());
+        TextMessage textMessage = new TextMessage(content);
+        DEVICE_SESSION_MAP.forEach((deviceCode, session) -> {
+            if (session.isOpen()) {
+                try {
+                    session.sendMessage(textMessage);
+                } catch (Exception e) {
+                    log.error("❌ 设备[{}]广播失败", deviceCode, e);
+                    clearInvalidSession(deviceCode, session);
+                }
+            }
+        });
+    }
+
+    /**
+     * 私有工具:清理无效会话,释放资源
+     */
+    private void clearInvalidSession(String deviceCode, WebSocketSession session) {
+        if (session.isOpen()) {
+            try {
+                session.close();
+            } catch (Exception e) {
+                log.error("关闭会话异常", e);
+            }
+        }
+        DEVICE_SESSION_MAP.remove(deviceCode);
+        log.info("✅ 设备[{}]无效会话已清理,当前在线设备数:{}", deviceCode, DEVICE_SESSION_MAP.size());
+    }
+}

+ 37 - 0
yanfan-framework/src/main/java/com/yanfan/framework/websocket/WebSocketNativeConfig.java

@@ -0,0 +1,37 @@
+package com.yanfan.framework.websocket;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
+import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
+import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+
+/**
+ * ✅ 修复版配置类:核心绑定握手拦截器,解决deviceCode=null问题
+ */
+@Configuration
+@EnableWebSocket // 开启原生WebSocket支持
+public class WebSocketNativeConfig implements WebSocketConfigurer {
+
+    private final DeviceWebSocketHandler deviceWebSocketHandler;
+
+    public WebSocketNativeConfig(DeviceWebSocketHandler deviceWebSocketHandler) {
+        this.deviceWebSocketHandler = deviceWebSocketHandler;
+    }
+
+    @Override
+    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
+        registry.addHandler(deviceWebSocketHandler, "/ws/{deviceCode}")
+                .addInterceptors(new DeviceWebSocketHandler.DeviceCodeHandshakeInterceptor()) // ✅ 关键:绑定拦截器(原代码缺失)
+                .setAllowedOrigins("*"); // 允许跨域(生产可指定域名)
+    }
+
+    /**
+     * 原生WebSocket必备Bean:自动注册端点
+     */
+    @Bean
+    public ServerEndpointExporter serverEndpointExporter() {
+        return new ServerEndpointExporter();
+    }
+}

+ 10 - 0
yanfan-open-api/src/main/java/com/yanfan/data/controller/DeviceController.java

@@ -1,5 +1,6 @@
 package com.yanfan.data.controller;
 
+import com.alibaba.fastjson2.JSON;
 import com.yanfan.common.annotation.Anonymous;
 import com.yanfan.common.annotation.Log;
 import com.yanfan.common.constant.HttpStatus;
@@ -14,6 +15,7 @@ import com.yanfan.common.utils.MessageUtils;
 import com.yanfan.common.utils.SecurityUtils;
 import com.yanfan.common.utils.StringUtils;
 import com.yanfan.common.utils.poi.ExcelUtil;
+import com.yanfan.framework.websocket.DeviceWebSocketHandler;
 import com.yanfan.iot.domain.*;
 import com.yanfan.iot.model.*;
 import com.yanfan.iot.model.dto.ThingsModelDTO;
@@ -33,6 +35,7 @@ import org.springframework.security.access.prepost.PreAuthorize;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.multipart.MultipartFile;
 
+import javax.annotation.security.PermitAll;
 import javax.servlet.http.HttpServletResponse;
 import java.util.ArrayList;
 import java.util.List;
@@ -680,4 +683,11 @@ public class DeviceController extends BaseController {
         return AjaxResult.success(deviceService.selectDeviceLongitudeAndLatitude());
     }
 
+    @Autowired
+    private DeviceWebSocketHandler deviceWebSocketHandler;
+    @GetMapping("/init")
+    @PermitAll
+    public void select(){
+        deviceWebSocketHandler.sendMessageToDevice("YF6660355", JSON.toJSONString("eee"));
+    }
 }

+ 10 - 5
yanfan-service/yanfan-iot-service/src/main/java/com/yanfan/iot/service/impl/DeviceServiceImpl.java

@@ -27,6 +27,7 @@ import com.yanfan.common.utils.ip.IpUtils;
 import com.yanfan.common.utils.json.JsonUtils;
 import com.yanfan.common.utils.uuid.IdUtils;
 import com.yanfan.framework.manager.AsyncManager;
+import com.yanfan.framework.websocket.DeviceWebSocketHandler;
 import com.yanfan.iot.cache.IDeviceCache;
 import com.yanfan.iot.cache.ITSLCache;
 import com.yanfan.iot.cache.ITSLValueCache;
@@ -59,7 +60,6 @@ import org.springframework.cache.annotation.CacheEvict;
 import org.springframework.cache.annotation.Cacheable;
 import org.springframework.cache.annotation.Caching;
 import org.springframework.context.annotation.Lazy;
-import org.springframework.messaging.simp.SimpMessagingTemplate;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
@@ -82,8 +82,6 @@ import static com.yanfan.common.utils.SecurityUtils.*;
 public class DeviceServiceImpl implements IDeviceService {
     private static final Logger log = LoggerFactory.getLogger(DeviceServiceImpl.class);
     @Autowired
-    private SimpMessagingTemplate messagingTemplate;
-    @Autowired
     private DeviceMapper deviceMapper;
     @Autowired
     private DeviceUserMapper deviceUserMapper;
@@ -298,6 +296,9 @@ public class DeviceServiceImpl implements IDeviceService {
         return deviceMapper.selectProductAuthenticate(model);
     }
 
+    @Autowired
+    private DeviceWebSocketHandler deviceWebSocketHandler;
+
     /**
      * 更新设备的物模型
      *
@@ -488,8 +489,12 @@ public class DeviceServiceImpl implements IDeviceService {
             Device device1 = deviceMapper.selectDeviceBySerialNumber(input.getDeviceNumber());
             if ("连续油管".equals(device1.getProductName())){
                 //如果是连油设备发送ws
-                String destination = String.format("/topic/device/%s", input.getDeviceNumber());
-                messagingTemplate.convertAndSend(destination, JSON.toJSONString(deviceLogList));
+//                String destination = String.format("/topic/device/%s", input.getDeviceNumber());
+//                messagingTemplate.convertAndSend(destination, JSON.toJSONString(deviceLogList));
+//                socketIoMessageHandler.pushMsgToDevice(input.getDeviceNumber(), "device_push_msg", deviceLogList);
+//                customWebSocketHandler.pushMsgToDevice(input.getDeviceNumber(), JSON.toJSONString(deviceLogList));
+//                webSocketHandler.sendMessageToDevice(input.getDeviceNumber(), JSON.toJSONString(deviceLogList));
+                deviceWebSocketHandler.sendMessageToDevice(input.getDeviceNumber(), JSON.toJSONString(deviceLogList));
             }
             logService.saveBatch(tdLogDto);
 //                }