/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.master;

import akka.actor.ActorContext;
import akka.actor.ActorSelection;
import com.alibaba.schedulerx.common.domain.InstanceStatus;
import com.alibaba.schedulerx.common.domain.JobInstanceInfo;
import com.alibaba.schedulerx.common.domain.LimitedQueue;
import com.alibaba.schedulerx.common.domain.MapTaskXAttrs;
import com.alibaba.schedulerx.common.domain.Metrics;
import com.alibaba.schedulerx.common.domain.StreamJobProgress;
import com.alibaba.schedulerx.common.domain.StreamJobProgressDetail;
import com.alibaba.schedulerx.common.domain.TaskDispatchMode;
import com.alibaba.schedulerx.common.domain.TaskProgressCounter;
import com.alibaba.schedulerx.common.domain.TaskStatus;
import com.alibaba.schedulerx.common.domain.TimeType;
import com.alibaba.schedulerx.common.domain.WorkerProgressCounter;
import com.alibaba.schedulerx.common.domain.enums.RouteStrategyEnum;
import com.alibaba.schedulerx.common.util.ConfigUtil;
import com.alibaba.schedulerx.common.util.ExceptionUtil;
import com.alibaba.schedulerx.common.util.HessianUtil;
import com.alibaba.schedulerx.common.util.IdUtil;
import com.alibaba.schedulerx.common.util.JobUtil;
import com.alibaba.schedulerx.common.util.JsonUtil;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.protocol.utils.FutureUtils;
import com.alibaba.schedulerx.shade.com.google.common.collect.Lists;
import com.alibaba.schedulerx.shade.com.google.common.collect.Maps;
import com.alibaba.schedulerx.shade.com.google.protobuf.ByteString;
import com.alibaba.schedulerx.shade.org.apache.commons.collections.CollectionUtils;
import com.alibaba.schedulerx.shade.org.apache.commons.collections.MapUtils;
import com.alibaba.schedulerx.shade.org.apache.commons.lang.StringUtils;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.batch.ReqQueue;
import com.alibaba.schedulerx.worker.batch.StreamTaskPushReqHandler;
import com.alibaba.schedulerx.worker.batch.TMStatusReqHandler;
import com.alibaba.schedulerx.worker.domain.JavaProcessorProfile;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.domain.TaskInfo;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.logcollector.LogCollector;
import com.alibaba.schedulerx.worker.logcollector.LogCollectorFactory;
import com.alibaba.schedulerx.worker.master.TaskMaster;
import com.alibaba.schedulerx.worker.master.persistence.H2FilePersistence;
import com.alibaba.schedulerx.worker.master.persistence.TaskPersistence;
import com.alibaba.schedulerx.worker.metrics.WorkerLoadRegister;
import com.alibaba.schedulerx.worker.processor.JobProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
import com.alibaba.schedulerx.worker.processor.StreamJobProcessor;
import com.alibaba.schedulerx.worker.route.Router;
import com.alibaba.schedulerx.worker.route.RouterFactory;
import com.alibaba.schedulerx.worker.util.ActorPathUtil;
import com.alibaba.schedulerx.worker.util.ContanerUtil;
import com.alibaba.schedulerx.worker.util.JobProcessorUtil;
import com.alibaba.schedulerx.worker.util.WorkerConfigUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.joda.time.DateTime;

