/*
 * Decompiled with CFR 0.152.
 */
package cn.iocoder.yudao.module.iot.mqttrpc.client;

import cn.iocoder.yudao.module.iot.mqttrpc.common.RpcRequest;
import cn.iocoder.yudao.module.iot.mqttrpc.common.RpcResponse;
import cn.iocoder.yudao.module.iot.mqttrpc.common.SerializationUtils;
import cn.iocoder.yudao.module.iot.mqttrpc.config.MqttConfig;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import lombok.Generated;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class RpcClient {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(RpcClient.class);
    private final MqttConfig mqttConfig;
    private final MqttClient mqttClient;
    private final ConcurrentMap<String, CompletableFuture<RpcResponse>> pendingRequests = new ConcurrentHashMap<String, CompletableFuture<RpcResponse>>();
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    public RpcClient(MqttConfig mqttConfig) throws MqttException {
        this.mqttConfig = mqttConfig;
        this.mqttClient = new MqttClient(mqttConfig.getBroker(), mqttConfig.getClientId(), (MqttClientPersistence)new MemoryPersistence());
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setCleanSession(true);
        mqttConnectOptions.setUserName(mqttConfig.getUsername());
        mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray());
        this.mqttClient.connect(mqttConnectOptions);
    }

    @PostConstruct
    public void init() throws MqttException {
        this.mqttClient.subscribe(this.mqttConfig.getResponseTopicPrefix() + "#", this::handleResponse);
        log.info("RPC Client subscribed to topics: {}", (Object)(this.mqttConfig.getResponseTopicPrefix() + "#"));
    }

    private void handleResponse(String string, MqttMessage mqttMessage) {
        String string2 = string.substring(this.mqttConfig.getResponseTopicPrefix().length());
        RpcResponse rpcResponse = (RpcResponse)SerializationUtils.deserialize((String)new String(mqttMessage.getPayload()), RpcResponse.class);
        CompletableFuture completableFuture = (CompletableFuture)this.pendingRequests.remove(string2);
        if (completableFuture != null) {
            if (rpcResponse.getError() != null) {
                completableFuture.completeExceptionally(new RuntimeException(rpcResponse.getError()));
            } else {
                completableFuture.complete(rpcResponse);
            }
        } else {
            log.warn("Received response for unknown correlationId: {}", (Object)string2);
        }
    }

    public CompletableFuture<Object> call(String string, Object[] objectArray, int n) throws MqttException {
        String string2 = UUID.randomUUID().toString();
        String string3 = this.mqttConfig.getResponseTopicPrefix() + string2;
        RpcRequest rpcRequest = new RpcRequest(string, objectArray, string2, string3);
        String string4 = SerializationUtils.serialize((Object)rpcRequest);
        MqttMessage mqttMessage = new MqttMessage(string4.getBytes());
        mqttMessage.setQos(1);
        this.mqttClient.publish(this.mqttConfig.getRequestTopic(), mqttMessage);
        CompletableFuture completableFuture = new CompletableFuture();
        this.pendingRequests.put(string2, completableFuture);
        this.scheduler.schedule(() -> {
            CompletableFuture completableFuture = (CompletableFuture)this.pendingRequests.remove(string2);
            if (completableFuture != null) {
                completableFuture.completeExceptionally(new TimeoutException("RPC call timed out"));
            }
        }, (long)n, TimeUnit.SECONDS);
        return completableFuture.thenApply(RpcResponse::getResult);
    }

    @PreDestroy
    public void cleanup() throws MqttException {
        this.mqttClient.disconnect();
        this.scheduler.shutdown();
        log.info("RPC Client disconnected");
    }
}

