package org.apache.inlong.tubemq.client.consumer;

import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.inlong.tubemq.client.config.ConsumerConfig;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.corebase.cluster.Partition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/tubemq/client/consumer/MessageFetchManager.class */
/**
 * Owns the pool of fetch worker threads that pull partitions from a
 * {@link SimplePushMessageConsumer} and hand them to the consumer for processing.
 *
 * <p>Lifecycle: the manager starts in the "not started" state, moves to "started" on the
 * first successful {@link #startFetchWorkers()} call and to "shutdown" on
 * {@link #stopFetchWorkers(boolean)}. Every worker polls {@link #isShutdown()} and leaves
 * its loop once the manager is shut down.
 */
public class MessageFetchManager {
    private static final Logger logger = LoggerFactory.getLogger(MessageFetchManager.class);

    // Values held by managerStatus.
    private static final int MANAGER_STATUS_NOT_STARTED = -1;
    private static final int MANAGER_STATUS_SHUTDOWN = 0;
    private static final int MANAGER_STATUS_STARTED = 1;

    // Values stored per worker thread id in fetchWorkerStatusMap.
    private static final int WORKER_STATUS_CREATED = -1;     // thread object built, not yet running
    private static final int WORKER_STATUS_LOOP_START = 0;   // at the top of a fetch loop iteration
    private static final int WORKER_STATUS_SELECTING = 1;    // about to wait for / select a partition
    private static final int WORKER_STATUS_PROCESSING = 2;   // about to call processRequest

    // Pause between two checks while waiting for the workers to exit.
    private static final long WORKER_EXIT_POLL_INTERVAL_MS = 150L;
    // Pause after the message listeners have been notified during a full stop.
    private static final long STOP_SETTLE_DELAY_MS = 200L;

    private final ConsumerConfig consumerConfig;
    private final SimplePushMessageConsumer pushConsumer;
    private Thread[] fetchWorkerPool;
    // thread id -> worker status; a worker removes its own entry when run() ends.
    private final ConcurrentHashMap<Long, Integer> fetchWorkerStatusMap = new ConcurrentHashMap<>();
    private final AtomicInteger managerStatus = new AtomicInteger(MANAGER_STATUS_NOT_STARTED);

    /**
     * Fetch loop executed by each worker thread: select a partition, then either release it
     * (manager shut down or consumption paused) or pass it to the consumer for processing.
     */
    public class FetchTaskWorker implements Runnable {
        private FetchTaskWorker() {
        }

        @Override
        public void run() {
            final StringBuilder sb = new StringBuilder(256);
            final Long threadId = Thread.currentThread().getId();
            fetchWorkerStatusMap.put(threadId, WORKER_STATUS_LOOP_START);
            try {
                while (!isShutdown()) {
                    PartitionSelectResult selected = null;
                    fetchWorkerStatusMap.put(threadId, WORKER_STATUS_LOOP_START);
                    try {
                        if (isShutdown()) {
                            break;
                        }
                        fetchWorkerStatusMap.put(threadId, WORKER_STATUS_SELECTING);
                        pushConsumer.allowConsumeWait();
                        selected = pushConsumer.getBaseConsumer().pushSelectPartition();
                        if (selected == null) {
                            continue;
                        }
                        Partition partition = selected.getPartition();
                        long usedToken = selected.getUsedToken();
                        boolean isLastPackConsumed = selected.isLastPackConsumed();
                        if (isShutdown()) {
                            // Shut down while selecting: hand the partition back untouched.
                            pushConsumer.getBaseConsumer().pushReqReleasePartition(
                                    partition.getPartitionKey(), usedToken, isLastPackConsumed);
                            selected = null;
                            break;
                        }
                        if (pushConsumer.isConsumePaused()) {
                            boolean lastConsumed = selected.isLastPackConsumed();
                            if (lastConsumed) {
                                lastConsumed = pushConsumer.getBaseConsumer().flushLastRequest(partition);
                            }
                            pushConsumer.getBaseConsumer().pushReqReleasePartition(
                                    partition.getPartitionKey(), usedToken, lastConsumed);
                        } else {
                            fetchWorkerStatusMap.put(threadId, WORKER_STATUS_PROCESSING);
                            pushConsumer.processRequest(selected, sb);
                        }
                    } catch (Throwable th) {
                        // Keep the worker alive on any failure; give back a partition that is
                        // still held so it does not stay locked by this worker.
                        if (selected != null) {
                            pushConsumer.getBaseConsumer().pushReqReleasePartition(
                                    selected.getPartition().getPartitionKey(),
                                    selected.getUsedToken(), false);
                        }
                        // The builder is shared with processRequest and may hold partial content.
                        sb.setLength(0);
                        logger.warn("Thread {} has been interrupted 3.",
                                Thread.currentThread().getName(), th);
                    }
                }
            } finally {
                // Always deregister, otherwise stopFetchWorkers waits for the full timeout.
                fetchWorkerStatusMap.remove(threadId);
            }
        }
    }

    /**
     * Creates a manager for the given push consumer. No threads are created until
     * {@link #startFetchWorkers()} is called.
     *
     * @param consumerConfig configuration supplying the worker count, consumer group and
     *                       stop wait period
     * @param simplePushMessageConsumer the consumer the workers fetch for
     */
    public MessageFetchManager(ConsumerConfig consumerConfig, SimplePushMessageConsumer simplePushMessageConsumer) {
        this.consumerConfig = consumerConfig;
        this.pushConsumer = simplePushMessageConsumer;
    }

