/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.pull;

import com.alibaba.schedulerx.common.util.ConfigUtil;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.shade.com.google.common.collect.Maps;
import com.alibaba.schedulerx.shade.com.google.common.collect.Sets;
import com.alibaba.schedulerx.worker.batch.ContainerStatusReqHandler;
import com.alibaba.schedulerx.worker.batch.ContainerStatusReqHandlerPool;
import com.alibaba.schedulerx.worker.batch.ReqQueue;
import com.alibaba.schedulerx.worker.container.ContainerFactory;
import com.alibaba.schedulerx.worker.container.ContainerPool;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.pull.BlockingContainerQueue;
import com.alibaba.schedulerx.worker.pull.ConsumerThread;
import com.alibaba.schedulerx.worker.pull.PullThread;
import com.alibaba.schedulerx.worker.util.WorkerConfigUtil;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

public enum PullManager {
    INSTANCE;

    private Map<Long, BlockingContainerQueue> queueMap = Maps.newConcurrentMap();
    private Map<Long, PullThread> pullThreadMap = Maps.newConcurrentMap();
    private Map<Long, ConsumerThread[]> consumerThreadMap = Maps.newConcurrentMap();
    private ContainerStatusReqHandlerPool statusReqBatchHandlerPool = ContainerStatusReqHandlerPool.INSTANCE;
    private Set<Long> crashedInstanceSet = Sets.newConcurrentHashSet();
    private static Logger LOGGER;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void init(long jobInstanceId, long serialNum, int pageSize, int queueSize, int consumerSize, String taskMasterAkkaPath) throws Exception {
        AtomicLong jobInstanceLock;
        ContainerPool containerPool = ContainerFactory.getContainerPool();
        AtomicLong atomicLong = jobInstanceLock = containerPool.getInstanceLock(jobInstanceId, serialNum);
        synchronized (atomicLong) {
            if (!this.queueMap.containsKey(jobInstanceId)) {
                long statusReqBatchHandlerKey;
                BlockingContainerQueue queue = new BlockingContainerQueue(queueSize);
                this.queueMap.put(jobInstanceId, queue);
                PullThread pullThread = new PullThread(jobInstanceId, serialNum, pageSize, taskMasterAkkaPath, queue);
                pullThread.start();
                this.pullThreadMap.put(jobInstanceId, pullThread);
                boolean enableShareContainerPool = WorkerConfigUtil.isEnableShareContainerPool();
                long l = statusReqBatchHandlerKey = enableShareContainerPool ? 0L : jobInstanceId;
                if (!this.statusReqBatchHandlerPool.contains(statusReqBatchHandlerKey)) {
                    ReqQueue reqQueue = new ReqQueue(statusReqBatchHandlerKey, 100000);
                    reqQueue.init();
                    int batchSize = ConfigUtil.getWorkerConfig().getInt("worker.map.page.size", 1000);
                    this.statusReqBatchHandlerPool.start(statusReqBatchHandlerKey, new ContainerStatusReqHandler<Worker.ContainerReportTaskStatusRequest>(statusReqBatchHandlerKey, 1, 1, batchSize, reqQueue, taskMasterAkkaPath));
                }
                ConsumerThread[] consumers = new ConsumerThread[consumerSize];
                for (int i = 0; i < consumerSize; ++i) {
                    consumers[i] = new ConsumerThread(queue, ContainerFactory.getContainerPool(), taskMasterAkkaPath);
                    new Thread((Runnable)consumers[i], "Schedulerx-ConsumerThread-" + jobInstanceId + "-" + i).start();
                }
                this.consumerThreadMap.put(jobInstanceId, consumers);
            } else {
                PullThread pullThread = this.pullThreadMap.get(jobInstanceId);
                if (pullThread != null) {
                    pullThread.setSerialNum(serialNum);
                }
            }
        }
    }

    public void crash(long jobInstanceId) {
        this.crashedInstanceSet.add(jobInstanceId);
    }

    public void stop(long jobInstanceId) {
        if (this.pullThreadMap.containsKey(jobInstanceId)) {
            this.pullThreadMap.get(jobInstanceId).stopRunning();
            this.pullThreadMap.remove(jobInstanceId);
        }
        if (this.consumerThreadMap.containsKey(jobInstanceId)) {
            ConsumerThread[] consumers;
            for (ConsumerThread consumer : consumers = this.consumerThreadMap.get(jobInstanceId)) {
                consumer.stopRunning();
            }
            this.consumerThreadMap.remove(jobInstanceId);
        }
        if (this.queueMap.containsKey(jobInstanceId)) {
            this.queueMap.get(jobInstanceId).clear();
            this.queueMap.remove(jobInstanceId);
        }
        this.crashedInstanceSet.remove(jobInstanceId);
    }

    public void stopAll() {
        for (Long jobInstanceId : this.pullThreadMap.keySet()) {
            this.stop(jobInstanceId);
        }
    }

    public boolean contains(long jobInstanceId) {
        return this.queueMap.containsKey(jobInstanceId);
    }

    public boolean isCrashed(long jobInstanceId) {
        return this.crashedInstanceSet.contains(jobInstanceId);
    }

    public boolean hasConsumer() {
        return this.consumerThreadMap.size() > 0;
    }

    static {
        LOGGER = LogFactory.getLogger(PullManager.class);
    }
}

