Browse Source

【代码优化】IoT: 基于 guava 对 producer 做 cache

puhui999 6 months ago
parent
commit
4be18af236

+ 50 - 19
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotRocketMQDataBridgeExecute.java

@@ -3,6 +3,9 @@ package cn.iocoder.yudao.module.iot.service.rule.execute;
 import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
 import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum;
 import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendResult;
@@ -11,7 +14,9 @@ import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.springframework.stereotype.Component;
 
+import java.time.Duration;
 import java.time.LocalDateTime;
+import java.util.concurrent.Executors;
 
 /**
  * RocketMQ 的 {@link IotDataBridgeExecute} 实现类
@@ -22,6 +27,36 @@ import java.time.LocalDateTime;
 @Slf4j
 public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute {
 
+    /**
+     * 针对 {@link IotDataBridgeDO.RocketMQConfig} 的 DefaultMQProducer 缓存
+     */
+    private final LoadingCache<IotDataBridgeDO.RocketMQConfig, DefaultMQProducer> PRODUCER_CACHE = CacheBuilder.newBuilder()
+            // 只阻塞当前数据加载线程,其他线程返回旧值
+            .refreshAfterWrite(Duration.ofMinutes(10))
+            // 增加移除监听器,自动关闭 producer
+            .removalListener(notification -> {
+                DefaultMQProducer producer = (DefaultMQProducer) notification.getValue();
+                if (producer != null) {
+                    try {
+                        producer.shutdown();
+                        log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已关闭]", notification.getKey());
+                    } catch (Exception e) {
+                        log.error("[PRODUCER_CACHE][配置({}) 对应的 producer 关闭失败]", notification.getKey(), e);
+                    }
+                }
+            })
+            // 通过 asyncReloading 实现全异步加载,包括 refreshAfterWrite 被阻塞的加载线程
+            .build(CacheLoader.asyncReloading(new CacheLoader<IotDataBridgeDO.RocketMQConfig, DefaultMQProducer>() {
+                @Override
+                public DefaultMQProducer load(IotDataBridgeDO.RocketMQConfig config) throws Exception {
+                    DefaultMQProducer producer = new DefaultMQProducer(config.getGroup());
+                    producer.setNamesrvAddr(config.getNameServer());
+                    producer.start();
+                    log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已创建并启动]", config);
+                    return producer;
+                }
+            }, Executors.newCachedThreadPool()));
+
     @Override
     public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
         // 1.1 校验数据桥接的类型 == ROCKETMQ
@@ -33,14 +68,9 @@ public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute {
     }
 
     private void executeRocketMQ(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) {
-        // 1.1 创建生产者实例,指定生产者组名
-        DefaultMQProducer producer = new DefaultMQProducer(config.getGroup());
-        // TODO @puhui999:可以考虑,基于 guava 做 cache,使用 config 作为 key,然后假个 listener 超时,销毁 producer
         try {
-            // 1.2 设置 NameServer 地址
-            producer.setNamesrvAddr(config.getNameServer());
-            // 1.3 启动生产者
-            producer.start();
+            // 1. 获取或创建 Producer
+            DefaultMQProducer producer = PRODUCER_CACHE.get(config);
 
             // 2.1 创建消息对象,指定Topic、Tag和消息体
             Message msg = new Message(
@@ -58,18 +88,22 @@ public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute {
             }
         } catch (Exception e) {
             log.error("[executeRocketMQ][message({}) config({}) 发送异常]", message, config, e);
-        } finally {
-            // 3. 关闭生产者
-            producer.shutdown();
         }
     }
 
     // TODO @芋艿:测试代码,后续清理
     public static void main(String[] args) {
-        // 1. 创建 IotRocketMQDataBridgeExecute 实例
+        // 1. 创建一个共享的实例
         IotRocketMQDataBridgeExecute action = new IotRocketMQDataBridgeExecute();
 
-        // 2. 创建测试消息
+        // 2. 创建共享的配置
+        IotDataBridgeDO.RocketMQConfig config = new IotDataBridgeDO.RocketMQConfig();
+        config.setNameServer("127.0.0.1:9876");
+        config.setGroup("test-group");
+        config.setTopic("test-topic");
+        config.setTags("test-tag");
+
+        // 3. 创建共享的消息
         IotDeviceMessage message = IotDeviceMessage.builder()
                 .requestId("TEST-001")
                 .productKey("testProduct")
@@ -82,14 +116,11 @@ public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute {
                 .tenantId(1L)
                 .build();
 
-        // 3. 创建 RocketMQ 配置
-        IotDataBridgeDO.RocketMQConfig config = new IotDataBridgeDO.RocketMQConfig();
-        config.setNameServer("127.0.0.1:9876");
-        config.setGroup("test-group");
-        config.setTopic("test-topic");
-        config.setTags("test-tag");
+        // 4. 执行两次测试,验证缓存
+        log.info("[main][第一次执行,应该会创建新的 producer]");
+        action.executeRocketMQ(message, config);
 
-        // 4. 执行测试
+        log.info("[main][第二次执行,应该会复用缓存的 producer]"); 
         action.executeRocketMQ(message, config);
     }