package org.apache.inlong.tubemq.client.factory;

import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.inlong.tubemq.client.config.ConsumerConfig;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.client.consumer.ClientBalanceConsumer;
import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
import org.apache.inlong.tubemq.client.consumer.PushMessageConsumer;
import org.apache.inlong.tubemq.client.consumer.SimpleClientBalanceConsumer;
import org.apache.inlong.tubemq.client.consumer.SimplePullMessageConsumer;
import org.apache.inlong.tubemq.client.consumer.SimplePushMessageConsumer;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.producer.MessageProducer;
import org.apache.inlong.tubemq.client.producer.ProducerManager;
import org.apache.inlong.tubemq.client.producer.SimpleMessageProducer;
import org.apache.inlong.tubemq.client.producer.qltystats.DefaultBrokerRcvQltyStats;
import org.apache.inlong.tubemq.corebase.Shutdownable;
import org.apache.inlong.tubemq.corebase.cluster.MasterInfo;
import org.apache.inlong.tubemq.corerpc.RpcConfig;
import org.apache.inlong.tubemq.corerpc.RpcServiceFactory;
import org.apache.inlong.tubemq.corerpc.client.ClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/tubemq/client/factory/TubeBaseSessionFactory.class */
public class TubeBaseSessionFactory implements InnerSessionFactory {
    private static final Logger logger = LoggerFactory.getLogger(TubeBaseSessionFactory.class);
    private final RpcServiceFactory rpcServiceFactory;
    private final ProducerManager producerManager;
    private final TubeClientConfig tubeClientConfig;
    private final DefaultBrokerRcvQltyStats brokerRcvQltyStats;
    private final CopyOnWriteArrayList<Shutdownable> clientLst = new CopyOnWriteArrayList<>();
    private final AtomicBoolean shutdown = new AtomicBoolean(false);

    public TubeBaseSessionFactory(ClientFactory clientFactory, TubeClientConfig tubeClientConfig) throws TubeClientException {
        checkConfig(tubeClientConfig);
        this.tubeClientConfig = tubeClientConfig;
        RpcConfig rpcConfig = new RpcConfig();
        rpcConfig.put("rpc.link.quality.stats.duration", Long.valueOf(tubeClientConfig.getLinkStatsDurationMs()));
        rpcConfig.put("rpc.link.quality.forbidden.duration", Long.valueOf(tubeClientConfig.getLinkStatsForbiddenDurationMs()));
        rpcConfig.put("rpc.link.quality.max.allowed.fail.count", Integer.valueOf(tubeClientConfig.getLinkStatsMaxAllowedFailTimes()));
        rpcConfig.put("rpc.link.quality.max.fail.forbidden.rate", Double.valueOf(tubeClientConfig.getLinkStatsMaxForbiddenRate()));
        rpcConfig.put("rpc.unavailable.service.forbidden.duration", Long.valueOf(tubeClientConfig.getUnAvailableFbdDurationMs()));
        this.rpcServiceFactory = new RpcServiceFactory(clientFactory, rpcConfig);
        this.producerManager = new ProducerManager(this, this.tubeClientConfig);
        this.brokerRcvQltyStats = new DefaultBrokerRcvQltyStats(getRpcServiceFactory(), this.tubeClientConfig);
        logger.info(new StringBuilder(512).append("Created Session Factory, the config is: ").append(tubeClientConfig.toJsonString()).toString());
    }

    public TubeClientConfig getTubeClientConfig() {
        return this.tubeClientConfig;
    }

    public CopyOnWriteArrayList<Shutdownable> getCurrClients() {
        return this.clientLst;
    }

    private void checkConfig(TubeClientConfig tubeClientConfig) throws TubeClientException {
        if (tubeClientConfig == null) {
            throw new TubeClientException("null configuration");
        }
        MasterInfo masterInfo = tubeClientConfig.getMasterInfo();
        if (masterInfo == null || masterInfo.getAddrMap4Failover().isEmpty()) {
            throw new TubeClientException("Blank MasterInfo content in ClientConfig");
        }
    }

    @Override // org.apache.inlong.tubemq.client.factory.InnerSessionFactory
    public DefaultBrokerRcvQltyStats getBrokerRcvQltyStats() {
        return this.brokerRcvQltyStats;
    }

