/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.batch;

import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.worker.batch.ReqQueue;
import com.alibaba.schedulerx.worker.batch.TaskDispatchReqHandler;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.master.StreamTaskMaster;
import java.util.List;
import java.util.concurrent.Semaphore;

/**
 * Dispatch handler that throttles task pushes with a {@link Semaphore}.
 *
 * <p>Each batch handed to {@link #process(long, List, String)} is submitted to
 * {@code batchProcessSvc} as a {@code BatchTasksDispatchRunnable}. The runnable acquires one
 * permit per request before forwarding the batch to the {@link StreamTaskMaster} registered
 * for the job instance. Permits are only handed back through {@link #release()} /
 * {@link #release(int)}; this class never releases them on its own.
 *
 * @param <T> request type held by the queue; batches are treated as
 *            {@code Worker.MasterStartContainerRequest} when dispatched
 */
public class StreamTaskPushReqHandler<T>
extends TaskDispatchReqHandler<T> {
    private static final Logger LOGGER = LogFactory.getLogger(StreamTaskPushReqHandler.class);

    /** Limits how many requests may be pushed before permits are released again. */
    private final Semaphore semaphore;

    /**
     * Serial number of the most recent non-failover request seen by a dispatch runnable, or
     * {@code null} until the first one is seen. Volatile because it is written by the
     * dispatch runnable and read by callers of {@link #allTasksPushed(Long)} on other threads.
     */
    private volatile Long currentBatchNo;

    /**
     * Creates the handler.
     *
     * @param jobInstanceId     job instance this handler belongs to
     * @param globalConcurrency initial number of semaphore permits
     * @param batchSize         batch size forwarded to the superclass
     * @param queue             request queue forwarded to the superclass
     */
    public StreamTaskPushReqHandler(long jobInstanceId, int globalConcurrency, int batchSize, ReqQueue<T> queue) {
        // NOTE(review): the two literal 1s are presumably thread counts for the dispatch and
        // retrieve executors - confirm against TaskDispatchReqHandler's constructor.
        super(jobInstanceId, 1, 1, batchSize, queue, "Schedulerx-Batch-Tasks-Dispatch-Thread-", "Schedulerx-Batch-Tasks-Retrieve-Thread-");
        this.semaphore = new Semaphore(globalConcurrency);
    }

    /**
     * Returns the given number of permits to the semaphore.
     *
     * @param permits number of permits to release
     */
    public void release(int permits) {
        this.semaphore.release(permits);
    }

    /** Returns a single permit to the semaphore. */
    public void release() {
        this.release(1);
    }

    /**
     * Submits the batch for asynchronous dispatch.
     *
     * @param jobInstanceId job instance the requests belong to
     * @param reqs          batch of requests to dispatch
     * @param workerAddr    not used by this implementation
     */
    @Override
    public void process(long jobInstanceId, List<T> reqs, String workerAddr) {
        // The runnable works on MasterStartContainerRequest; T is unbounded, so the list has
        // to be cast explicitly (unchecked) for this call to compile.
        @SuppressWarnings("unchecked")
        List<Worker.MasterStartContainerRequest> startReqs =
                (List<Worker.MasterStartContainerRequest>) (List<?>) reqs;
        this.batchProcessSvc.submit(new BatchTasksDispatchRunnable(jobInstanceId, startReqs));
    }

    /**
     * Caps the superclass batch size by the number of permits currently available.
     *
     * @return the smaller of the superclass batch size and the available permits; this is
     *         {@code 0} when no permit is available
     */
    @Override
    protected int getBatchSize() {
        int batchSize = super.getBatchSize();
        return Math.min(batchSize, this.semaphore.availablePermits());
    }

    /**
     * Tells whether the recorded batch number has moved past {@code batchNo}.
     *
     * @param batchNo batch number to check; must not be {@code null} once a batch number has
     *                been recorded, otherwise unboxing throws {@link NullPointerException}
     * @return {@code true} if a batch number has been recorded and {@code batchNo} is strictly
     *         smaller than it
     */
    public synchronized boolean allTasksPushed(Long batchNo) {
        // Read the volatile field once so the null check and the comparison see the same value.
        Long latestBatchNo = this.currentBatchNo;
        return latestBatchNo != null && batchNo < latestBatchNo;
    }

    /**
     * Acquires one permit per request, then hands the batch to the job instance's
     * {@link StreamTaskMaster}.
     */
    private class BatchTasksDispatchRunnable
    implements Runnable {
        private final long jobInstanceId;
        private final List<Worker.MasterStartContainerRequest> reqs;

        BatchTasksDispatchRunnable(long jobInstanceId, List<Worker.MasterStartContainerRequest> reqs) {
            this.jobInstanceId = jobInstanceId;
            this.reqs = reqs;
        }

        @Override
        public void run() {
            try {
                long startTime = System.currentTimeMillis();
                // Blocks until one permit per request is available.
                StreamTaskPushReqHandler.this.semaphore.acquire(this.reqs.size());
                // NOTE(review): if batchDispatchTasks throws, the permits acquired above are
                // not released here - confirm whether the task master releases them on failure.
                ((StreamTaskMaster)StreamTaskPushReqHandler.this.taskMasterPool.get(this.jobInstanceId)).batchDispatchTasks(this.reqs);
                LOGGER.info("jobInstance={}, batch dispatch cost:{} ms, dispatchSize:{}, size:{}", this.jobInstanceId, System.currentTimeMillis() - startTime, StreamTaskPushReqHandler.this.dispatchSize, this.reqs.size());
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag so the owning executor can observe the interruption.
                Thread.currentThread().interrupt();
                LOGGER.error(e);
            }
            catch (Throwable e) {
                LOGGER.error(e);
            }
            finally {
                // Record the serial number of the last non-failover request, even when the
                // dispatch above failed.
                for (Worker.MasterStartContainerRequest req : this.reqs) {
                    if (req.getFailover()) {
                        continue;
                    }
                    StreamTaskPushReqHandler.this.currentBatchNo = req.getSerialNum();
                }
                StreamTaskPushReqHandler.this.activeRunnableNum.decrementAndGet();
            }
        }
    }
}