public class StreamTaskMaster
extends TaskMaster {
    private static final Logger LOGGER = LogFactory.getLogger(StreamTaskMaster.class);
    private volatile int index = 0;
    protected ReqQueue<Worker.MasterStartContainerRequest> taskBlockingQueue;
    protected StreamTaskPushReqHandler<Worker.MasterStartContainerRequest> taskDispatchReqHandler;
    protected Thread streamProduceThread;
    private LogCollector logCollector = LogCollectorFactory.get();
    private LimitedQueue<StreamJobProgressDetail> streamJobProgressHistory = new LimitedQueue(10);
    private Map<Long, StreamJobProgressDetail> streamJobProgressMap = Maps.newConcurrentMap();
    protected Router router;
    protected ReqQueue<Worker.ContainerReportTaskStatusRequest> taskStatusReqQueue;
    protected TMStatusReqHandler<Worker.ContainerReportTaskStatusRequest> taskStatusReqBatchHandler;
    protected TaskPersistence taskPersistence;
    protected MapTaskXAttrs xAttrs = null;
    protected StreamJobProcessor streamJobProcessor;
    private Map<Long, Map<Long, String>> taskResultMap = Maps.newHashMap();
    private Map<Long, Map<Long, TaskStatus>> taskStatusMap = Maps.newHashMap();
    private TaskProgressCounter totalCounter = new TaskProgressCounter("TotalCounter");

    public StreamTaskMaster(JobInstanceInfo jobInstanceInfo, ActorContext actorContext) throws Exception {
        super(jobInstanceInfo, actorContext);
        this.taskPersistence = H2FilePersistence.getInstance();
        this.taskPersistence.initTable();
        this.streamJobProcessor = (StreamJobProcessor)JobProcessorUtil.getJavaProcessor(jobInstanceInfo.getContent());
        if (jobInstanceInfo.getXattrs() != null) {
            this.xAttrs = JsonUtil.fromJson(jobInstanceInfo.getXattrs(), MapTaskXAttrs.class);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void initTaskProgress(Long batchNo, String taskName, int delta) {
        if (!this.streamJobProgressMap.containsKey(batchNo)) {
            StreamTaskMaster streamTaskMaster = this;
            synchronized (streamTaskMaster) {
                if (!this.streamJobProgressMap.containsKey(batchNo)) {
                    TaskProgressCounter taskProgressCounter = new TaskProgressCounter(taskName);
                    taskProgressCounter.incrementTotal(delta);
                    StreamJobProgressDetail streamJobProgressDetail = new StreamJobProgressDetail(batchNo, DateTime.now().getMillis(), taskProgressCounter);
                    this.streamJobProgressMap.put(batchNo, streamJobProgressDetail);
                }
            }
        } else {
            TaskProgressCounter taskProgressCounter = this.streamJobProgressMap.get(batchNo).getTaskProgressCounter();
            taskProgressCounter.incrementTotal(delta);
        }
        if (!"MAP_TASK_ROOT".equals(taskName)) {
            this.totalCounter.incrementTotal(delta);
        }
    }

    @Override
    public void batchUpdateTaskStatus(Worker.ContainerBatchReportTaskStatuesRequest request) throws Exception {
        String workerIdAddr = request.getWorkerId() + "@" + request.getWorkerAddr();
        this.setWorkerLoad(workerIdAddr, request.getMetricsJson(), null);
        super.batchUpdateTaskStatus(request);
    }

    @Override
    public void updateTaskStatus(Worker.ContainerReportTaskStatusRequest request) {
        try {
            this.taskStatusReqQueue.submitRequest(request);
        }
        catch (Throwable e) {
            LOGGER.error("", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void batchUpdateTaskStatues(List<Worker.ContainerReportTaskStatusRequest> requests) {
        HashMap<Long, Worker.ContainerReportTaskStatusRequest> finalTaskStatus = Maps.newHashMap();
        for (Worker.ContainerReportTaskStatusRequest request : requests) {
            try {
                TaskStatus taskStatus = TaskStatus.parseValue(request.getStatus());
                if (!finalTaskStatus.containsKey(request.getTaskId()) || taskStatus.isFinish()) {
                    finalTaskStatus.put(request.getTaskId(), request);
                }
                String workerAddr = request.getWorkerAddr();
                LOGGER.debug("report task status:{} from worker:{}, uniqueId:{}", taskStatus.getDescription(), workerAddr, IdUtil.getUniqueId(request.getJobId(), request.getJobInstanceId(), request.getTaskId()));
                StreamJobProgressDetail streamJobProgressDetail = this.streamJobProgressMap.get(request.getSerialNum());
                TaskProgressCounter taskProgressCounter = streamJobProgressDetail.getTaskProgressCounter();
                streamJobProgressDetail.setStatus(TaskStatus.RUNNING.getValue());
                Map<String, WorkerProgressCounter> workerProgressMap = streamJobProgressDetail.getWorkerProgressMap();
                if (taskStatus.equals(TaskStatus.RUNNING)) {
                    taskProgressCounter.incrementRunning();
                    this.totalCounter.incrementRunning();
                    if (workerAddr != null) {
                        workerProgressMap.get(workerAddr).incrementRunning();
                    }
                } else if (taskStatus.equals(TaskStatus.SUCCESS)) {
                    taskProgressCounter.incrementSuccess();
                    this.totalCounter.incrementSuccess();
                    this.taskDispatchReqHandler.release();
                    if (workerAddr != null) {
                        workerProgressMap.get(workerAddr).incrementSuccess();
                    }
                } else if (taskStatus.equals(TaskStatus.FAILED)) {
                    taskProgressCounter.incrementFailed();
                    this.totalCounter.incrementFailed();
                    this.taskDispatchReqHandler.release();
                    if (workerAddr != null) {
                        workerProgressMap.get(workerAddr).incrementFailed();
                        if (StringUtils.isNotBlank(request.getTraceId())) {
                            workerProgressMap.get(workerAddr).setTraceId(request.getTraceId());
                        }
                    }
                }
                if (TaskStatus.FAILED.equals(taskStatus)) {
                    String uniqueIdWithoutTask = IdUtil.getUniqueIdWithoutTask(this.jobInstanceInfo.getJobId(), this.jobInstanceInfo.getJobInstanceId());
                    LOGGER.info("jobInstanceId={}, taskId={}, report status failed. result:{}", this.jobInstanceInfo.getJobInstanceId(), request.getTaskId(), request.getResult());
                    this.logCollector.collect(this.jobInstanceInfo.getAppGroupId(), uniqueIdWithoutTask, "job processor exec fail:" + request.getTaskId() + ", " + request.getResult());
                }
                if (!this.streamJobProcessor.needReduce() || !taskStatus.isFinish()) continue;
                Map<Long, String> batchTaskResultMap = this.taskResultMap.get(request.getSerialNum());
                if (batchTaskResultMap == null) {
                    Map<Long, Map<Long, String>> map2 = this.taskResultMap;
                    synchronized (map2) {
                        batchTaskResultMap = this.taskResultMap.get(request.getSerialNum());
                        if (batchTaskResultMap == null) {
                            batchTaskResultMap = new HashMap<Long, String>();
                            this.taskResultMap.put(request.getSerialNum(), batchTaskResultMap);
                        }
                    }
                }
                batchTaskResultMap.put(request.getTaskId(), request.getResult());
                Map<Long, TaskStatus> batchTaskStatusMap = this.taskStatusMap.get(request.getSerialNum());
                if (batchTaskStatusMap == null) {
                    Map<Long, Map<Long, TaskStatus>> map3 = this.taskStatusMap;
                    synchronized (map3) {
                        batchTaskStatusMap = this.taskStatusMap.get(request.getSerialNum());
                        if (batchTaskStatusMap == null) {
                            batchTaskStatusMap = new HashMap<Long, TaskStatus>();
                            this.taskStatusMap.put(request.getSerialNum(), batchTaskStatusMap);
                        }
                    }
                }
                batchTaskStatusMap.put(request.getTaskId(), taskStatus);
            }
            catch (Throwable e) {
                LOGGER.error("jobInstanceId={}, batchNo={}, taskId={}, update progressMap error.", this.jobInstanceInfo.getJobInstanceId(), request.getSerialNum(), request.getTaskId(), e);
                this.updateNewInstanceStatus(this.getSerialNum(), InstanceStatus.FAILED, "update progressMap error." + e.getMessage());
            }
        }
        try {
            long startTime = System.currentTimeMillis();
            boolean updateSuccess = false;
            for (int i = 0; i < 3; ++i) {
                try {
                    this.taskPersistence.updateTaskStatues(Lists.newArrayList(finalTaskStatus.values()));
                    updateSuccess = true;
                    break;
                }
                catch (Throwable t) {
                    LOGGER.error("jobInstanceId={}, persistent batch updateTaskStatus error.", t);
                    continue;
                }
            }
            if (!updateSuccess) {
                this.updateNewInstanceStatus(this.getSerialNum(), InstanceStatus.FAILED, "persistent batch update TaskStatus error up to 3 times");
            }
            LOGGER.debug("{} batch update status db cost:{}", this.jobInstanceInfo.getJobInstanceId(), System.currentTimeMillis() - startTime);
        }
        catch (Throwable e) {
            LOGGER.error("jobInstanceId={}, batch updateTaskStatus error.", this.jobInstanceInfo.getJobInstanceId(), e);
        }
    }

    @Override
    protected void init() {
        if (this.INITED) {
            return;
        }
        this.INITED = true;
        int queueSize = this.xAttrs.getQueueSize();
        this.taskBlockingQueue = new ReqQueue(this.jobInstanceInfo.getJobInstanceId(), queueSize);
        this.taskBlockingQueue.init();
        this.taskStatusReqQueue = new ReqQueue(this.jobInstanceInfo.getJobInstanceId(), 100000);
        this.taskStatusReqQueue.init();
        this.taskStatusReqBatchHandler = new TMStatusReqHandler<Worker.ContainerReportTaskStatusRequest>(this.jobInstanceInfo.getJobInstanceId(), 1, 1, 3000, this.taskStatusReqQueue);
        int globalConsumerSize = this.xAttrs.getGlobalConsumerSize();
        this.taskDispatchReqHandler = new StreamTaskPushReqHandler<Worker.MasterStartContainerRequest>(this.jobInstanceInfo.getJobInstanceId(), globalConsumerSize, this.jobInstanceInfo.getAllWorkers().size(), this.taskBlockingQueue);
        boolean enableShareContainerPool = WorkerConfigUtil.isEnableShareContainerPool();
        this.router = enableShareContainerPool ? RouterFactory.getRouter(this.jobInstanceInfo.getAppGroupId(), this.jobInstanceInfo.getJobId(), this.xAttrs.getRouteType(), this.jobInstanceInfo.getRouteStrategyContent()) : RouterFactory.getRouter(this.jobInstanceInfo.getAppGroupId(), this.jobInstanceInfo.getJobId(), RouteStrategyEnum.ROUND_ROBIN.getValue(), this.jobInstanceInfo.getRouteStrategyContent());
        if (this.router != null && this.router instanceof WorkerLoadRegister) {
            ((WorkerLoadRegister)((Object)this.router)).clear();
        }
        final String jobIdAndInstanceId = this.jobInstanceInfo.getJobId() + "_" + this.jobInstanceInfo.getJobInstanceId();
        new Thread(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                while (!StreamTaskMaster.this.instanceStatus.isFinish()) {
                    try {
                        for (String workerIdAddr : StreamTaskMaster.this.aliveCheckWorkerSet) {
                            try {
                                Worker.MasterCheckWorkerAliveRequest request;
                                String workerAddr = workerIdAddr.split("@")[1];
                                String[] tokens = workerAddr.split(":");
                                String host = tokens[0];
                                int port = Integer.valueOf(tokens[1]);
                                int times = 0;
                                while (times < 3) {
                                    Socket socket = new Socket();
                                    try {
                                        socket.connect(new InetSocketAddress(host, port), 5000);
                                        LOGGER.info("socket to {}:{} is reachable, times={}", host, port, times);
                                        break;
                                    }
                                    catch (Exception e) {
                                        LOGGER.info("socket to {}:{} is not reachable, times={}", host, port, times);
                                        Thread.sleep(5000L);
                                        ++times;
                                    }
                                    finally {
                                        if (socket == null) continue;
                                        socket.close();
                                    }
                                }
                                if (times >= 3) {
                                    LOGGER.warn("worker[{}] is down, start to remove this worker and failover tasks, jobInstanceId={}", workerIdAddr, StreamTaskMaster.this.jobInstanceInfo.getJobInstanceId());
                                    StreamTaskMaster.this.handleWorkerShutdown(workerIdAddr, true);
                                    continue;
                                }
                                long startTime = System.currentTimeMillis();
                                ActorSelection selection = StreamTaskMaster.this.getActorContext().actorSelection(ActorPathUtil.getWorkerHeartbeatRouterPath(workerIdAddr));
                                Worker.MasterCheckWorkerAliveResponse response = (Worker.MasterCheckWorkerAliveResponse)FutureUtils.awaitResult(selection, (Object)(request = Worker.MasterCheckWorkerAliveRequest.newBuilder().setJobInstanceId(StreamTaskMaster.this.jobInstanceInfo.getJobInstanceId()).setDispatchMode(StreamTaskMaster.this.xAttrs.getTaskDispatchMode()).build()), 10L);
                                if (!response.getSuccess()) {
                                    LOGGER.warn("jobInstanceId={} of worker={} is not alive", StreamTaskMaster.this.jobInstanceInfo.getJobInstanceId(), workerIdAddr, response.getMessage());
                                    StreamTaskMaster.this.handleWorkerShutdown(workerIdAddr, true);
                                    Worker.MasterDestroyContainerPoolRequest destroyContainerPoolRequest = Worker.MasterDestroyContainerPoolRequest.newBuilder().setJobInstanceId(StreamTaskMaster.this.jobInstanceInfo.getJobInstanceId()).setJobId(StreamTaskMaster.this.jobInstanceInfo.getJobId()).setWorkerIdAddr(workerIdAddr).setSerialNum(StreamTaskMaster.this.getSerialNum()).build();
                                    SchedulerxWorker.AtLeastDeliveryRoutingActor.tell(destroyContainerPoolRequest, null);
                                    continue;
                                }
                                StreamTaskMaster.this.setWorkerLoad(workerIdAddr, response.getMetricsJson(), System.currentTimeMillis() - startTime);
                            }
                            catch (Exception e) {
                                LOGGER.error("Alive worker check failed.", e);
                                StreamTaskMaster.this.handleWorkerShutdown(workerIdAddr, true);
                                Worker.MasterDestroyContainerPoolRequest destroyContainerPoolRequest = Worker.MasterDestroyContainerPoolRequest.newBuilder().setJobInstanceId(StreamTaskMaster.this.jobInstanceInfo.getJobInstanceId()).setJobId(StreamTaskMaster.this.jobInstanceInfo.getJobId()).setWorkerIdAddr(workerIdAddr).setSerialNum(StreamTaskMaster.this.getSerialNum()).build();
                                SchedulerxWorker.AtLeastDeliveryRoutingActor.tell(destroyContainerPoolRequest, null);
                            }
                        }
                        Thread.sleep(10000L);
                    }
                    catch (Throwable e) {
                        LOGGER.error("check worker error, jobInstanceId={}", StreamTaskMaster.this.jobInstanceInfo.getJobInstanceId(), e);
                    }
                }
            }
        }, "Schedulerx-StreamTaskMaster-check-worker-alive-thread-" + jobIdAndInstanceId).start();
        new Thread(new Runnable(){

            @Override
            public void run() {
                int pageSize = ConfigUtil.getWorkerConfig().getInt("map.master.page.size", 100);
                while (!StreamTaskMaster.this.instanceStatus.isFinish()) {
                    try {
                        long startTime = System.currentTimeMillis();
                        List<TaskInfo> taskInfos = StreamTaskMaster.this.taskPersistence.pull(StreamTaskMaster.this.jobInstanceInfo.getJobInstanceId(), pageSize);
                        LOGGER.debug("jobInstanceId={}, pull cost={}ms", StreamTaskMaster.this.jobInstanceInfo.getJobInstanceId(), System.currentTimeMillis() - startTime);
                        if (taskInfos.isEmpty()) {
                            LOGGER.debug("pull task empty of jobInstanceId={}, sleep 10000 ms ...", StreamTaskMaster.this.jobInstanceInfo.getJobInstanceId());
                            Thread.sleep(10000L);
                            continue;
                        }
                        LOGGER.info("jobInstanceId={}, failover retry dispatch taskList, size:{} , cost={}ms", StreamTaskMaster.this.jobInstanceInfo.getJobInstanceId(), taskInfos.size(), System.currentTimeMillis() - startTime);
                        for (TaskInfo taskInfo : taskInfos) {
                            ByteString taskBody = null;
                            if (taskInfo.getTaskBody() != null) {
                                taskBody = ByteString.copyFrom(taskInfo.getTaskBody());
                            }
                            Worker.MasterStartContainerRequest.Builder builder = StreamTaskMaster.this.convert2StartContainerRequestBuilder(StreamTaskMaster.this.jobInstanceInfo, taskInfo.getTaskId(), taskInfo.getTaskName(), taskBody, true);
                            builder.setSerialNum(taskInfo.getBatchNo());
                            StreamTaskMaster.this.taskBlockingQueue.submitRequest(builder.build());
                        }
                    }
                    catch (TimeoutException te) {
                        LOGGER.error("pull task timeout, uniqueId:{}", jobIdAndInstanceId, te);
                        StreamTaskMaster.this.logCollector.collect(StreamTaskMaster.this.jobInstanceInfo.getAppGroupId(), jobIdAndInstanceId, "map task pull fail:", te);
                        try {
                            Thread.sleep(10000L);
                        }
                        catch (InterruptedException interruptedException) {}
                    }
                    catch (Throwable e) {
                        StreamTaskMaster.this.updateNewInstanceStatus(StreamTaskMaster.this.getSerialNum(), InstanceStatus.FAILED, ExceptionUtil.getMessage(e));
                        StreamTaskMaster.this.logCollector.collect(StreamTaskMaster.this.jobInstanceInfo.getAppGroupId(), jobIdAndInstanceId, "map task pull fail:", e);
                        LOGGER.error("pull task error, uniqueId:{}", jobIdAndInstanceId, e);
                    }
                }
            }
        }, "Schedulerx-StreamTaskMaster-pull-thread-" + jobIdAndInstanceId).start();
    }

    @Override
    public void submitInstance(final JobInstanceInfo jobInstanceInfo) throws Exception {
        try {
            this.init();
            this.taskDispatchReqHandler.start();
            this.taskStatusReqBatchHandler.start();
            this.createProduceTask();
            final String jobIdAndInstanceId = jobInstanceInfo.getJobId() + "_" + jobInstanceInfo.getJobInstanceId();
            new Thread(new Runnable(){

                @Override
                public void run() {
                    int checkInterval = ConfigUtil.getWorkerConfig().getInt("map.master.status.check.interval", 3000);
                    while (!StreamTaskMaster.this.instanceStatus.isFinish()) {
                        try {
                            Thread.sleep(checkInterval);
                            if (MapUtils.isNotEmpty(StreamTaskMaster.this.streamJobProgressMap)) {
                                for (Map.Entry entry : StreamTaskMaster.this.streamJobProgressMap.entrySet()) {
                                    long failedCount;
                                    InstanceStatus newStatus;
                                    boolean allTasksPushed = StreamTaskMaster.this.taskDispatchReqHandler.allTasksPushed((Long)entry.getKey());
                                    StreamJobProgressDetail streamJobProgressDetail = (StreamJobProgressDetail)entry.getValue();
                                    TaskProgressCounter taskProgressCounter = streamJobProgressDetail.getTaskProgressCounter();
                                    if (!allTasksPushed && taskProgressCounter.getTotal() > taskProgressCounter.getFailed() + taskProgressCounter.getSuccess() || !(newStatus = StreamTaskMaster.this.taskPersistence.checkInstanceStatus(jobInstanceInfo.getJobInstanceId(), (Long)entry.getKey())).isFinish()) continue;
                                    ProcessResult processResult = new ProcessResult(true);
                                    if ((Long)entry.getKey() > 0L && StreamTaskMaster.this.streamJobProcessor.needReduce()) {
                                        try {
                                            JobContext context = JobContext.newBuilder().setJobId(jobInstanceInfo.getJobId()).setJobInstanceId(jobInstanceInfo.getJobInstanceId()).setJobType(jobInstanceInfo.getJobType()).setContent(jobInstanceInfo.getContent()).setScheduleTime(jobInstanceInfo.getScheduleTime()).setDataTime(jobInstanceInfo.getDataTime()).setJobParameters(jobInstanceInfo.getParameters()).setInstanceParameters(jobInstanceInfo.getInstanceParameters()).setUser(jobInstanceInfo.getUser()).setTaskResults((Map)StreamTaskMaster.this.taskResultMap.get(entry.getKey())).setTaskStatuses((Map)StreamTaskMaster.this.taskStatusMap.get(entry.getKey())).setSerialNum((Long)entry.getKey()).build();
                                            processResult = StreamTaskMaster.this.streamJobProcessor.reduce(context);
                                            if (processResult == null) {
                                                processResult = new ProcessResult(false, "Reduce can not return NULL.");
                                            }
                                        }
                                        catch (Throwable t) {
                                            LOGGER.error("Stream job jobId={} jobInstanceId={} batchNo={} reduce exception.", jobInstanceInfo.getJobId(), jobInstanceInfo.getJobInstanceId(), entry.getKey(), t);
                                            processResult = new ProcessResult(false, t.getMessage());
                                        }
                                    }
                                    if (InstanceStatus.FAILED.equals(processResult.getStatus())) {
                                        LOGGER.error("Stream job jobId={} jobInstanceId={} batchNo={} reduce failed. Result:{}", jobInstanceInfo.getJobId(), jobInstanceInfo.getJobInstanceId(), entry.getKey(), processResult.getResult());
                                    }
                                    if ((failedCount = (long)streamJobProgressDetail.getTaskProgressCounter().getFailed()) > 0L || InstanceStatus.FAILED.equals(processResult.getStatus())) {
                                        streamJobProgressDetail.setStatus(InstanceStatus.FAILED.getValue());
                                    } else {
                                        streamJobProgressDetail.setStatus(InstanceStatus.SUCCESS.getValue());
                                    }
                                    streamJobProgressDetail.setEndTime(DateTime.now().getMillis());
                                    StreamTaskMaster.this.streamJobProgressHistory.add(streamJobProgressDetail);
                                    StreamTaskMaster.this.streamJobProgressMap.remove(entry.getKey());
                                    StreamTaskMaster.this.taskStatusMap.remove(entry.getKey());
                                    StreamTaskMaster.this.taskResultMap.remove(entry.getKey());
                                }
                                continue;
                            }
                            if (StreamTaskMaster.this.streamProduceThread.isAlive()) continue;
                            String result2 = SchedulerxWorker.INITED ? "Produce task is stopped." : "Worker master shutdown.";
                            StreamTaskMaster.this.updateNewInstanceStatus(StreamTaskMaster.this.getSerialNum(), jobInstanceInfo.getJobInstanceId(), InstanceStatus.FAILED, result2);
                        }
                        catch (Throwable e) {
                            LOGGER.error("status check error, uniqueId:{}", jobIdAndInstanceId, e);
                        }
                    }
                }
            }, "Schedulerx-StreamTaskMaster-status-check-thread-" + jobIdAndInstanceId).start();
            if (!JobUtil.isSecondTypeJob(TimeType.parseValue(jobInstanceInfo.getTimeType()))) {
                new Thread(new Runnable(){

                    @Override
                    public void run() {
                        while (!StreamTaskMaster.this.instanceStatus.isFinish()) {
                            Worker.WorkerReportJobInstanceProgressRequest request = Worker.WorkerReportJobInstanceProgressRequest.newBuilder().setJobId(jobInstanceInfo.getJobId()).setJobInstanceId(jobInstanceInfo.getJobInstanceId()).setProgress(StreamTaskMaster.this.getJobInstanceProgress()).build();
                            StreamTaskMaster.this.SERVER_DISCOVERY.getMapMasterRouter().tell(request, null);
                            try {
                                Thread.sleep(5000L);
                            }
                            catch (InterruptedException e) {
                                LOGGER.error("report status error, uniqueId={}", jobIdAndInstanceId, e);
                                break;
                            }
                        }
                    }
                }, "Schedulerx-StreamTaskMaster-report-progress-thread-" + jobIdAndInstanceId).start();
            }
        }
        catch (Throwable t) {
            String jobIdAndInstanceId = jobInstanceInfo.getJobId() + "_" + jobInstanceInfo.getJobInstanceId();
            LOGGER.error("submit instance failed.", t);
            this.updateNewInstanceStatus(this.getSerialNum(), InstanceStatus.FAILED, ExceptionUtil.getMessage(t));
            this.logCollector.collect(jobInstanceInfo.getAppGroupId(), jobIdAndInstanceId, "instance init fail:", t);
        }
    }

    protected void createProduceTask() throws Exception {
        this.initTaskProgress(this.getSerialNum(), "MAP_TASK_ROOT", 1);
        final Worker.MasterStartContainerRequest startContainerRequest = this.convert2StartContainerRequest(this.jobInstanceInfo, this.aquireTaskId(), "MAP_TASK_ROOT", null);
        String workerIdAddr = this.getLocalWorkerIdAddr();
        final String workerId = workerIdAddr.split("@")[0];
        final String workerAddr = workerIdAddr.split("@")[1];
        this.batchHandlePersistence(workerId, workerAddr, Lists.newArrayList(startContainerRequest), false);
        this.batchHandlePulledProgress(Lists.newArrayList(startContainerRequest), Maps.newHashMap(), Maps.newHashMap(), workerIdAddr);
        this.streamProduceThread = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    ((StreamJobProgressDetail)StreamTaskMaster.this.streamJobProgressMap.get(StreamTaskMaster.this.getSerialNum())).setStatus(TaskStatus.RUNNING.getValue());
                    ((StreamJobProgressDetail)StreamTaskMaster.this.streamJobProgressMap.get(StreamTaskMaster.this.getSerialNum())).getTaskProgressCounter().incrementRunning();
                    ((StreamJobProgressDetail)StreamTaskMaster.this.streamJobProgressMap.get(StreamTaskMaster.this.getSerialNum())).getWorkerProgressMap().get(workerAddr).incrementRunning();
                    JobContext context = ContanerUtil.convert2JobContext(startContainerRequest);
                    long produceInterval = StreamTaskMaster.this.xAttrs.getProduceInterval();
                    while (!StreamTaskMaster.this.instanceStatus.isFinish() && SchedulerxWorker.INITED) {
                        if (StreamTaskMaster.this.streamJobProgressMap.size() < 15) {
                            ((StreamJobProgressDetail)StreamTaskMaster.this.streamJobProgressMap.get(0L)).setStartTime(DateTime.now().getMillis());
                            context.setSerialNum(StreamTaskMaster.this.aquireSerialNum());
                            List<Object> tasks = StreamTaskMaster.this.streamJobProcessor.produce(context);
                            if (!CollectionUtils.isEmpty(tasks)) {
                                StreamTaskMaster.this.initTaskProgress(StreamTaskMaster.this.getSerialNum(), "SubTask", tasks.size());
                                for (Object task : tasks) {
                                    byte[] taskBody = HessianUtil.toBytes(task);
                                    Worker.MasterStartContainerRequest taskContainerRequest = StreamTaskMaster.this.convert2StartContainerRequest(StreamTaskMaster.this.jobInstanceInfo, StreamTaskMaster.this.aquireTaskId(), "SubTask", ByteString.copyFrom(taskBody));
                                    StreamTaskMaster.this.taskBlockingQueue.submitRequest(taskContainerRequest);
                                }
                            }
                        }
                        int plus2 = StreamTaskMaster.this.streamJobProgressMap.size() / 5 + 1;
                        TimeUnit.SECONDS.sleep((long)plus2 * produceInterval);
                    }
                    Worker.ContainerReportTaskStatusRequest rootTaskStatusRequest = Worker.ContainerReportTaskStatusRequest.newBuilder().setJobId(StreamTaskMaster.this.jobInstanceInfo.getJobId()).setJobInstanceId(StreamTaskMaster.this.jobInstanceInfo.getJobInstanceId()).setTaskId(startContainerRequest.getTaskId()).setStatus(TaskStatus.SUCCESS.getValue()).setWorkerId(workerId).setTaskName(startContainerRequest.getTaskName()).setWorkerAddr(workerAddr).setSerialNum(startContainerRequest.getSerialNum()).build();
                    StreamTaskMaster.this.updateTaskStatus(rootTaskStatusRequest);
                }
                catch (Throwable e) {
                    LOGGER.error("stream job produce running failed.", e);
                    String workerIdAddr = StreamTaskMaster.this.getLocalWorkerIdAddr();
                    String workerId2 = workerIdAddr.split("@")[0];
                    String workerAddr2 = workerIdAddr.split("@")[1];
                    if (startContainerRequest != null) {
                        Worker.ContainerReportTaskStatusRequest faileReq = Worker.ContainerReportTaskStatusRequest.newBuilder().setJobId(StreamTaskMaster.this.jobInstanceInfo.getJobId()).setJobInstanceId(StreamTaskMaster.this.jobInstanceInfo.getJobInstanceId()).setTaskId(startContainerRequest.getTaskId()).setStatus(TaskStatus.FAILED.getValue()).setWorkerId(workerId2).setTaskName(startContainerRequest.getTaskName()).setWorkerAddr(workerAddr2).setSerialNum(startContainerRequest.getSerialNum()).build();
                        StreamTaskMaster.this.updateTaskStatus(faileReq);
                    }
                    String jobIdAndInstanceId = StreamTaskMaster.this.jobInstanceInfo.getJobId() + "_" + StreamTaskMaster.this.jobInstanceInfo.getJobInstanceId();
                    StreamTaskMaster.this.updateNewInstanceStatus(StreamTaskMaster.this.getSerialNum(), InstanceStatus.FAILED, ExceptionUtil.getMessage(e));
                    StreamTaskMaster.this.logCollector.collect(StreamTaskMaster.this.jobInstanceInfo.getAppGroupId(), jobIdAndInstanceId, "instance init fail:", e);
                }
            }
        }, "Schedulerx-stream-produce-thread-" + this.jobInstanceInfo.getJobInstanceId());
        this.streamProduceThread.start();
    }

    @Override
    protected void checkProcessor() throws Exception {
        JavaProcessorProfile profile;
        if ("java".equalsIgnoreCase(this.jobInstanceInfo.getJobType()) && !JobProcessorUtil.checkJavaProcessor((profile = JsonUtil.fromJson(this.jobInstanceInfo.getContent(), JavaProcessorProfile.class)).getClassName(), StreamJobProcessor.class).booleanValue()) {
            throw new IOException(profile.getClassName() + " must extends StreamJobProcessor");
        }
    }

    @Override
    public void destroyContainerPool() {
        List<String> allWorkers = this.jobInstanceInfo.getAllWorkers();
        for (String workerIdAddr : allWorkers) {
            Worker.MasterDestroyContainerPoolRequest request = Worker.MasterDestroyContainerPoolRequest.newBuilder().setJobInstanceId(this.jobInstanceInfo.getJobInstanceId()).setSerialNum(this.getSerialNum()).setJobId(this.jobInstanceInfo.getJobId()).setWorkerIdAddr(workerIdAddr).build();
            SchedulerxWorker.AtLeastDeliveryRoutingActor.tell(request, null);
        }
    }

    @Override
    public void clear() {
        super.clear();
        if (this.taskStatusReqQueue != null) {
            this.taskStatusReqQueue.clear();
        }
        if (this.taskBlockingQueue != null) {
            this.taskBlockingQueue.clear();
        }
        if (this.taskDispatchReqHandler != null) {
            this.taskDispatchReqHandler.clear();
        }
        if (this.taskStatusReqBatchHandler != null) {
            this.taskStatusReqBatchHandler.clear();
        }
        this.clearTasks(this.jobInstanceInfo.getJobInstanceId());
    }

    private void clearTasks(long jobInstanceId) {
        try {
            this.taskPersistence.clearTasks(jobInstanceId);
            LOGGER.info("jobInstanceId={} clearTasks success.", jobInstanceId);
        }
        catch (Throwable ex) {
            LOGGER.error("jobInstanceId={} clearTasks error", jobInstanceId, ex);
        }
    }

    @Override
    public void stop() {
        if (this.taskDispatchReqHandler != null) {
            this.taskDispatchReqHandler.stop();
        }
        if (this.taskStatusReqBatchHandler != null) {
            this.taskStatusReqBatchHandler.stop();
        }
        LOGGER.info("jobInstanceId:{}, instance master successfully stop.", this.jobInstanceInfo.getJobInstanceId());
    }

    @Override
    protected void doTerminate() {
        if (this.taskDispatchReqHandler != null) {
            this.taskDispatchReqHandler.stop();
        }
    }

    @Override
    public String getJobInstanceProgress() {
        LinkedHashMap<Long, StreamJobProgressDetail> detailMap = new LinkedHashMap<Long, StreamJobProgressDetail>();
        detailMap.putAll(this.streamJobProgressMap);
        for (StreamJobProgressDetail streamJobProgressDetail : this.streamJobProgressHistory) {
            detailMap.put(streamJobProgressDetail.getBatchNum(), streamJobProgressDetail);
        }
        int queueSize = this.xAttrs.getQueueSize();
        StreamJobProgressDetail produceProcessDetail = (StreamJobProgressDetail)detailMap.get(0L);
        return JsonUtil.toJson(new StreamJobProgress(produceProcessDetail.getStatus(), this.totalCounter, queueSize, this.taskBlockingQueue.size(), detailMap));
    }

    private void setWorkerInvalid(String workerIdAddr) {
        try {
            this.invalidWorkerSet.add(workerIdAddr);
            if (this.router != null && this.router instanceof WorkerLoadRegister) {
                ((WorkerLoadRegister)((Object)this.router)).setAvailableSize(workerIdAddr, 0);
            }
        }
        catch (Exception e) {
            LOGGER.warn("Set worker load failed.", e);
        }
    }

    private void initTaskFailover(List<Worker.MasterStartContainerRequest> reqs, String workerIdAddr) {
        LOGGER.warn("jobInstanceId={}, worker[{}] is down, try another worker, size:{}", this.jobInstanceInfo.getJobInstanceId(), workerIdAddr, reqs.size());
        String workerId = workerIdAddr.split("@")[0];
        String workerAddr = workerIdAddr.split("@")[1];
        ArrayList<Long> taskIds = Lists.newArrayList();
        HashMap<Long, Integer> affectCntMap = new HashMap<Long, Integer>();
        for (Worker.MasterStartContainerRequest req : reqs) {
            taskIds.add(req.getTaskId());
            Integer count2 = (Integer)affectCntMap.get(req.getSerialNum());
            if (count2 == null) {
                affectCntMap.put(req.getSerialNum(), 1);
                continue;
            }
            affectCntMap.put(req.getSerialNum(), count2 + 1);
        }
        try {
            int affectCnt = this.taskPersistence.updateTaskStatus(this.jobInstanceInfo.getJobInstanceId(), taskIds, TaskStatus.INIT, workerId, workerAddr);
            LOGGER.warn("jobInstanceId={}, worker[{}] is down, reset task status, size:{}", this.jobInstanceInfo.getJobInstanceId(), workerIdAddr, affectCnt);
            for (Map.Entry entry : affectCntMap.entrySet()) {
                this.streamJobProgressMap.get(entry.getKey()).getWorkerProgressMap().get(workerAddr).decPulledAndTotal(affectCnt);
            }
        }
        catch (Exception e1) {
            LOGGER.error("jobInstanceId={}, timeout return init error", this.jobInstanceInfo.getJobInstanceId());
            this.updateNewInstanceStatus(this.getSerialNum(), InstanceStatus.FAILED, "timeout dispatch return init error");
        }
    }

    private void processDispatchException(String workerIdAddr, List<Worker.MasterStartContainerRequest> reqs, Throwable e) {
        boolean failover;
        String workerId = workerIdAddr.split("@")[0];
        String workerAddr = workerIdAddr.split("@")[1];
        boolean bl = failover = this.xAttrs != null && this.xAttrs.isFailover();
        if (failover && e instanceof TimeoutException) {
            this.initTaskFailover(reqs, workerIdAddr);
        } else {
            String uniqueIdWithoutTask = IdUtil.getUniqueIdWithoutTask(this.jobInstanceInfo.getJobId(), this.jobInstanceInfo.getJobInstanceId());
            LOGGER.error("jobInstanceId:{}, batch dispatch Tasks error worker={}, size:{}", this.jobInstanceInfo.getJobInstanceId(), workerIdAddr, reqs.size(), e);
            this.logCollector.collect(this.jobInstanceInfo.getAppGroupId(), uniqueIdWithoutTask, "map task dispatch fail:", e);
            for (Worker.MasterStartContainerRequest req : reqs) {
                Worker.ContainerReportTaskStatusRequest faileReq = Worker.ContainerReportTaskStatusRequest.newBuilder().setJobId(this.jobInstanceInfo.getJobId()).setJobInstanceId(this.jobInstanceInfo.getJobInstanceId()).setTaskId(req.getTaskId()).setStatus(TaskStatus.FAILED.getValue()).setResult("Dispatch tasks error. Cause by " + e.getMessage()).setWorkerId(workerId).setTaskName(req.getTaskName()).setWorkerAddr(workerAddr).setTaskName(req.getTaskName()).setSerialNum(req.getSerialNum()).build();
                this.updateTaskStatus(faileReq);
            }
        }
        this.setWorkerInvalid(workerIdAddr);
    }

    private void processDispatchResponse(String workerIdAddr, List<Worker.MasterStartContainerRequest> reqs, Worker.MasterBatchStartContainersResponse response, long startTime) {
        String workerId = workerIdAddr.split("@")[0];
        String workerAddr = workerIdAddr.split("@")[1];
        if (response.getSuccess()) {
            LOGGER.info("jobInstanceId={}, batch start containers successfully, size:{} , worker={}, cost={}ms", this.jobInstanceInfo.getJobInstanceId(), reqs.size(), workerIdAddr, System.currentTimeMillis() - startTime);
            this.aliveCheckWorkerSet.add(workerIdAddr);
            String metricsJson = response.getMetricsJson();
            this.setWorkerLoad(workerIdAddr, metricsJson, System.currentTimeMillis() - startTime);
        } else {
            boolean failover;
            boolean bl = failover = this.xAttrs != null && this.xAttrs.isFailover();
            if (failover && response.getMessage() != null && response.getMessage().contains("WORKER_NOT_RUNNING:")) {
                this.initTaskFailover(reqs, workerIdAddr);
            } else {
                LOGGER.error("jobInstanceId={}, batch start containers failed, worker={}, response={}, size:{}", this.jobInstanceInfo.getJobInstanceId(), workerIdAddr, response.getMessage(), reqs.size());
                for (Worker.MasterStartContainerRequest req : reqs) {
                    Worker.ContainerReportTaskStatusRequest faileStatusRequest = Worker.ContainerReportTaskStatusRequest.newBuilder().setJobId(this.jobInstanceInfo.getJobId()).setJobInstanceId(this.jobInstanceInfo.getJobInstanceId()).setTaskId(req.getTaskId()).setStatus(TaskStatus.FAILED.getValue()).setResult(response.getMessage()).setWorkerId(workerId).setTaskName(req.getTaskName()).setWorkerAddr(workerAddr).setTaskName(req.getTaskName()).setSerialNum(req.getSerialNum()).build();
                    this.updateTaskStatus(faileStatusRequest);
                }
            }
            this.setWorkerInvalid(workerIdAddr);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void setWorkerLoad(String workerIdAddr, String metricsJson, Long cost) {
        block7: {
            try {
                if (this.router == null || !(this.router instanceof WorkerLoadRegister) || !StringUtils.isNotEmpty(metricsJson)) break block7;
                Metrics metrics = JsonUtil.fromJson(metricsJson, Metrics.class);
                if (metrics != null) {
                    LOGGER.info("update worker load, worker={}, sharePoolAvailableSize={}, cost={}", workerIdAddr, metrics.getSharePoolAvailableSize(), cost);
                    ((WorkerLoadRegister)((Object)this.router)).setAvailableSize(workerIdAddr, metrics.getSharePoolAvailableSize());
                    ((WorkerLoadRegister)((Object)this.router)).setRemainCpu(workerIdAddr, (int)((double)metrics.getCpuProcessors() - metrics.getCpuLoad1()));
                    ((WorkerLoadRegister)((Object)this.router)).setRemainMemory(workerIdAddr, (long)(100.0 - metrics.getHeap1Usage() * 100.0));
                    if (cost != null) {
                        ((WorkerLoadRegister)((Object)this.router)).setCost(workerIdAddr, cost);
                    }
                }
                Router router = this.router;
                synchronized (router) {
                    this.router.notifyAll();
                }
            }
            catch (Exception e) {
                LOGGER.warn("Set worker load failed.", e);
            }
        }
    }

    private void batchHandleContainers(String workerIdAddr, List<Worker.MasterStartContainerRequest> reqs, boolean isFailover, TaskDispatchMode dispatchMode) {
        String workerId = workerIdAddr.split("@")[0];
        String workerAddr = workerIdAddr.split("@")[1];
        LOGGER.debug("jobInstanceId={}, batch dispatch, worker:{}, size:{}", this.jobInstanceInfo.getJobInstanceId(), workerIdAddr, reqs.size());
        try {
            this.batchHandlePersistence(workerId, workerAddr, reqs, isFailover);
            long startTime = System.currentTimeMillis();
            ActorSelection selection = this.getActorContext().actorSelection(ActorPathUtil.getContainerRouterPath(workerIdAddr));
            Worker.MasterBatchStartContainersRequest request = Worker.MasterBatchStartContainersRequest.newBuilder().setJobInstanceId(this.jobInstanceInfo.getJobInstanceId()).setJobId(this.jobInstanceInfo.getJobId()).addAllStartReqs(reqs).build();
            try {
                Worker.MasterBatchStartContainersResponse response = (Worker.MasterBatchStartContainersResponse)FutureUtils.awaitResult(selection, (Object)request, 3L);
                this.processDispatchResponse(workerIdAddr, reqs, response, startTime);
            }
            catch (Throwable e) {
                this.processDispatchException(workerIdAddr, reqs, e);
            }
        }
        catch (Throwable exception) {
            String uniqueIdWithoutTask = IdUtil.getUniqueIdWithoutTask(this.jobInstanceInfo.getJobId(), this.jobInstanceInfo.getJobInstanceId());
            LOGGER.error("jobInstanceId:{}, batch dispatch Tasks error worker={}, size:{}", this.jobInstanceInfo.getJobInstanceId(), workerIdAddr, reqs.size(), exception);
            this.logCollector.collect(this.jobInstanceInfo.getAppGroupId(), uniqueIdWithoutTask, "map task dispatch fail:", exception);
            for (Worker.MasterStartContainerRequest req : reqs) {
                Worker.ContainerReportTaskStatusRequest faileReq = Worker.ContainerReportTaskStatusRequest.newBuilder().setJobId(this.jobInstanceInfo.getJobId()).setJobInstanceId(this.jobInstanceInfo.getJobInstanceId()).setTaskId(req.getTaskId()).setStatus(TaskStatus.FAILED.getValue()).setWorkerId(workerId).setTaskName(req.getTaskName()).setWorkerAddr(workerAddr).setTaskName(req.getTaskName()).setSerialNum(req.getSerialNum()).build();
                this.updateTaskStatus(faileReq);
            }
        }
    }

    private void batchHandlePersistence(String workerId, String workerAddr, List<Worker.MasterStartContainerRequest> reqs, boolean isFailover) throws Exception {
        long startTime = System.currentTimeMillis();
        if (!isFailover) {
            this.taskPersistence.createTasks(reqs, workerId, workerAddr);
        } else {
            ArrayList<Long> taskIds = Lists.newArrayList();
            for (Worker.MasterStartContainerRequest req : reqs) {
                taskIds.add(req.getTaskId());
            }
            this.taskPersistence.updateTaskStatus(this.jobInstanceInfo.getJobInstanceId(), taskIds, TaskStatus.RUNNING, workerId, workerAddr);
        }
        LOGGER.debug("jobInstance={}, batch dispatch db cost:{} ms, size:{}", this.jobInstanceInfo.getJobInstanceId(), System.currentTimeMillis() - startTime, reqs.size());
    }

    public void batchDispatchTasks(List<Worker.MasterStartContainerRequest> masterStartContainerRequests) {
        HashMap<String, List<Worker.MasterStartContainerRequest>> worker2ReqsWithNormal = Maps.newHashMap();
        HashMap<String, List<Worker.MasterStartContainerRequest>> worker2ReqsWithFailover = Maps.newHashMap();
        this.batchHandlePulledProgress(masterStartContainerRequests, worker2ReqsWithNormal, worker2ReqsWithFailover, null);
        for (Map.Entry entry : worker2ReqsWithNormal.entrySet()) {
            this.batchHandleContainers((String)entry.getKey(), (List)entry.getValue(), false, TaskDispatchMode.PUSH);
        }
        for (Map.Entry entry : worker2ReqsWithFailover.entrySet()) {
            this.batchHandleContainers((String)entry.getKey(), (List)entry.getValue(), true, TaskDispatchMode.PUSH);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void batchHandlePulledProgress(List<Worker.MasterStartContainerRequest> masterStartContainerRequests, Map<String, List<Worker.MasterStartContainerRequest>> worker2ReqsWithNormal, Map<String, List<Worker.MasterStartContainerRequest>> worker2ReqsWithFailover, String worker) {
        for (Worker.MasterStartContainerRequest request : masterStartContainerRequests) {
            String workerIdAddr;
            String string2 = workerIdAddr = worker == null ? this.selectWorker() : worker;
            if (workerIdAddr == null) {
                this.updateNewInstanceStatus(this.getSerialNum(), InstanceStatus.FAILED, "all worker is down!");
                break;
            }
            String workerAddr = workerIdAddr.split("@")[1];
            StreamJobProgressDetail streamJobProgressDetail = this.streamJobProgressMap.get(request.getSerialNum());
            if (request.getFailover()) {
                if (!worker2ReqsWithFailover.containsKey(workerIdAddr)) {
                    worker2ReqsWithFailover.put(workerIdAddr, Lists.newArrayList(request));
                } else {
                    worker2ReqsWithFailover.get(workerIdAddr).add(request);
                }
            } else {
                if (!worker2ReqsWithNormal.containsKey(workerIdAddr)) {
                    worker2ReqsWithNormal.put(workerIdAddr, Lists.newArrayList(request));
                } else {
                    worker2ReqsWithNormal.get(workerIdAddr).add(request);
                }
                streamJobProgressDetail.getTaskProgressCounter().incrementPulled();
                if (request.getSerialNum() > 0L) {
                    this.totalCounter.incrementPulled();
                }
            }
            streamJobProgressDetail.setStatus(TaskStatus.PULLED.getValue());
            Map<String, WorkerProgressCounter> workerProgressMap = streamJobProgressDetail.getWorkerProgressMap();
            if (workerAddr != null && !workerProgressMap.containsKey(workerAddr)) {
                StreamTaskMaster streamTaskMaster = this;
                synchronized (streamTaskMaster) {
                    if (!workerProgressMap.containsKey(workerAddr)) {
                        WorkerProgressCounter workerProgressCounter = new WorkerProgressCounter(workerAddr);
                        workerProgressMap.put(workerAddr, workerProgressCounter);
                    }
                }
            }
            workerProgressMap.get(workerAddr).incrementTotal();
            workerProgressMap.get(workerAddr).incrementPulled();
        }
    }

    private synchronized String selectWorker() {
        if (this.index < 0 || this.index >= Integer.MAX_VALUE) {
            this.index = 0;
        }
        String worker = this.router.route(this.jobInstanceInfo.getAppGroupId(), this.jobInstanceInfo.getJobId(), this.jobInstanceInfo.getAllWorkers(), this.jobInstanceInfo.getTargetWorkerAddrsMap(), this.index++, this.getLocalWorkerIdAddr());
        return worker;
    }

    @Override
    public synchronized void handleWorkerShutdown(String workerIdAddr, boolean withFailover) {
        this.existInvalidWorker = true;
        this.invalidWorkerSet.add(workerIdAddr);
        if (!this.aliveCheckWorkerSet.contains(workerIdAddr)) {
            return;
        }
        String[] workerInfo = workerIdAddr.split("@");
        String workerAddr = workerInfo[1];
        String workerId = workerInfo[0];
        this.aliveCheckWorkerSet.remove(workerIdAddr);
        this.jobInstanceInfo.getAllWorkers().remove(workerIdAddr);
        if (withFailover && this.xAttrs != null && this.xAttrs.isFailover()) {
            int affectCnt = this.taskPersistence.batchUpdateTaskStatus(this.jobInstanceInfo.getJobInstanceId(), TaskStatus.INIT, workerId, workerAddr);
            LOGGER.warn("jobInstanceId={}, failover task number:{}, workerId:{}, workerAddr:{}", this.jobInstanceInfo.getJobInstanceId(), affectCnt, workerId, workerAddr);
            if (affectCnt > 0) {
                for (StreamJobProgressDetail progressDetail : this.streamJobProgressMap.values()) {
                    WorkerProgressCounter workerProgressCounter = progressDetail.getWorkerProgressMap().get(workerAddr);
                    int count2 = workerProgressCounter.getRunning() + workerProgressCounter.getPulled();
                    workerProgressCounter.decRunningAndTotal(count2);
                }
                this.taskDispatchReqHandler.release(affectCnt);
            }
        } else {
            int affectCnt = this.taskPersistence.batchUpdateTaskStatus(this.jobInstanceInfo.getJobInstanceId(), TaskStatus.FAILED, workerId, workerAddr);
            LOGGER.warn("jobInstanceId={}, worker shutdown, failed task number:{}, workerId:{}, workerAddr:{}", this.jobInstanceInfo.getJobInstanceId(), affectCnt, workerId, workerAddr);
            if (affectCnt > 0) {
                for (StreamJobProgressDetail progressDetail : this.streamJobProgressMap.values()) {
                    WorkerProgressCounter workerProgressCounter = progressDetail.getWorkerProgressMap().get(workerAddr);
                    int count3 = workerProgressCounter.getRunning() + workerProgressCounter.getPulled();
                    workerProgressCounter.incrementFailed(count3);
                }
                this.taskDispatchReqHandler.release(affectCnt);
            }
        }
    }

    @Override
    public ProcessResult postFinish(long jobInstanceId) {
        ProcessResult reduceResult = null;
        try {
            JobContext context = JobContext.newBuilder().setJobId(this.jobInstanceInfo.getJobId()).setJobInstanceId(jobInstanceId).setJobType(this.jobInstanceInfo.getJobType()).setContent(this.jobInstanceInfo.getContent()).setScheduleTime(this.jobInstanceInfo.getScheduleTime()).setDataTime(this.jobInstanceInfo.getDataTime()).setJobParameters(this.jobInstanceInfo.getParameters()).setInstanceParameters(this.jobInstanceInfo.getInstanceParameters()).setUser(this.jobInstanceInfo.getUser()).setSerialNum(this.getSerialNum()).build();
            JobProcessor jobProcessor = JobProcessorUtil.getJavaProcessor(context.getContent());
            if (jobProcessor instanceof StreamJobProcessor) {
                reduceResult = ((StreamJobProcessor)jobProcessor).postProcess(context);
            }
        }
        catch (Throwable e) {
            LOGGER.info("Stream job post finish failed.", e);
            String fixedErrMsg = ExceptionUtil.getFixedErrMsgByThrowable(e, 800);
            return new ProcessResult(false, "Stream job post finish failed:" + fixedErrMsg);
        }
        return reduceResult;
    }

    @Override
    public void killInstance(boolean mayInterruptIfRunning, String reason) {
        super.killInstance(mayInterruptIfRunning, reason);
        List<String> allWorkers = this.jobInstanceInfo.getAllWorkers();
        this.sendKillContainerRequest(mayInterruptIfRunning, allWorkers);
        this.updateNewInstanceStatus(this.getSerialNum(), this.jobInstanceInfo.getJobInstanceId(), InstanceStatus.FAILED, reason);
    }
}