    @Override // org.apache.inlong.tubemq.client.factory.MessageSessionFactory
    public void shutdown() throws TubeClientException {
        logger.info("[SHUTDOWN_TUBE] Shutting down tube factory...");
        if (!this.shutdown.get() && this.shutdown.compareAndSet(false, true)) {
            Iterator<Shutdownable> it = this.clientLst.iterator();
            while (it.hasNext()) {
                try {
                    it.next().shutdown();
                } catch (Throwable th) {
                    logger.error("[SHUTDOWN_TUBE] child shutdown failed", th);
                }
            }
            try {
                this.producerManager.shutdown();
            } catch (Throwable th2) {
            }
            this.brokerRcvQltyStats.stopBrokerStatistic();
            try {
                this.rpcServiceFactory.destroy();
            } catch (Exception e) {
                logger.error("Fail to destroy RpcServiceFactory!", e);
            }
        }
    }

    @Override // org.apache.inlong.tubemq.client.factory.MessageSessionFactory
    public MessageProducer createProducer() throws TubeClientException {
        this.brokerRcvQltyStats.startBrokerStatistic();
        try {
            this.producerManager.start();
            return (MessageProducer) addClient(new SimpleMessageProducer(this, this.tubeClientConfig));
        } catch (Throwable th) {
            if (th instanceof TubeClientException) {
                throw ((TubeClientException) th);
            }
            throw new TubeClientException("Create Producer failure, ", th);
        }
    }

    @Override // org.apache.inlong.tubemq.client.factory.InnerSessionFactory
    public RpcServiceFactory getRpcServiceFactory() {
        return this.rpcServiceFactory;
    }

    @Override // org.apache.inlong.tubemq.client.factory.InnerSessionFactory
    public ProducerManager getProducerManager() {
        return this.producerManager;
    }

    @Override // org.apache.inlong.tubemq.client.factory.MessageSessionFactory
    public <T extends Shutdownable> void removeClient(T t) {
        this.clientLst.remove(t);
    }

    @Override // org.apache.inlong.tubemq.client.factory.MessageSessionFactory
    public PullMessageConsumer createPullConsumer(ConsumerConfig consumerConfig) throws TubeClientException {
        if (this.tubeClientConfig.getMasterInfo().equals(consumerConfig.getMasterInfo())) {
            return (PullMessageConsumer) addClient(new SimplePullMessageConsumer(this, consumerConfig));
        }
        throw new TubeClientException(new StringBuilder(512).append("consumerConfig's masterInfo not equal!").append(" SessionFactory's masterInfo is ").append(this.tubeClientConfig.getMasterInfo().getMasterClusterStr()).append(", consumerConfig's masterInfo is ").append(consumerConfig.getMasterInfo().getMasterClusterStr()).toString());
    }

    @Override // org.apache.inlong.tubemq.client.factory.MessageSessionFactory
    public PushMessageConsumer createPushConsumer(ConsumerConfig consumerConfig) throws TubeClientException {
        if (this.tubeClientConfig.getMasterInfo().equals(consumerConfig.getMasterInfo())) {
            return (PushMessageConsumer) addClient(new SimplePushMessageConsumer(this, consumerConfig));
        }
        throw new TubeClientException(new StringBuilder(512).append("consumerConfig's masterInfo not equal!").append(" SessionFactory's masterInfo is ").append(this.tubeClientConfig.getMasterInfo().getMasterClusterStr()).append(", consumerConfig's masterInfo is ").append(consumerConfig.getMasterInfo().getMasterClusterStr()).toString());
    }

    @Override // org.apache.inlong.tubemq.client.factory.MessageSessionFactory
    public ClientBalanceConsumer createBalanceConsumer(ConsumerConfig consumerConfig) throws TubeClientException {
        if (this.tubeClientConfig.getMasterInfo().equals(consumerConfig.getMasterInfo())) {
            return (ClientBalanceConsumer) addClient(new SimpleClientBalanceConsumer(this, consumerConfig));
        }
        throw new TubeClientException(new StringBuilder(512).append("consumerConfig's masterInfo not equal!").append(" SessionFactory's masterInfo is ").append(this.tubeClientConfig.getMasterInfo().getMasterClusterStr()).append(", consumerConfig's masterInfo is ").append(consumerConfig.getMasterInfo().getMasterClusterStr()).toString());
    }

    public boolean isShutdown() {
        return this.shutdown.get();
    }

    private <T extends Shutdownable> T addClient(T t) {
        this.clientLst.add(t);
        return t;
    }
}
