|
|
@@ -0,0 +1,218 @@
|
|
|
+package cn.iocoder.yudao.module.pms.websocket;
|
|
|
+
|
|
|
+import cn.hutool.core.collection.CollUtil;
|
|
|
+import cn.hutool.core.date.DateUtil;
|
|
|
+import cn.hutool.core.date.LocalDateTimeUtil;
|
|
|
+import cn.hutool.extra.spring.SpringUtil;
|
|
|
+import cn.iocoder.yudao.framework.common.util.number.NumberUtils;
|
|
|
+import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnore;
|
|
|
+import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
|
|
|
+import cn.iocoder.yudao.module.pms.constant.PmsConstants;
|
|
|
+import cn.iocoder.yudao.module.pms.controller.admin.alarm.vo.IotAlarmRecordPageReqVO;
|
|
|
+import cn.iocoder.yudao.module.pms.controller.admin.alarm.vo.WsTdPropertyVO;
|
|
|
+import cn.iocoder.yudao.module.pms.controller.admin.iotdeviceperson.vo.IotDevicePersonPageReqVO;
|
|
|
+import cn.iocoder.yudao.module.pms.dal.dataobject.IotDeviceDO;
|
|
|
+import cn.iocoder.yudao.module.pms.dal.dataobject.alarm.IotAlarmRecordDO;
|
|
|
+import cn.iocoder.yudao.module.pms.dal.dataobject.alarm.IotAlarmSettingDO;
|
|
|
+import cn.iocoder.yudao.module.pms.dal.dataobject.iotdeviceperson.IotDevicePersonDO;
|
|
|
+import cn.iocoder.yudao.module.pms.dal.mysql.IotDeviceMapper;
|
|
|
+import cn.iocoder.yudao.module.pms.dal.mysql.alarm.IotAlarmRecordMapper;
|
|
|
+import cn.iocoder.yudao.module.pms.dal.mysql.alarm.IotAlarmSettingMapper;
|
|
|
+import cn.iocoder.yudao.module.pms.message.PmsMessage;
|
|
|
+import cn.iocoder.yudao.module.pms.service.iotdeviceperson.IotDevicePersonService;
|
|
|
+import cn.iocoder.yudao.module.system.api.user.AdminUserApi;
|
|
|
+import cn.iocoder.yudao.module.system.api.user.dto.AdminUserRespDTO;
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.google.common.collect.ImmutableList;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.web.socket.CloseStatus;
|
|
|
+import org.springframework.web.socket.TextMessage;
|
|
|
+import org.springframework.web.socket.WebSocketSession;
|
|
|
+import org.springframework.web.socket.handler.TextWebSocketHandler;
|
|
|
+
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.time.temporal.ChronoUnit;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Objects;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 简化版 WS 客户端处理器(仅接收消息,不主动发送)
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+public class DynamicWsClientHandler extends TextWebSocketHandler {
|
|
|
+
|
|
|
+ /** 当前连接的配置(含 clientId 和 wsUrl) */
|
|
|
+ private final WsClientConfig config;
|
|
|
+ /** 连接管理器(用于重连、移除连接) */
|
|
|
+ private final WsClientManager manager;
|
|
|
+ private final IotAlarmRecordMapper iotAlarmRecordMapper;
|
|
|
+ private final IotAlarmSettingMapper iotAlarmSettingMapper;
|
|
|
+ private final IotDeviceMapper iotDeviceMapper;
|
|
|
+ private final IotDevicePersonService iotDevicePersonService;
|
|
|
+ private final AdminUserApi adminUserApi;
|
|
|
+ private final PmsMessage pmsMessage;
|
|
|
+ public DynamicWsClientHandler(WsClientConfig config, WsClientManager manager) {
|
|
|
+ this.config = config;
|
|
|
+ this.manager = manager;
|
|
|
+ this.iotAlarmRecordMapper = SpringUtil.getBean(IotAlarmRecordMapper.class);
|
|
|
+ this.iotAlarmSettingMapper = SpringUtil.getBean(IotAlarmSettingMapper.class);
|
|
|
+ this.iotDeviceMapper = SpringUtil.getBean(IotDeviceMapper.class);
|
|
|
+ this.iotDevicePersonService = SpringUtil.getBean(IotDevicePersonService.class);
|
|
|
+ this.adminUserApi = SpringUtil.getBean(AdminUserApi.class);
|
|
|
+ this.pmsMessage = SpringUtil.getBean(PmsMessage.class);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 连接成功建立(仅记录日志,不发送初始化消息)
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void afterConnectionEstablished(WebSocketSession session) throws Exception {
|
|
|
+ // 将会话绑定到当前客户端ID,存入管理器
|
|
|
+ manager.bindSession(config.getClientId(), session);
|
|
|
+ // 日志标注 Nginx 代理的 WSS 连接
|
|
|
+ log.info("✅ [{}] 成功连接Nginx代理的WSS服务:{},开始接收消息", config.getClientId(), config.getWsUrl());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 核心逻辑:接收服务端推送的文本消息
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
|
|
|
+ String payload = message.getPayload();
|
|
|
+ String clientId = config.getClientId();
|
|
|
+ log.info("📩 [{}] 收到Nginx代理WSS推送消息:{}", clientId, payload);
|
|
|
+
|
|
|
+ // 核心:处理接收到的消息(芋道框架业务逻辑)
|
|
|
+ handleReceivedMessage(clientId, payload);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 连接关闭(仅处理重连)
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
|
|
|
+ log.warn("❌ [{}] Nginx代理WSS连接关闭,状态码:{},原因:{}",
|
|
|
+ config.getClientId(), status.getCode(), status.getReason());
|
|
|
+ // 从管理器移除失效会话
|
|
|
+ manager.removeSession(config.getClientId());
|
|
|
+ // 触发重连,保证消息接收不中断
|
|
|
+ manager.reconnect(config);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 连接异常(仅处理重连)
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
|
|
|
+ log.error("❌ [{}] Nginx代理WSS连接异常", config.getClientId(), exception);
|
|
|
+ // 关闭异常会话
|
|
|
+ if (session.isOpen()) {
|
|
|
+ session.close(CloseStatus.SERVER_ERROR);
|
|
|
+ }
|
|
|
+ // 从管理器移除失效会话
|
|
|
+ manager.removeSession(config.getClientId());
|
|
|
+ // 触发重连
|
|
|
+ manager.reconnect(config);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理接收到的消息(核心业务逻辑,适配芋道框架)
|
|
|
+ * 可直接注入芋道的 Service/Mapper 处理消息
|
|
|
+ */
|
|
|
+ @TenantIgnore
|
|
|
+ private void handleReceivedMessage(String clientId, String message) {
|
|
|
+ try {
|
|
|
+ List<WsTdPropertyVO> wsTdPropertyVOS = JSON.parseArray(message, WsTdPropertyVO.class);
|
|
|
+ //todo websocket
|
|
|
+ TenantUtils.executeIgnore(() ->{
|
|
|
+ List<IotDeviceDO> iotDeviceDOS = iotDeviceMapper.selectByCodeIn(ImmutableList.of(clientId));
|
|
|
+ if (CollUtil.isNotEmpty(iotDeviceDOS)) {
|
|
|
+ IotDeviceDO iotDeviceDO = iotDeviceDOS.get(0);
|
|
|
+ List<IotAlarmSettingDO> alarms = iotAlarmSettingMapper.selectList("device_id", iotDeviceDO.getId());
|
|
|
+ if (CollUtil.isNotEmpty(alarms)) {//设备赋值了
|
|
|
+ wsTdPropertyVOS.forEach(wsTdPropertyVO -> {
|
|
|
+ alarms.stream().filter(e -> e.getPropertyCode().equals(wsTdPropertyVO.getIdentity())).findFirst().ifPresent(alarm -> {
|
|
|
+ if (NumberUtils.isConvertibleToDouble(wsTdPropertyVO.getLogValue())&&NumberUtils.isConvertibleToDouble(alarm.getMaxValue())&&
|
|
|
+ NumberUtils.isConvertibleToDouble(alarm.getMinValue())){//如果可以被转为数值
|
|
|
+ if (Double.parseDouble(wsTdPropertyVO.getLogValue())>Double.parseDouble(alarm.getMaxValue())||
|
|
|
+ Double.parseDouble(wsTdPropertyVO.getLogValue())<Double.parseDouble(alarm.getMinValue())) {//大于最大值,小于最小值
|
|
|
+ //查找近半小时的该设备该属性是否有过告警
|
|
|
+ IotAlarmRecordPageReqVO pageReqVO = new IotAlarmRecordPageReqVO();
|
|
|
+ pageReqVO.setDeviceCode(clientId);
|
|
|
+ pageReqVO.setDeviceProperty(wsTdPropertyVO.getIdentity());
|
|
|
+ LocalDateTime offset = LocalDateTimeUtil.offset(LocalDateTime.now(), -30L, ChronoUnit.MINUTES);
|
|
|
+ LocalDateTime[] createTime = new LocalDateTime[]{offset, LocalDateTime.now()};
|
|
|
+ pageReqVO.setCreateTime(createTime);
|
|
|
+ List<IotAlarmRecordDO> iotAlarmRecordDOS = iotAlarmRecordMapper.selectListAlarm(pageReqVO);
|
|
|
+ if (CollUtil.isEmpty(iotAlarmRecordDOS)) {//如果空的话报警并且插入
|
|
|
+ IotAlarmRecordDO iotAlarmRecordDO = new IotAlarmRecordDO();
|
|
|
+ iotAlarmRecordDO.setDeviceCode(clientId);
|
|
|
+ iotAlarmRecordDO.setDeviceProperty(wsTdPropertyVO.getIdentity());
|
|
|
+ iotAlarmRecordDO.setAlarmTime(DateUtil.now());
|
|
|
+ iotAlarmRecordDO.setAlarmValue(wsTdPropertyVO.getLogValue());
|
|
|
+ iotAlarmRecordDO.setDeleted(false);
|
|
|
+ iotAlarmRecordMapper.insert(iotAlarmRecordDO);
|
|
|
+ //发送告警消息通知
|
|
|
+ IotDevicePersonPageReqVO personPageReqVO = new IotDevicePersonPageReqVO();
|
|
|
+ personPageReqVO.setDeviceIds(ImmutableList.of(iotDeviceDO.getId()));
|
|
|
+ List<IotDevicePersonDO> persons = iotDevicePersonService.getPersonsByDeviceIds(personPageReqVO);
|
|
|
+ if (CollUtil.isNotEmpty(persons)) {
|
|
|
+ AdminUserRespDTO user = adminUserApi.getUser(persons.get(0).getPersonId());
|
|
|
+ if (Objects.nonNull(user)) {
|
|
|
+ pmsMessage.sendMessage(iotDeviceDO.getId(), iotDeviceDO.getDeviceCode()+iotDeviceDO.getDeviceName()+","+wsTdPropertyVO.getModelName(), PmsConstants.ALARM_MESSAGE, user.getId(), user.getMobile());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ });
|
|
|
+ } else {//设备没赋值,就去找分类的max及min值
|
|
|
+ List<IotAlarmSettingDO> flAlarms = iotAlarmSettingMapper.selectList("classify_id", iotDeviceDO.getAssetClass());
|
|
|
+ if (CollUtil.isNotEmpty(flAlarms)) {
|
|
|
+ wsTdPropertyVOS.forEach(wsTdPropertyVO -> {
|
|
|
+ flAlarms.stream().filter(e -> e.getPropertyCode().equals(wsTdPropertyVO.getIdentity())).findFirst().ifPresent(alarm -> {
|
|
|
+ if (NumberUtils.isConvertibleToDouble(wsTdPropertyVO.getLogValue())&&NumberUtils.isConvertibleToDouble(alarm.getMaxValue())&&
|
|
|
+ NumberUtils.isConvertibleToDouble(alarm.getMinValue())){//如果可以被转为数值
|
|
|
+ if (Double.parseDouble(wsTdPropertyVO.getLogValue())>Double.parseDouble(alarm.getMaxValue())||
|
|
|
+ Double.parseDouble(wsTdPropertyVO.getLogValue())<Double.parseDouble(alarm.getMinValue())) {//大于最大值,小于最小值
|
|
|
+ //查找近半小时的该设备该属性是否有过告警
|
|
|
+ IotAlarmRecordPageReqVO pageReqVO = new IotAlarmRecordPageReqVO();
|
|
|
+ pageReqVO.setDeviceCode(clientId);
|
|
|
+ pageReqVO.setDeviceProperty(wsTdPropertyVO.getIdentity());
|
|
|
+ LocalDateTime offset = LocalDateTimeUtil.offset(LocalDateTime.now(), 30L, ChronoUnit.MINUTES);
|
|
|
+ LocalDateTime[] createTime = new LocalDateTime[]{offset, LocalDateTime.now()};
|
|
|
+ pageReqVO.setCreateTime(createTime);
|
|
|
+ List<IotAlarmRecordDO> iotAlarmRecordDOS = iotAlarmRecordMapper.selectListAlarm(pageReqVO);
|
|
|
+ if (CollUtil.isEmpty(iotAlarmRecordDOS)) {//如果空的话报警并且插入
|
|
|
+ IotAlarmRecordDO iotAlarmRecordDO = new IotAlarmRecordDO();
|
|
|
+ iotAlarmRecordDO.setDeviceCode(clientId);
|
|
|
+ iotAlarmRecordDO.setDeviceProperty(wsTdPropertyVO.getIdentity());
|
|
|
+ iotAlarmRecordDO.setAlarmTime(DateUtil.now());
|
|
|
+ iotAlarmRecordDO.setAlarmValue(wsTdPropertyVO.getLogValue());
|
|
|
+ iotAlarmRecordDO.setDeleted(false);
|
|
|
+ iotAlarmRecordMapper.insert(iotAlarmRecordDO);
|
|
|
+ //发送告警消息通知
|
|
|
+ IotDevicePersonPageReqVO personPageReqVO = new IotDevicePersonPageReqVO();
|
|
|
+ personPageReqVO.setDeviceIds(ImmutableList.of(iotDeviceDO.getId()));
|
|
|
+ List<IotDevicePersonDO> persons = iotDevicePersonService.getPersonsByDeviceIds(personPageReqVO);
|
|
|
+ if (CollUtil.isNotEmpty(persons)) {
|
|
|
+ AdminUserRespDTO user = adminUserApi.getUser(persons.get(0).getPersonId());
|
|
|
+ if (Objects.nonNull(user)) {
|
|
|
+ pmsMessage.sendMessage(iotDeviceDO.getId(), iotDeviceDO.getDeviceCode()+iotDeviceDO.getDeviceName()+","+wsTdPropertyVO.getModelName(), PmsConstants.ALARM_MESSAGE, user.getId(), user.getMobile());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }});
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("❌ [{}] 处理消息失败", clientId, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|