    /**
     * Creates and starts the fetch worker threads. Only the call that moves the manager from
     * "not started" to "started" creates threads; a call on an already started manager logs
     * and returns, and a call on a shut down manager does nothing.
     *
     * @throws TubeClientException propagated from the client-running check
     */
    public void startFetchWorkers() throws TubeClientException {
        this.pushConsumer.getBaseConsumer().checkClientRunning();
        if (this.managerStatus.get() == MANAGER_STATUS_STARTED) {
            logger.info("Duplicated call startFetchWorkers, MessageFetchManager has started !");
            return;
        }
        if (!this.managerStatus.compareAndSet(MANAGER_STATUS_NOT_STARTED, MANAGER_STATUS_STARTED)) {
            return;
        }
        logger.info("Starting Fetch Worker Pool !");
        this.fetchWorkerPool = new Thread[this.consumerConfig.getPushFetchThreadCnt()];
        logger.info("Prepare to start Fetch Worker Pool, total count:{}", this.fetchWorkerPool.length);
        StringBuilder nameBuilder = new StringBuilder(256);
        for (int i = 0; i < this.fetchWorkerPool.length; i++) {
            Thread worker = new Thread(new FetchTaskWorker());
            this.fetchWorkerPool[i] = worker;
            // Register before start so a stop request sees every worker that will run.
            this.fetchWorkerStatusMap.put(worker.getId(), WORKER_STATUS_CREATED);
            worker.setName(nameBuilder.append("Fetch_Worker_")
                    .append(this.consumerConfig.getConsumerGroup())
                    .append("-").append(i).toString());
            nameBuilder.setLength(0);
        }
        for (Thread worker : this.fetchWorkerPool) {
            worker.start();
        }
        logger.info("Fetch Worker Pool started !");
    }

    /**
     * @return {@code true} once the manager has been moved to the shutdown state
     */
    public boolean isShutdown() {
        return this.managerStatus.get() == MANAGER_STATUS_SHUTDOWN;
    }

    /**
     * Stops the fetch workers.
     *
     * @param onlySetStatus when {@code true}, only flips the manager to shutdown (and resumes
     *                      a paused consumer) without waiting for the workers; when
     *                      {@code false}, additionally waits for the workers to exit,
     *                      interrupts and joins those still registered after the wait
     *                      period, and notifies the base consumer that the listeners stopped
     * @throws InterruptedException if the calling thread is interrupted while joining the
     *                              workers or during the final pause
     */
    public void stopFetchWorkers(boolean onlySetStatus) throws InterruptedException {
        if (onlySetStatus) {
            if (this.managerStatus.get() == MANAGER_STATUS_SHUTDOWN) {
                return;
            }
            this.managerStatus.set(MANAGER_STATUS_SHUTDOWN);
            if (this.pushConsumer.isConsumePaused()) {
                this.pushConsumer.resumeConsume();
            }
            return;
        }
        this.managerStatus.set(MANAGER_STATUS_SHUTDOWN);
        if (this.fetchWorkerPool == null) {
            // Workers were never created, nothing to wait for.
            return;
        }
        logger.info("[STOP_FetchWorker] All fetch workers:{}", Arrays.toString(this.fetchWorkerPool));
        if (this.pushConsumer.isConsumePaused()) {
            this.pushConsumer.resumeConsume();
        }
        logger.info("[STOP_FetchWorker] Wait all fetch workers exist:");
        boolean workersRemain = waitAllFetchRequestHolds(this.consumerConfig.getPushListenerWaitPeriodMs());
        if (workersRemain) {
            // Workers did not exit within the wait period: interrupt them, then join.
            for (Thread worker : this.fetchWorkerPool) {
                if (worker != null) {
                    worker.interrupt();
                }
            }
            for (Thread worker : this.fetchWorkerPool) {
                if (worker != null) {
                    worker.join();
                    logger.info("[STOP_FetchWorker]{}", worker);
                }
            }
        }
        this.pushConsumer.getBaseConsumer().notifyAllMessageListenerStopped();
        Thread.sleep(STOP_SETTLE_DELAY_MS);
        logger.info("[STOP_FetchWorker] All fetch workers are stopped.");
    }

    /**
     * Polls until every worker has removed its entry from the status map or the wait period
     * has elapsed.
     *
     * @param maxWaitMs maximum time to keep polling, in milliseconds
     * @return {@code true} if workers were still registered at the last check,
     *         {@code false} if the map was found empty
     */
    private boolean waitAllFetchRequestHolds(long maxWaitMs) {
        final long startTime = System.currentTimeMillis();
        boolean workersRemain;
        do {
            workersRemain = !this.fetchWorkerStatusMap.isEmpty();
            if (!workersRemain) {
                break;
            }
            try {
                Thread.sleep(WORKER_EXIT_POLL_INTERVAL_MS);
            } catch (InterruptedException ignored) {
                // NOTE(review): interruption is swallowed so the wait continues until the
                // workers exit or the period elapses; confirm this is the intended behaviour.
            }
        } while (System.currentTimeMillis() - startTime < maxWaitMs);
        return workersRemain;
    }
}
