/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.batch;

import com.alibaba.schedulerx.worker.batch.ReqQueue;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.master.TaskMasterPool;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Base class for handlers that drain a {@link ReqQueue} in batches.
 *
 * <p>{@link #start()} creates a worker pool ({@link #batchProcessSvc}) and a single
 * "retrieve" thread. The retrieve thread repeatedly pulls up to {@code batchSize}
 * requests from the queue and passes them to {@link #process(long, List)}.
 * {@link #syncHandleReqs(int, String)} offers the same pull-and-process step on the
 * caller's thread.
 *
 * <p>NOTE(review): {@code asyncHandleReqs} increments {@link #activeRunnableNum} but
 * never decrements it; presumably implementations of
 * {@link #process(long, List, String)} decrement it once their asynchronous work has
 * finished -- confirm against the subclasses.
 *
 * @param <T> type of the queued requests
 */
public abstract class BaseReqHandler<T> {
    private long jobInstanceId;
    private int coreBatchThreadNum;
    private int maxBatchThreadNum;
    private int batchSize;
    private String batchProcessThreadName;
    private String batchRetrieveThreadName;
    private ReqQueue<T> reqsQueue;
    private Thread batchRetrieveThread;
    protected TaskMasterPool taskMasterPool = TaskMasterPool.INSTANCE;
    protected ThreadPoolExecutor batchProcessSvc;
    /** Pause after a non-empty batch that is smaller than 80% of {@code batchSize}. */
    protected long defaultSleepMs = 500L;
    /** Pause after an empty batch. */
    protected long emptySleepMs = 1000L;
    private volatile T latestRequest;
    private static final Logger LOGGER = LogFactory.getLogger(BaseReqHandler.class);
    /** Optional extra pause (milliseconds) after every retrieve round; {@code null} disables it. */
    protected Long dispatchDelay;
    /** Cleared by a graceful stop; the reference itself never changes. */
    private final AtomicBoolean running = new AtomicBoolean(true);
    protected AtomicInteger activeRunnableNum = new AtomicInteger(0);

    /**
     * Stores the configuration; no threads are created until {@link #start()}.
     *
     * @param jobInstanceId           id passed to {@code process} and appended to thread names
     * @param coreBatchThreadNum      core size of the worker pool
     * @param maxBatchThreadNum       maximum size of the worker pool; raised to
     *                                {@code coreBatchThreadNum} when smaller
     * @param batchSize               maximum number of requests pulled per retrieve round
     * @param queue                   queue the requests are submitted to and retrieved from
     * @param batchProcessThreadName  name prefix of the worker-pool threads
     * @param batchRetrieveThreadName name prefix of the retrieve thread
     */
    public BaseReqHandler(long jobInstanceId, int coreBatchThreadNum, int maxBatchThreadNum, int batchSize, ReqQueue<T> queue, String batchProcessThreadName, String batchRetrieveThreadName) {
        this.jobInstanceId = jobInstanceId;
        this.coreBatchThreadNum = coreBatchThreadNum;
        this.maxBatchThreadNum = Math.max(maxBatchThreadNum, coreBatchThreadNum);
        this.batchSize = batchSize;
        this.batchProcessThreadName = batchProcessThreadName;
        this.batchRetrieveThreadName = batchRetrieveThreadName;
        this.reqsQueue = queue;
    }

    /**
     * Creates the worker pool and starts the retrieve thread.
     *
     * <p>The pool uses a bounded queue of 10240 tasks, a 30 second keep-alive that also
     * applies to core threads, and {@link ThreadPoolExecutor.CallerRunsPolicy} when saturated.
     */
    public void start() {
        // The decompiled source referenced an undefined synthetic accessor for the second
        // operand; it is presumably jobInstanceId (mirrors the retrieve thread name) -- TODO confirm.
        final String workerNamePrefix = this.batchProcessThreadName + this.jobInstanceId + "-";
        this.batchProcessSvc = new ThreadPoolExecutor(this.coreBatchThreadNum, this.maxBatchThreadNum, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10240), new ThreadFactory(){
            private final AtomicInteger nextId = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, workerNamePrefix + this.nextId.getAndIncrement());
            }
        }, new ThreadPoolExecutor.CallerRunsPolicy());
        this.batchProcessSvc.allowCoreThreadTimeOut(true);
        this.batchRetrieveThread = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    BaseReqHandler.this.retrieveLoop();
                }
                catch (InterruptedException ignored) {
                    // Interruption is how stop(true) ends this thread; nothing left to do.
                }
                catch (Throwable e) {
                    LOGGER.error(e);
                }
            }
        }, this.batchRetrieveThreadName + this.jobInstanceId);
        this.batchRetrieveThread.start();
    }

    /**
     * Body of the retrieve thread: pull a batch, hand it to {@code process}, then pace.
     *
     * <p>Exits when the thread is interrupted, or when a retrieve round comes back empty
     * after {@link #stop(boolean) stop(false)} has cleared the running flag.
     */
    private void retrieveLoop() throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            List<T> reqs = this.asyncHandleReqs();
            LOGGER.debug("jobInstanceId={}, batch retrieve reqs, size:{}, remain size:{}, batchSize:{}", this.jobInstanceId, reqs.size(), this.reqsQueue.size(), this.batchSize);
            // Back off only when the batch was less than 80% full.
            if ((float)reqs.size() < (float)this.batchSize * 0.8f) {
                if (reqs.isEmpty()) {
                    if (!this.running.get()) {
                        break;
                    }
                    Thread.sleep(this.emptySleepMs);
                } else {
                    Thread.sleep(this.defaultSleepMs);
                }
            }
            // Read the field once so a concurrent reset to null cannot fail the unboxing.
            Long delay = this.dispatchDelay;
            if (delay != null) {
                Thread.sleep(delay);
            }
        }
    }

    /**
     * Handles one batch of requests.
     *
     * @param var1 job instance id
     * @param var3 the batch; callers in this class never pass an empty list
     * @param var4 worker address supplied by {@link #syncHandleReqs(int, String)};
     *             {@code null} when invoked from the retrieve thread
     */
    public abstract void process(long var1, List<T> var3, String var4);

    /** Delegates to {@link #process(long, List, String)} with a {@code null} third argument. */
    public void process(long jobInstanceId, List<T> reqs) {
        this.process(jobInstanceId, reqs, null);
    }

    /**
     * Sets both the core and maximum worker-thread counts.
     *
     * <p>Only the stored values change; a pool already created by {@link #start()} is not resized.
     */
    public void setWorkThreadNum(int workThreadNum) {
        this.coreBatchThreadNum = workThreadNum;
        this.maxBatchThreadNum = workThreadNum;
    }

    /** Sets the maximum number of requests pulled per retrieve round. */
    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Immediate stop; equivalent to {@code stop(true)} with interruption logged, not thrown. */
    public void stop() {
        try {
            this.stop(true);
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to the caller's thread instead of swallowing it.
            Thread.currentThread().interrupt();
            LOGGER.error("handle stop failed.", e);
        }
    }

    /**
     * Stops the handler.
     *
     * @param immediate {@code true}: interrupt the retrieve thread, call
     *                  {@code shutdownNow()} on the pool and clear the queue.
     *                  {@code false}: clear the running flag, wait for the retrieve thread
     *                  to finish, then shut the pool down and wait for it to terminate.
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void stop(boolean immediate) throws InterruptedException {
        if (immediate) {
            if (this.batchRetrieveThread != null) {
                this.batchRetrieveThread.interrupt();
            }
            if (this.batchProcessSvc != null) {
                this.batchProcessSvc.shutdownNow();
            }
            if (this.reqsQueue != null) {
                this.reqsQueue.clear();
            }
            return;
        }
        this.running.set(false);
        // Null when start() was never called; nothing to wait for in that case.
        if (this.batchRetrieveThread != null) {
            this.batchRetrieveThread.join();
        }
        if (this.batchProcessSvc != null) {
            this.batchProcessSvc.shutdown();
            this.batchProcessSvc.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        }
    }

    /** Clears the queue, resets the active counter to zero and purges the pool's work queue. */
    public void clear() {
        if (this.reqsQueue != null) {
            this.reqsQueue.clear();
        }
        this.activeRunnableNum.set(0);
        if (this.batchProcessSvc != null) {
            this.batchProcessSvc.purge();
        }
    }

    /**
     * Records {@code request} as the latest request and submits it to the queue.
     *
     * @throws IllegalStateException if a graceful stop has cleared the running flag
     * @throws Exception             declared for the underlying queue submission
     */
    public void submitRequest(T request) throws Exception {
        if (!this.running.get()) {
            throw new IllegalStateException("this handler is not running.");
        }
        this.latestRequest = request;
        this.reqsQueue.submitRequest(request);
    }

    /** Returns the request most recently passed to {@link #submitRequest(Object)}, or {@code null} if none. */
    public T getLatestRequest() {
        return this.latestRequest;
    }

    protected int getBatchSize() {
        return this.batchSize;
    }

    /**
     * Pulls up to {@code getBatchSize()} requests and, when any were returned, bumps the
     * active counter and hands them to {@link #process(long, List)}.
     *
     * @return the retrieved requests, possibly empty
     */
    private synchronized List<T> asyncHandleReqs() {
        List<T> reqs = this.reqsQueue.retrieveRequests(this.getBatchSize());
        if (!reqs.isEmpty()) {
            this.activeRunnableNum.incrementAndGet();
            this.process(this.jobInstanceId, reqs);
        }
        return reqs;
    }

    /**
     * Pulls up to {@code pageSize} requests and processes them on the calling thread.
     *
     * @param pageSize     maximum number of requests to retrieve
     * @param workerIdAddr passed through to {@link #process(long, List, String)}
     * @return the retrieved requests, possibly empty
     */
    public synchronized List<T> syncHandleReqs(int pageSize, String workerIdAddr) {
        List<T> reqs = this.reqsQueue.retrieveRequests(pageSize);
        if (!reqs.isEmpty()) {
            this.activeRunnableNum.incrementAndGet();
            try {
                this.process(this.jobInstanceId, reqs, workerIdAddr);
            }
            finally {
                // Always undo the increment, otherwise a throwing process() would leave
                // isActive() reporting true forever.
                this.activeRunnableNum.decrementAndGet();
            }
        }
        return reqs;
    }

    /** Returns {@code true} while the queue is non-empty or the active counter is positive. */
    public synchronized boolean isActive() {
        return this.reqsQueue.size() != 0 || this.activeRunnableNum.get() > 0;
    }
}

