|
@@ -0,0 +1,476 @@
|
|
|
|
|
+package cn.iocoder.yudao.module.pms.abc.config;
|
|
|
|
|
+
|
|
|
|
|
+import cn.hutool.core.collection.CollUtil;
|
|
|
|
|
+import cn.hutool.core.date.DateUtil;
|
|
|
|
|
+import cn.hutool.core.date.LocalDateTimeUtil;
|
|
|
|
|
+import cn.iocoder.yudao.framework.common.util.number.NumberUtils;
|
|
|
|
|
+import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnore;
|
|
|
|
|
+import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
|
|
|
|
|
+import cn.iocoder.yudao.module.pms.constant.PmsConstants;
|
|
|
|
|
+import cn.iocoder.yudao.module.pms.controller.admin.alarm.vo.IotAlarmRecordPageReqVO;
|
|
|
|
|
+import cn.iocoder.yudao.module.pms.controller.admin.alarm.vo.WsTdPropertyVO;
|
|
|
|
|
+import cn.iocoder.yudao.module.pms.controller.admin.iotdeviceperson.vo.IotDevicePersonPageReqVO;
|
|
|
|
|
+import cn.iocoder.yudao.module.pms.dal.dataobject.IotDeviceDO;
|
|
|
|
|
+import cn.iocoder.yudao.module.pms.dal.dataobject.alarm.IotAlarmRecordDO;
|
|
|
|
|
+import cn.iocoder.yudao.module.pms.dal.dataobject.alarm.IotAlarmSettingDO;
|
|
|
|
|
+import cn.iocoder.yudao.module.pms.dal.dataobject.iotdeviceperson.IotDevicePersonDO;
|
|
|
|
|
+import cn.iocoder.yudao.module.pms.dal.mysql.IotDeviceMapper;
|
|
|
|
|
+import cn.iocoder.yudao.module.pms.dal.mysql.alarm.IotAlarmRecordMapper;
|
|
|
|
|
+import cn.iocoder.yudao.module.pms.dal.mysql.alarm.IotAlarmSettingMapper;
|
|
|
|
|
+import cn.iocoder.yudao.module.pms.message.PmsMessage;
|
|
|
|
|
+import cn.iocoder.yudao.module.pms.service.iotdeviceperson.IotDevicePersonService;
|
|
|
|
|
+import cn.iocoder.yudao.module.system.api.user.AdminUserApi;
|
|
|
|
|
+import cn.iocoder.yudao.module.system.api.user.dto.AdminUserRespDTO;
|
|
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
|
|
+import com.google.common.collect.ImmutableList;
|
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
+import org.eclipse.paho.client.mqttv3.*;
|
|
|
|
|
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
|
|
+
|
|
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
|
|
+import javax.annotation.PreDestroy;
|
|
|
|
|
+import javax.annotation.Resource;
|
|
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
|
|
+import java.time.LocalDateTime;
|
|
|
|
|
+import java.time.temporal.ChronoUnit;
|
|
|
|
|
+import java.util.Arrays;
|
|
|
|
|
+import java.util.List;
|
|
|
|
|
+import java.util.Objects;
|
|
|
|
|
+import java.util.UUID;
|
|
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
+import java.util.concurrent.Executors;
|
|
|
|
|
+import java.util.concurrent.ScheduledExecutorService;
|
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
+
|
|
|
|
|
+@Slf4j
|
|
|
|
|
+@Component
|
|
|
|
|
+public class MqttClientManager {
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private MqttProperties mqttProperties;
|
|
|
|
|
+
|
|
|
|
|
+ private MqttClient mqttClient;
|
|
|
|
|
+ // 存储「需要订阅的 Topic + QoS」(核心:重连时重新订阅)
|
|
|
|
|
+ private final ConcurrentHashMap<String, Integer> needSubscribeTopics = new ConcurrentHashMap<>();
|
|
|
|
|
+ // 连接重试线程池(避免重复创建)
|
|
|
|
|
+ private final ScheduledExecutorService connectRetryExecutor = Executors.newSingleThreadScheduledExecutor();
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 初始化 MQTT 连接
|
|
|
|
|
+ */
|
|
|
|
|
+ @PostConstruct
|
|
|
|
|
+ public void init() {
|
|
|
|
|
+ // 初始化连接
|
|
|
|
|
+ connectMqtt();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 建立 MQTT 连接(抽离独立方法,方便重试/重连调用)
|
|
|
|
|
+ */
|
|
|
|
|
+ private void connectMqtt() {
|
|
|
|
|
+ // 若已连接,直接返回
|
|
|
|
|
+ if (mqttClient != null && mqttClient.isConnected()) {
|
|
|
|
|
+ log.warn("MQTT 客户端已处于连接状态,跳过连接");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ try {
|
|
|
|
|
+ // 1. 生成客户端ID(避免重复)
|
|
|
|
|
+ String clientId = mqttProperties.getClientIdPrefix() + UUID.randomUUID().toString().replace("-", "");
|
|
|
|
|
+ MemoryPersistence persistence = new MemoryPersistence();
|
|
|
|
|
+
|
|
|
|
|
+ // 2. 创建 MQTT 客户端
|
|
|
|
|
+ mqttClient = new MqttClient(mqttProperties.getHost(), clientId, persistence);
|
|
|
|
|
+
|
|
|
|
|
+ // 3. 配置连接参数
|
|
|
|
|
+ MqttConnectOptions connectOptions = new MqttConnectOptions();
|
|
|
|
|
+ connectOptions.setCleanSession(mqttProperties.isCleanSession());
|
|
|
|
|
+ connectOptions.setUserName(mqttProperties.getUsername());
|
|
|
|
|
+ connectOptions.setPassword(mqttProperties.getPassword().toCharArray());
|
|
|
|
|
+ connectOptions.setConnectionTimeout(mqttProperties.getConnectTimeout() / 1000); // 单位:秒
|
|
|
|
|
+ connectOptions.setAutomaticReconnect(true); // 开启自动重连
|
|
|
|
|
+ connectOptions.setMaxReconnectDelay(mqttProperties.getReconnectPeriod()); // 最大重连延迟
|
|
|
|
|
+
|
|
|
|
|
+ // 4. 设置回调(核心:重连成功后重新订阅)
|
|
|
|
|
+ mqttClient.setCallback(new MqttCallbackExtended() {
|
|
|
|
|
+ // 重连/首次连接成功回调(关键:MqttCallbackExtended 才有的方法)
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void connectComplete(boolean reconnect, String serverURI) {
|
|
|
|
|
+ if (reconnect) {
|
|
|
|
|
+ log.info("MQTT 重连成功,服务器地址:{}", serverURI);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ log.info("MQTT 首次连接成功,服务器地址:{}", serverURI);
|
|
|
|
|
+ }
|
|
|
|
|
+ // 无论首次连接还是重连,都强制重新订阅所有 Topic(避免丢失)
|
|
|
|
|
+ subscribeAllNeededTopics();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 连接断开回调
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void connectionLost(Throwable cause) {
|
|
|
|
|
+ log.error("MQTT 连接断开,原因:{}", cause.getMessage(), cause);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 消息到达回调
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void messageArrived(String topic, MqttMessage message) throws Exception {
|
|
|
|
|
+ handleReceivedMessage(topic, message);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 消息发布完成回调
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void deliveryComplete(IMqttDeliveryToken token) {
|
|
|
|
|
+ // 无需处理发布回调(当前仅订阅)
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+
|
|
|
|
|
+ // 5. 建立连接
|
|
|
|
|
+ mqttClient.connect(connectOptions);
|
|
|
|
|
+ log.info("MQTT 客户端连接成功,客户端ID:{}", clientId);
|
|
|
|
|
+
|
|
|
|
|
+ } catch (MqttException e) {
|
|
|
|
|
+ log.error("MQTT 连接失败,错误码:{},5秒后重试", e.getReasonCode(), e);
|
|
|
|
|
+ // 连接失败,5秒后重试
|
|
|
|
|
+ connectRetryExecutor.schedule(this::connectMqtt, 5, TimeUnit.SECONDS);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 批量订阅 Topic(核心:先记录,再订阅)
|
|
|
|
|
+ * @param topics 要订阅的 Topic 数组
|
|
|
|
|
+ */
|
|
|
|
|
+ public void batchSubscribe(String[] topics) {
|
|
|
|
|
+ batchSubscribe(topics, mqttProperties.getQos());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 批量订阅 Topic(指定 QoS)
|
|
|
|
|
+ * @param topics 要订阅的 Topic 数组
|
|
|
|
|
+ * @param qos QoS 级别(0/1/2)
|
|
|
|
|
+ */
|
|
|
|
|
+ public void batchSubscribe(String[] topics, int qos) {
|
|
|
|
|
+ if (topics == null || topics.length == 0) {
|
|
|
|
|
+ log.warn("批量订阅的 Topic 列表为空,跳过");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 1. 先记录需要订阅的 Topic(持久化,重连时使用)
|
|
|
|
|
+ for (String topic : topics) {
|
|
|
|
|
+ if (topic == null || topic.trim().isEmpty()) {
|
|
|
|
|
+ log.warn("跳过空的 Topic 订阅");
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ needSubscribeTopics.put(topic.trim(), qos);
|
|
|
|
|
+ log.debug("已记录需要订阅的 Topic:{},QoS:{}", topic, qos);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 2. 立即订阅(如果当前已连接)
|
|
|
|
|
+ if (mqttClient != null && mqttClient.isConnected()) {
|
|
|
|
|
+ subscribeTopics(topics, qos);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ log.warn("MQTT 客户端未连接,已记录 Topic 待连接后自动订阅:{}", Arrays.toString(topics));
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 单个订阅 Topic
|
|
|
|
|
+ * @param topic 要订阅的 Topic
|
|
|
|
|
+ */
|
|
|
|
|
+ public void subscribe(String topic) {
|
|
|
|
|
+ subscribe(topic, mqttProperties.getQos());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 单个订阅 Topic(指定 QoS)
|
|
|
|
|
+ * @param topic 要订阅的 Topic
|
|
|
|
|
+ * @param qos QoS 级别
|
|
|
|
|
+ */
|
|
|
|
|
+ public void subscribe(String topic, int qos) {
|
|
|
|
|
+ if (topic == null || topic.trim().isEmpty()) {
|
|
|
|
|
+ log.warn("订阅的 Topic 为空,跳过");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ topic = topic.trim();
|
|
|
|
|
+
|
|
|
|
|
+ // 1. 记录需要订阅的 Topic
|
|
|
|
|
+ needSubscribeTopics.put(topic, qos);
|
|
|
|
|
+
|
|
|
|
|
+ // 2. 立即订阅(如果已连接)
|
|
|
|
|
+ if (mqttClient != null && mqttClient.isConnected()) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ mqttClient.subscribe(topic, qos);
|
|
|
|
|
+ log.info("MQTT 订阅 Topic 成功:{},QoS:{}", topic, qos);
|
|
|
|
|
+ } catch (MqttException e) {
|
|
|
|
|
+ log.error("MQTT 订阅 Topic 失败:{},QoS:{}", topic, qos, e);
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ log.warn("MQTT 客户端未连接,已记录 Topic 待连接后自动订阅:{}", topic);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 取消单个 Topic 订阅
|
|
|
|
|
+ * @param topic 要取消的 Topic
|
|
|
|
|
+ */
|
|
|
|
|
+ public void unsubscribe(String topic) {
|
|
|
|
|
+ if (topic == null || topic.trim().isEmpty()) {
|
|
|
|
|
+ log.warn("取消订阅的 Topic 为空,跳过");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ topic = topic.trim();
|
|
|
|
|
+
|
|
|
|
|
+ // 1. 移除需要订阅的记录
|
|
|
|
|
+ needSubscribeTopics.remove(topic);
|
|
|
|
|
+
|
|
|
|
|
+ // 2. 立即取消订阅(如果已连接)
|
|
|
|
|
+ if (mqttClient != null && mqttClient.isConnected()) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ mqttClient.unsubscribe(topic);
|
|
|
|
|
+ log.info("MQTT 取消订阅 Topic 成功:{}", topic);
|
|
|
|
|
+ } catch (MqttException e) {
|
|
|
|
|
+ log.error("MQTT 取消订阅 Topic 失败:{}", topic, e);
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ log.warn("MQTT 客户端未连接,已移除 Topic 订阅记录:{}", topic);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 订阅所有已记录的 Topic(核心:重连/首次连接时调用)
|
|
|
|
|
+ */
|
|
|
|
|
+ private void subscribeAllNeededTopics() {
|
|
|
|
|
+ if (needSubscribeTopics.isEmpty()) {
|
|
|
|
|
+ log.debug("无需要订阅的 Topic,跳过重订阅");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ if (mqttClient == null || !mqttClient.isConnected()) {
|
|
|
|
|
+ log.warn("MQTT 未连接,无法订阅 Topic");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 拆分 Topic 和 QoS 数组
|
|
|
|
|
+ String[] topics = needSubscribeTopics.keySet().toArray(new String[0]);
|
|
|
|
|
+ int[] qosArr = needSubscribeTopics.values().stream().mapToInt(Integer::intValue).toArray();
|
|
|
|
|
+
|
|
|
|
|
+ try {
|
|
|
|
|
+ // 强制订阅(即使已订阅,重复调用也不会报错)
|
|
|
|
|
+ mqttClient.subscribe(topics, qosArr);
|
|
|
|
|
+ log.info("MQTT 批量订阅/重订阅成功,共 {} 个 Topic:{}", topics.length, Arrays.toString(topics));
|
|
|
|
|
+ } catch (MqttException e) {
|
|
|
|
|
+ log.error("MQTT 重订阅 Topic 失败,5秒后重试", e);
|
|
|
|
|
+ // 订阅失败,5秒后重试
|
|
|
|
|
+ connectRetryExecutor.schedule(this::subscribeAllNeededTopics, 5, TimeUnit.SECONDS);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 直接订阅指定 Topic 数组(底层调用)
|
|
|
|
|
+ */
|
|
|
|
|
+ private void subscribeTopics(String[] topics, int qos) {
|
|
|
|
|
+ int[] qosArr = new int[topics.length];
|
|
|
|
|
+ Arrays.fill(qosArr, qos);
|
|
|
|
|
+ try {
|
|
|
|
|
+ mqttClient.subscribe(topics, qosArr);
|
|
|
|
|
+ log.info("MQTT 批量订阅成功,Topic 列表:{},QoS:{}", Arrays.toString(topics), qos);
|
|
|
|
|
+ } catch (MqttException e) {
|
|
|
|
|
+ log.error("MQTT 批量订阅失败,Topic 列表:{}", Arrays.toString(topics), e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 处理收到的 MQTT 消息(核心业务逻辑)
|
|
|
|
|
+ */
|
|
|
|
|
+ private void handleReceivedMessage(String topic, MqttMessage message) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ // 1. 基础校验:是否是需要处理的 Topic
|
|
|
|
|
+ if (!needSubscribeTopics.containsKey(topic)) {
|
|
|
|
|
+ log.warn("收到未订阅的 Topic 消息,忽略处理:{}", topic);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 2. 解析消息体(指定 UTF-8 编码,避免乱码)
|
|
|
|
|
+ String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
|
|
|
|
|
+ log.debug("收到 MQTT 消息,Topic:{},内容:{}", topic, payload);
|
|
|
|
|
+
|
|
|
|
|
+ // 3. 结构化解析(根据实际消息格式调整)
|
|
|
|
|
+ String message1 = JSON.parseObject(payload, String.class);
|
|
|
|
|
+
|
|
|
|
|
+ //分发逻辑处理
|
|
|
|
|
+ String[] split = topic.split("/");
|
|
|
|
|
+ handleReceivedMessagePms(split[2], message1);
|
|
|
|
|
+
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("处理 MQTT 消息失败,Topic:{}", topic, e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private IotDeviceMapper iotDeviceMapper;
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private IotAlarmSettingMapper iotAlarmSettingMapper;
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private IotAlarmRecordMapper iotAlarmRecordMapper;
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private IotDevicePersonService iotDevicePersonService;
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private AdminUserApi adminUserApi;
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private PmsMessage pmsMessage;
|
|
|
|
|
+
|
|
|
|
|
+ @TenantIgnore
|
|
|
|
|
+ private void handleReceivedMessagePms(String clientId, String message) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ if (!message.startsWith("【连接成功】")){
|
|
|
|
|
+ List<WsTdPropertyVO> wsTdPropertyVOS = JSON.parseArray(message, WsTdPropertyVO.class);
|
|
|
|
|
+ //todo websocket
|
|
|
|
|
+ TenantUtils.executeIgnore(() ->{
|
|
|
|
|
+ List<IotDeviceDO> iotDeviceDOS = iotDeviceMapper.selectByCodeIn(ImmutableList.of(clientId));
|
|
|
|
|
+ if (CollUtil.isNotEmpty(iotDeviceDOS)) {
|
|
|
|
|
+ IotDeviceDO iotDeviceDO = iotDeviceDOS.get(0);
|
|
|
|
|
+ List<IotAlarmSettingDO> alarms = iotAlarmSettingMapper.selectList("device_id", iotDeviceDO.getId());
|
|
|
|
|
+ if (CollUtil.isNotEmpty(alarms)) {//设备赋值了
|
|
|
|
|
+ wsTdPropertyVOS.forEach(wsTdPropertyVO -> {
|
|
|
|
|
+ alarms.stream().filter(e -> e.getPropertyCode().equals(wsTdPropertyVO.getIdentity())).findFirst().ifPresent(alarm -> {
|
|
|
|
|
+ if (NumberUtils.isConvertibleToDouble(wsTdPropertyVO.getLogValue())&&NumberUtils.isConvertibleToDouble(alarm.getMaxValue())&&
|
|
|
|
|
+ NumberUtils.isConvertibleToDouble(alarm.getMinValue())){//如果可以被转为数值
|
|
|
|
|
+ if (Double.parseDouble(wsTdPropertyVO.getLogValue())>Double.parseDouble(alarm.getMaxValue())||
|
|
|
|
|
+ Double.parseDouble(wsTdPropertyVO.getLogValue())<Double.parseDouble(alarm.getMinValue())) {//大于最大值,小于最小值
|
|
|
|
|
+ //查找近半小时的该设备该属性是否有过告警
|
|
|
|
|
+ IotAlarmRecordPageReqVO pageReqVO = new IotAlarmRecordPageReqVO();
|
|
|
|
|
+ pageReqVO.setDeviceCode(clientId);
|
|
|
|
|
+ pageReqVO.setDeviceProperty(wsTdPropertyVO.getIdentity());
|
|
|
|
|
+ LocalDateTime offset = LocalDateTimeUtil.offset(LocalDateTime.now(), -30L, ChronoUnit.MINUTES);
|
|
|
|
|
+ LocalDateTime[] createTime = new LocalDateTime[]{offset, LocalDateTime.now()};
|
|
|
|
|
+ pageReqVO.setCreateTime(createTime);
|
|
|
|
|
+ List<IotAlarmRecordDO> iotAlarmRecordDOS = iotAlarmRecordMapper.selectListAlarm(pageReqVO);
|
|
|
|
|
+ if (CollUtil.isEmpty(iotAlarmRecordDOS)) {//如果空的话报警并且插入
|
|
|
|
|
+ IotAlarmRecordDO iotAlarmRecordDO = new IotAlarmRecordDO();
|
|
|
|
|
+ iotAlarmRecordDO.setDeviceCode(clientId);
|
|
|
|
|
+ iotAlarmRecordDO.setDeviceProperty(wsTdPropertyVO.getIdentity());
|
|
|
|
|
+ iotAlarmRecordDO.setAlarmTime(DateUtil.now());
|
|
|
|
|
+ iotAlarmRecordDO.setAlarmValue(wsTdPropertyVO.getLogValue());
|
|
|
|
|
+ iotAlarmRecordDO.setDeleted(false);
|
|
|
|
|
+ iotAlarmRecordMapper.insert(iotAlarmRecordDO);
|
|
|
|
|
+ //发送告警消息通知
|
|
|
|
|
+ IotDevicePersonPageReqVO personPageReqVO = new IotDevicePersonPageReqVO();
|
|
|
|
|
+ personPageReqVO.setDeviceIds(ImmutableList.of(iotDeviceDO.getId()));
|
|
|
|
|
+ List<IotDevicePersonDO> persons = iotDevicePersonService.getPersonsByDeviceIds(personPageReqVO);
|
|
|
|
|
+ if (CollUtil.isNotEmpty(persons)) {
|
|
|
|
|
+ AdminUserRespDTO user = adminUserApi.getUser(persons.get(0).getPersonId());
|
|
|
|
|
+ if (Objects.nonNull(user)) {
|
|
|
|
|
+ pmsMessage.sendMessage(iotDeviceDO.getId(), iotDeviceDO.getDeviceCode()+iotDeviceDO.getDeviceName()+","+wsTdPropertyVO.getModelName(), PmsConstants.ALARM_MESSAGE, user.getId(), user.getMobile());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+ });
|
|
|
|
|
+ } else {//设备没赋值,就去找分类的max及min值
|
|
|
|
|
+ List<IotAlarmSettingDO> flAlarms = iotAlarmSettingMapper.selectList("classify_id", iotDeviceDO.getAssetClass());
|
|
|
|
|
+ if (CollUtil.isNotEmpty(flAlarms)) {
|
|
|
|
|
+ wsTdPropertyVOS.forEach(wsTdPropertyVO -> {
|
|
|
|
|
+ flAlarms.stream().filter(e -> e.getPropertyCode().equals(wsTdPropertyVO.getIdentity())).findFirst().ifPresent(alarm -> {
|
|
|
|
|
+ if (NumberUtils.isConvertibleToDouble(wsTdPropertyVO.getLogValue())&&NumberUtils.isConvertibleToDouble(alarm.getMaxValue())&&
|
|
|
|
|
+ NumberUtils.isConvertibleToDouble(alarm.getMinValue())){//如果可以被转为数值
|
|
|
|
|
+ if (Double.parseDouble(wsTdPropertyVO.getLogValue())>Double.parseDouble(alarm.getMaxValue())||
|
|
|
|
|
+ Double.parseDouble(wsTdPropertyVO.getLogValue())<Double.parseDouble(alarm.getMinValue())) {//大于最大值,小于最小值
|
|
|
|
|
+ //查找近半小时的该设备该属性是否有过告警
|
|
|
|
|
+ IotAlarmRecordPageReqVO pageReqVO = new IotAlarmRecordPageReqVO();
|
|
|
|
|
+ pageReqVO.setDeviceCode(clientId);
|
|
|
|
|
+ pageReqVO.setDeviceProperty(wsTdPropertyVO.getIdentity());
|
|
|
|
|
+ LocalDateTime offset = LocalDateTimeUtil.offset(LocalDateTime.now(), 30L, ChronoUnit.MINUTES);
|
|
|
|
|
+ LocalDateTime[] createTime = new LocalDateTime[]{offset, LocalDateTime.now()};
|
|
|
|
|
+ pageReqVO.setCreateTime(createTime);
|
|
|
|
|
+ List<IotAlarmRecordDO> iotAlarmRecordDOS = iotAlarmRecordMapper.selectListAlarm(pageReqVO);
|
|
|
|
|
+ if (CollUtil.isEmpty(iotAlarmRecordDOS)) {//如果空的话报警并且插入
|
|
|
|
|
+ IotAlarmRecordDO iotAlarmRecordDO = new IotAlarmRecordDO();
|
|
|
|
|
+ iotAlarmRecordDO.setDeviceCode(clientId);
|
|
|
|
|
+ iotAlarmRecordDO.setDeviceProperty(wsTdPropertyVO.getIdentity());
|
|
|
|
|
+ iotAlarmRecordDO.setAlarmTime(DateUtil.now());
|
|
|
|
|
+ iotAlarmRecordDO.setAlarmValue(wsTdPropertyVO.getLogValue());
|
|
|
|
|
+ iotAlarmRecordDO.setDeleted(false);
|
|
|
|
|
+ iotAlarmRecordMapper.insert(iotAlarmRecordDO);
|
|
|
|
|
+ //发送告警消息通知
|
|
|
|
|
+ IotDevicePersonPageReqVO personPageReqVO = new IotDevicePersonPageReqVO();
|
|
|
|
|
+ personPageReqVO.setDeviceIds(ImmutableList.of(iotDeviceDO.getId()));
|
|
|
|
|
+ List<IotDevicePersonDO> persons = iotDevicePersonService.getPersonsByDeviceIds(personPageReqVO);
|
|
|
|
|
+ if (CollUtil.isNotEmpty(persons)) {
|
|
|
|
|
+ AdminUserRespDTO user = adminUserApi.getUser(persons.get(0).getPersonId());
|
|
|
|
|
+ if (Objects.nonNull(user)) {
|
|
|
|
|
+ pmsMessage.sendMessage(iotDeviceDO.getId(), iotDeviceDO.getDeviceCode()+iotDeviceDO.getDeviceName()+","+wsTdPropertyVO.getModelName(), PmsConstants.ALARM_MESSAGE, user.getId(), user.getMobile());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+ });
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }});
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("❌ [{}] 处理消息失败", clientId, e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // ---------------------- 内部类:消息结构 ----------------------
|
|
|
|
|
+ private static class MqttDeviceMessage {
|
|
|
|
|
+ private String deviceCode; // 设备编码
|
|
|
|
|
+ private double value; // 设备数值
|
|
|
|
|
+ private long timestamp; // 时间戳
|
|
|
|
|
+
|
|
|
|
|
+ // Getter & Setter
|
|
|
|
|
+ public String getDeviceCode() { return deviceCode; }
|
|
|
|
|
+ public void setDeviceCode(String deviceCode) { this.deviceCode = deviceCode; }
|
|
|
|
|
+ public double getValue() { return value; }
|
|
|
|
|
+ public void setValue(double value) { this.value = value; }
|
|
|
|
|
+ public long getTimestamp() { return timestamp; }
|
|
|
|
|
+ public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public String toString() {
|
|
|
|
|
+ return "MqttDeviceMessage{" +
|
|
|
|
|
+ "deviceCode='" + deviceCode + '\'' +
|
|
|
|
|
+ ", value=" + value +
|
|
|
|
|
+ ", timestamp=" + timestamp +
|
|
|
|
|
+ '}';
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // ---------------------- 资源销毁 ----------------------
|
|
|
|
|
+ @PreDestroy
|
|
|
|
|
+ public void destroy() {
|
|
|
|
|
+ // 关闭重试线程池
|
|
|
|
|
+ connectRetryExecutor.shutdown();
|
|
|
|
|
+ try {
|
|
|
|
|
+ if (!connectRetryExecutor.awaitTermination(3, TimeUnit.SECONDS)) {
|
|
|
|
|
+ connectRetryExecutor.shutdownNow();
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
|
+ connectRetryExecutor.shutdownNow();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 断开 MQTT 连接
|
|
|
|
|
+ if (mqttClient != null && mqttClient.isConnected()) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ mqttClient.disconnect();
|
|
|
|
|
+ mqttClient.close();
|
|
|
|
|
+ needSubscribeTopics.clear();
|
|
|
|
|
+ log.info("MQTT 客户端已断开连接,资源已清理");
|
|
|
|
|
+ } catch (MqttException e) {
|
|
|
|
|
+ log.error("MQTT 断开连接失败", e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // ---------------------- 辅助方法 ----------------------
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 获取 MQTT 连接状态
|
|
|
|
|
+ */
|
|
|
|
|
+ public boolean isConnected() {
|
|
|
|
|
+ return mqttClient != null && mqttClient.isConnected();
|
|
|
|
|
+ }
|
|
|
|
|
+}
|