package com.alibaba.schedulerx.worker.master;

import akka.actor.ActorContext;
import com.alibaba.schedulerx.common.constants.CommonConstants;
import com.alibaba.schedulerx.common.domain.InstanceStatus;
import com.alibaba.schedulerx.common.domain.JobInstanceInfo;
import com.alibaba.schedulerx.common.domain.LimitedQueue;
import com.alibaba.schedulerx.common.domain.MapTaskXAttrs;
import com.alibaba.schedulerx.common.domain.Metrics;
import com.alibaba.schedulerx.common.domain.StreamJobProgress;
import com.alibaba.schedulerx.common.domain.StreamJobProgressDetail;
import com.alibaba.schedulerx.common.domain.TaskDispatchMode;
import com.alibaba.schedulerx.common.domain.TaskProgressCounter;
import com.alibaba.schedulerx.common.domain.TaskStatus;
import com.alibaba.schedulerx.common.domain.TimeType;
import com.alibaba.schedulerx.common.domain.WorkerProgressCounter;
import com.alibaba.schedulerx.common.domain.enums.RouteStrategyEnum;
import com.alibaba.schedulerx.common.util.ConfigUtil;
import com.alibaba.schedulerx.common.util.ExceptionUtil;
import com.alibaba.schedulerx.common.util.HessianUtil;
import com.alibaba.schedulerx.common.util.IdUtil;
import com.alibaba.schedulerx.common.util.JobUtil;
import com.alibaba.schedulerx.common.util.JsonUtil;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.protocol.utils.FutureUtils;
import com.alibaba.schedulerx.shade.com.google.common.collect.Lists;
import com.alibaba.schedulerx.shade.com.google.common.collect.Maps;
import com.alibaba.schedulerx.shade.com.google.protobuf.ByteString;
import com.alibaba.schedulerx.shade.com.mashape.unirest.http.options.Options;
import com.alibaba.schedulerx.shade.org.apache.commons.collections.CollectionUtils;
import com.alibaba.schedulerx.shade.org.apache.commons.collections.MapUtils;
import com.alibaba.schedulerx.shade.org.apache.commons.lang.StringUtils;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.batch.ReqQueue;
import com.alibaba.schedulerx.worker.batch.StreamTaskPushReqHandler;
import com.alibaba.schedulerx.worker.batch.TMStatusReqHandler;
import com.alibaba.schedulerx.worker.domain.JavaProcessorProfile;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.domain.TaskInfo;
import com.alibaba.schedulerx.worker.domain.WorkerConstants;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.logcollector.ClientLoggerMessage;
import com.alibaba.schedulerx.worker.logcollector.LogCollector;
import com.alibaba.schedulerx.worker.logcollector.LogCollectorFactory;
import com.alibaba.schedulerx.worker.master.persistence.H2FilePersistence;
import com.alibaba.schedulerx.worker.master.persistence.TaskPersistence;
import com.alibaba.schedulerx.worker.metrics.WorkerLoadRegister;
import com.alibaba.schedulerx.worker.processor.JobProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
import com.alibaba.schedulerx.worker.processor.StreamJobProcessor;
import com.alibaba.schedulerx.worker.route.Router;
import com.alibaba.schedulerx.worker.route.RouterFactory;
import com.alibaba.schedulerx.worker.util.ActorPathUtil;
import com.alibaba.schedulerx.worker.util.ContanerUtil;
import com.alibaba.schedulerx.worker.util.JobProcessorUtil;
import com.alibaba.schedulerx.worker.util.WorkerConfigUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.joda.time.DateTime;

/* loaded from: input_file:com/alibaba/schedulerx/worker/master/StreamTaskMaster.class */
public class StreamTaskMaster extends TaskMaster {
    private static final Logger LOGGER = LogFactory.getLogger(StreamTaskMaster.class);
    // Set to 0 in the constructor; NOTE(review): no reader in the visible code — confirm where it is used.
    private volatile int index;
    // Start-container requests waiting to be pushed to workers. Handed to taskDispatchReqHandler in init();
    // failover tasks pulled back from taskPersistence are resubmitted here.
    protected ReqQueue<Worker.MasterStartContainerRequest> taskBlockingQueue;
    // Pushes requests from taskBlockingQueue. release() is called on every SUCCESS/FAILED report, and
    // allTasksPushed(batchNo) is consulted when deciding whether a batch is complete.
    protected StreamTaskPushReqHandler<Worker.MasterStartContainerRequest> taskDispatchReqHandler;
    // Thread running the produce (root) task, created in createProduceTask(). If it is dead while no batch
    // is in flight, the status-check thread fails the instance.
    protected Thread streamProduceThread;
    // Sink for client-side log messages about this instance (task failures, pull failures, init failures).
    private LogCollector logCollector;
    // Details of completed batches; created with capacity 10 (presumably LimitedQueue evicts the oldest — confirm).
    private LimitedQueue<StreamJobProgressDetail> streamJobProgressHistory;
    // Progress of in-flight batches keyed by batch serial number; an entry is removed once its batch completes.
    private Map<Long, StreamJobProgressDetail> streamJobProgressMap;
    // Worker routing strategy chosen in init(): xAttrs route type when the shared container pool is enabled,
    // round-robin otherwise.
    protected Router router;
    // Task status reports enqueued by updateTaskStatus().
    protected ReqQueue<Worker.ContainerReportTaskStatusRequest> taskStatusReqQueue;
    // Batch consumer of taskStatusReqQueue. NOTE(review): presumably ends up calling batchUpdateTaskStatues — confirm.
    protected TMStatusReqHandler<Worker.ContainerReportTaskStatusRequest> taskStatusReqBatchHandler;
    // Persistent task store (the H2 file persistence singleton): statuses are written here and failover tasks
    // are pulled from it.
    protected TaskPersistence taskPersistence;
    // Map-task extended attributes parsed from the instance's xattrs JSON; stays null when the instance has none.
    protected MapTaskXAttrs xAttrs;
    // User processor loaded from the job content; reduce() is invoked per batch when needReduce() is true.
    protected StreamJobProcessor streamJobProcessor;
    // batch serial number -> (taskId -> result) of finished tasks; reduce input, dropped when the batch completes.
    private Map<Long, Map<Long, String>> taskResultMap;
    // batch serial number -> (taskId -> final status) of finished tasks; reduce input, dropped when the batch completes.
    private Map<Long, Map<Long, TaskStatus>> taskStatusMap;
    // Aggregate counters across all batches; the root produce task is excluded from its total.
    private TaskProgressCounter totalCounter;

    /**
     * Creates the master for one stream job instance.
     *
     * <p>Initialises the progress bookkeeping, opens the task persistence table and loads the
     * user's {@link StreamJobProcessor} from the job content.
     *
     * @param jobInstanceInfo the instance being mastered; its xattrs JSON (if any) is parsed into {@link #xAttrs}
     * @param actorContext    actor context passed through to {@link TaskMaster}
     * @throws Exception if the persistence table cannot be initialised or the processor cannot be loaded
     */
    public StreamTaskMaster(JobInstanceInfo jobInstanceInfo, ActorContext actorContext) throws Exception {
        super(jobInstanceInfo, actorContext);
        this.index = 0;
        this.logCollector = LogCollectorFactory.get();
        this.streamJobProgressHistory = new LimitedQueue<>(10);
        this.streamJobProgressMap = Maps.newConcurrentMap();
        this.xAttrs = null;
        // These maps are populated in batchUpdateTaskStatues (which reads them outside its lock) and are
        // read/removed from the status-check thread without any lock, so they must tolerate concurrent access.
        this.taskResultMap = Maps.newConcurrentMap();
        this.taskStatusMap = Maps.newConcurrentMap();
        this.totalCounter = new TaskProgressCounter("TotalCounter");
        this.taskPersistence = H2FilePersistence.getInstance();
        this.taskPersistence.initTable();
        this.streamJobProcessor = (StreamJobProcessor) JobProcessorUtil.getJavaProcessor(jobInstanceInfo.getContent());
        if (jobInstanceInfo.getXattrs() != null) {
            this.xAttrs = (MapTaskXAttrs) JsonUtil.fromJson(jobInstanceInfo.getXattrs(), MapTaskXAttrs.class);
        }
    }

    /**
     * Adds {@code i} expected tasks to the progress counter of batch {@code l}, creating the batch
     * entry on first use.
     *
     * <p>Tasks named {@link WorkerConstants#MAP_TASK_ROOT_NAME} are counted only in the batch counter,
     * not in the instance-wide {@code totalCounter}.
     *
     * @param l   batch serial number
     * @param str task name; also used as the counter name when the batch entry is created
     * @param i   number of tasks to add to the batch total
     */
    public void initTaskProgress(Long l, String str, int i) {
        // Single lookup: a separate containsKey()/get() pair can observe the entry and then miss it if the
        // status-check thread removes the batch in between.
        StreamJobProgressDetail detail = this.streamJobProgressMap.get(l);
        if (detail == null) {
            synchronized (this) {
                detail = this.streamJobProgressMap.get(l);
                if (detail == null) {
                    TaskProgressCounter taskProgressCounter = new TaskProgressCounter(str);
                    taskProgressCounter.incrementTotal(i);
                    this.streamJobProgressMap.put(l, new StreamJobProgressDetail(l.longValue(), DateTime.now().getMillis(), taskProgressCounter));
                } else {
                    // Another thread created the entry after our first lookup; still count our tasks
                    // instead of silently dropping them.
                    detail.getTaskProgressCounter().incrementTotal(i);
                }
            }
        } else {
            detail.getTaskProgressCounter().incrementTotal(i);
        }
        if (!WorkerConstants.MAP_TASK_ROOT_NAME.equals(str)) {
            this.totalCounter.incrementTotal(i);
        }
    }

    /**
     * Records the reporting worker's load metrics, then lets {@link TaskMaster} process the batch.
     *
     * @param containerBatchReportTaskStatuesRequest batch of task statuses reported by one worker
     * @throws Exception propagated from the base implementation
     */
    @Override // com.alibaba.schedulerx.worker.master.TaskMaster
    public void batchUpdateTaskStatus(Worker.ContainerBatchReportTaskStatuesRequest containerBatchReportTaskStatuesRequest) throws Exception {
        final String reporterIdAddr = containerBatchReportTaskStatuesRequest.getWorkerId()
                + "@" + containerBatchReportTaskStatuesRequest.getWorkerAddr();
        final String reporterMetrics = containerBatchReportTaskStatuesRequest.getMetricsJson();
        setWorkerLoad(reporterIdAddr, reporterMetrics, null);
        super.batchUpdateTaskStatus(containerBatchReportTaskStatuesRequest);
    }

    /**
     * Enqueues a single task status report for batched processing.
     *
     * <p>Never throws: any failure (e.g. the queue not yet created by {@code init()}) is logged with the
     * instance id so it can be traced.
     *
     * @param containerReportTaskStatusRequest the status report to enqueue
     */
    @Override // com.alibaba.schedulerx.worker.master.TaskMaster
    public void updateTaskStatus(Worker.ContainerReportTaskStatusRequest containerReportTaskStatusRequest) {
        try {
            this.taskStatusReqQueue.submitRequest(containerReportTaskStatusRequest);
        } catch (Throwable th) {
            LOGGER.error("jobInstanceId={}, submit task status report to queue error.",
                    Long.valueOf(this.jobInstanceInfo.getJobInstanceId()), th);
        }
    }

    /**
     * Applies a batch of task status reports: updates per-batch, per-worker and total progress counters,
     * records finished results for reduce, and persists the latest status of each task.
     *
     * <p>Any error while processing a single report fails the whole instance. Persistence is retried up to
     * three times before the instance is failed.
     *
     * @param list status reports to apply
     */
    @Override // com.alibaba.schedulerx.worker.master.TaskMaster
    public void batchUpdateTaskStatues(List<Worker.ContainerReportTaskStatusRequest> list) {
        // Report to persist per taskId: the first one seen, overridden by any later report with a finished status.
        Map<Long, Worker.ContainerReportTaskStatusRequest> latestByTaskId = Maps.newHashMap();
        for (Worker.ContainerReportTaskStatusRequest request : list) {
            try {
                TaskStatus status = TaskStatus.parseValue(request.getStatus());
                Long taskId = Long.valueOf(request.getTaskId());
                if (!latestByTaskId.containsKey(taskId) || status.isFinish()) {
                    latestByTaskId.put(taskId, request);
                }
                String workerAddr = request.getWorkerAddr();
                LOGGER.debug("report task status:{} from worker:{}, uniqueId:{}", status.getDescription(), workerAddr, IdUtil.getUniqueId(request.getJobId(), request.getJobInstanceId(), request.getTaskId()));
                applyStatusToProgress(request, status, workerAddr);
                if (TaskStatus.FAILED.equals(status)) {
                    String uniqueIdWithoutTask = IdUtil.getUniqueIdWithoutTask(this.jobInstanceInfo.getJobId(), this.jobInstanceInfo.getJobInstanceId());
                    LOGGER.info("jobInstanceId={}, taskId={}, report status failed. result:{}", Long.valueOf(this.jobInstanceInfo.getJobInstanceId()), taskId, request.getResult());
                    this.logCollector.collect(this.jobInstanceInfo.getAppGroupId(), uniqueIdWithoutTask, ClientLoggerMessage.JOB_PROCESSOR_EXEC_FAIL + request.getTaskId() + ", " + request.getResult());
                }
                if (this.streamJobProcessor.needReduce() && status.isFinish()) {
                    recordReduceInput(request, status);
                }
            } catch (Throwable th) {
                LOGGER.error("jobInstanceId={}, batchNo={}, taskId={}, update progressMap error.", Long.valueOf(this.jobInstanceInfo.getJobInstanceId()), Long.valueOf(request.getSerialNum()), Long.valueOf(request.getTaskId()), th);
                updateNewInstanceStatus(getSerialNum(), InstanceStatus.FAILED, "update progressMap error." + th.getMessage());
            }
        }
        try {
            long startMs = System.currentTimeMillis();
            boolean persisted = false;
            for (int attempt = 0; attempt < 3 && !persisted; attempt++) {
                try {
                    this.taskPersistence.updateTaskStatues(Lists.newArrayList(latestByTaskId.values()));
                    persisted = true;
                } catch (Throwable th2) {
                    // The id argument was previously missing, so the exception filled the placeholder.
                    LOGGER.error("jobInstanceId={}, persistent batch updateTaskStatus error.", Long.valueOf(this.jobInstanceInfo.getJobInstanceId()), th2);
                }
            }
            if (!persisted) {
                updateNewInstanceStatus(getSerialNum(), InstanceStatus.FAILED, "persistent batch update TaskStatus error up to 3 times");
            }
            LOGGER.debug("{} batch update status db cost:{}", Long.valueOf(this.jobInstanceInfo.getJobInstanceId()), Long.valueOf(System.currentTimeMillis() - startMs));
        } catch (Throwable th3) {
            LOGGER.error("jobInstanceId={}, batch updateTaskStatus error.", Long.valueOf(this.jobInstanceInfo.getJobInstanceId()), th3);
        }
    }

    /**
     * Updates the batch, total and per-worker counters for one report and marks the batch as running.
     * Throws (NPE) if the batch, or the reporting worker within it, has no progress entry; the caller
     * treats that as an instance failure.
     */
    private void applyStatusToProgress(Worker.ContainerReportTaskStatusRequest request, TaskStatus status, String workerAddr) {
        StreamJobProgressDetail batchProgress = this.streamJobProgressMap.get(Long.valueOf(request.getSerialNum()));
        TaskProgressCounter batchCounter = batchProgress.getTaskProgressCounter();
        batchProgress.setStatus(Integer.valueOf(TaskStatus.RUNNING.getValue()));
        Map<String, WorkerProgressCounter> workerProgressMap = batchProgress.getWorkerProgressMap();
        if (status.equals(TaskStatus.RUNNING)) {
            batchCounter.incrementRunning();
            this.totalCounter.incrementRunning();
            if (workerAddr != null) {
                workerProgressMap.get(workerAddr).incrementRunning();
            }
        } else if (status.equals(TaskStatus.SUCCESS)) {
            batchCounter.incrementSuccess();
            this.totalCounter.incrementSuccess();
            // A finished task frees a dispatch slot.
            this.taskDispatchReqHandler.release();
            if (workerAddr != null) {
                workerProgressMap.get(workerAddr).incrementSuccess();
            }
        } else if (status.equals(TaskStatus.FAILED)) {
            batchCounter.incrementFailed();
            this.totalCounter.incrementFailed();
            this.taskDispatchReqHandler.release();
            if (workerAddr != null) {
                WorkerProgressCounter workerCounter = workerProgressMap.get(workerAddr);
                workerCounter.incrementFailed();
                if (StringUtils.isNotBlank(request.getTraceId())) {
                    workerCounter.setTraceId(request.getTraceId());
                }
            }
        }
    }

    /** Stores a finished task's result and status under its batch so the batch reduce can read them. */
    private void recordReduceInput(Worker.ContainerReportTaskStatusRequest request, TaskStatus status) {
        Long batchNo = Long.valueOf(request.getSerialNum());
        Long taskId = Long.valueOf(request.getTaskId());
        getOrCreateBatchEntry(this.taskResultMap, batchNo).put(taskId, request.getResult());
        getOrCreateBatchEntry(this.taskStatusMap, batchNo).put(taskId, status);
    }

    /**
     * Returns the per-task map of {@code batchNo}, creating it if absent. The whole lookup runs under the
     * outer map's monitor, so this is safe even when the outer map is a plain HashMap.
     */
    private static <V> Map<Long, V> getOrCreateBatchEntry(Map<Long, Map<Long, V>> byBatch, Long batchNo) {
        synchronized (byBatch) {
            Map<Long, V> perTask = byBatch.get(batchNo);
            if (perTask == null) {
                perTask = new HashMap<>();
                byBatch.put(batchNo, perTask);
            }
            return perTask;
        }
    }

    /**
     * One-time setup: creates the dispatch and status queues and their handlers, selects the router, and
     * starts two background threads. One thread checks that workers are alive; the other pulls failover
     * tasks from persistence and re-dispatches them. Later calls return immediately.
     */
    @Override // com.alibaba.schedulerx.worker.master.TaskMaster
    public void init() {
        if (this.INITED) {
            return;
        }
        this.INITED = true;
        this.taskBlockingQueue = new ReqQueue<>(this.jobInstanceInfo.getJobInstanceId(), this.xAttrs.getQueueSize());
        this.taskBlockingQueue.init();
        this.taskStatusReqQueue = new ReqQueue<>(this.jobInstanceInfo.getJobInstanceId(), 100000);
        this.taskStatusReqQueue.init();
        this.taskStatusReqBatchHandler = new TMStatusReqHandler<>(this.jobInstanceInfo.getJobInstanceId(), 1, 1, 3000, this.taskStatusReqQueue);
        this.taskDispatchReqHandler = new StreamTaskPushReqHandler<>(this.jobInstanceInfo.getJobInstanceId(), this.xAttrs.getGlobalConsumerSize(), this.jobInstanceInfo.getAllWorkers().size(), this.taskBlockingQueue);
        if (WorkerConfigUtil.isEnableShareContainerPool()) {
            this.router = RouterFactory.getRouter(this.jobInstanceInfo.getAppGroupId(), this.jobInstanceInfo.getJobId(), this.xAttrs.getRouteType().intValue(), this.jobInstanceInfo.getRouteStrategyContent());
        } else {
            this.router = RouterFactory.getRouter(this.jobInstanceInfo.getAppGroupId(), this.jobInstanceInfo.getJobId(), RouteStrategyEnum.ROUND_ROBIN.getValue().intValue(), this.jobInstanceInfo.getRouteStrategyContent());
        }
        if (this.router instanceof WorkerLoadRegister) {
            ((WorkerLoadRegister) this.router).clear();
        }
        final String uniqueId = this.jobInstanceInfo.getJobId() + IdUtil.SPLITTER_TOKEN + this.jobInstanceInfo.getJobInstanceId();
        new Thread(new Runnable() {
            @Override
            public void run() {
                runWorkerAliveCheckLoop();
            }
        }, "Schedulerx-StreamTaskMaster-check-worker-alive-thread-" + uniqueId).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                runFailoverPullLoop(uniqueId);
            }
        }, "Schedulerx-StreamTaskMaster-pull-thread-" + uniqueId).start();
    }

    /** Alive-check thread body: checks every worker in aliveCheckWorkerSet each round until the instance finishes. */
    private void runWorkerAliveCheckLoop() {
        while (!this.instanceStatus.isFinish()) {
            try {
                for (String workerIdAddr : this.aliveCheckWorkerSet) {
                    checkOneWorkerAlive(workerIdAddr);
                }
                Thread.sleep(Options.CONNECTION_TIMEOUT);
            } catch (Throwable th) {
                LOGGER.error("check worker error, jobInstanceId={}", Long.valueOf(this.jobInstanceInfo.getJobInstanceId()), th);
            }
        }
    }

    /**
     * Checks one worker (formatted {@code workerId@host:port}). First it probes the worker's socket; if that
     * fails 3 times, the worker is shut down. Otherwise it asks the worker's heartbeat actor and records its
     * load. A worker that reports itself not alive is shut down and its container pool is destroyed.
     */
    private void checkOneWorkerAlive(String workerIdAddr) throws Exception {
        int failedProbes;
        try {
            String[] hostAndPort = workerIdAddr.split("@")[1].split(CommonConstants.ADDRESS_SEPARATOR);
            failedProbes = probeWorkerSocket(hostAndPort[0], Integer.valueOf(hostAndPort[1]).intValue(), 3);
        } catch (Exception e) {
            LOGGER.error("Alive worker check failed.", e);
            handleWorkerShutdown(workerIdAddr, true);
            SchedulerxWorker.AtLeastDeliveryRoutingActor.tell(Worker.MasterDestroyContainerPoolRequest.newBuilder().setJobInstanceId(this.jobInstanceInfo.getJobInstanceId()).setJobId(this.jobInstanceInfo.getJobId()).setWorkerIdAddr(workerIdAddr).setSerialNum(getSerialNum()).build(), null);
            // The worker is already removed; don't probe it further this round.
            return;
        }
        if (failedProbes >= 3) {
            LOGGER.warn("worker[{}] is down, start to remove this worker and failover tasks, jobInstanceId={}", workerIdAddr, Long.valueOf(this.jobInstanceInfo.getJobInstanceId()));
            handleWorkerShutdown(workerIdAddr, true);
            return;
        }
        long startMs = System.currentTimeMillis();
        Worker.MasterCheckWorkerAliveResponse response = (Worker.MasterCheckWorkerAliveResponse) FutureUtils.awaitResult(
                getActorContext().actorSelection(ActorPathUtil.getWorkerHeartbeatRouterPath(workerIdAddr)),
                Worker.MasterCheckWorkerAliveRequest.newBuilder().setJobInstanceId(this.jobInstanceInfo.getJobInstanceId()).setDispatchMode(this.xAttrs.getTaskDispatchMode()).build(),
                10L);
        if (response.getSuccess()) {
            setWorkerLoad(workerIdAddr, response.getMetricsJson(), Long.valueOf(System.currentTimeMillis() - startMs));
        } else {
            // Previously three arguments were passed for two placeholders, so the worker's message was dropped.
            LOGGER.warn("jobInstanceId={} of worker={} is not alive, message={}", Long.valueOf(this.jobInstanceInfo.getJobInstanceId()), workerIdAddr, response.getMessage());
            handleWorkerShutdown(workerIdAddr, true);
            SchedulerxWorker.AtLeastDeliveryRoutingActor.tell(Worker.MasterDestroyContainerPoolRequest.newBuilder().setJobInstanceId(this.jobInstanceInfo.getJobInstanceId()).setJobId(this.jobInstanceInfo.getJobId()).setWorkerIdAddr(workerIdAddr).setSerialNum(getSerialNum()).build(), null);
        }
    }

    /**
     * Tries to open a TCP connection to {@code host:port}, up to {@code maxAttempts} times, sleeping 5s after
     * each failure. Returns as soon as one connect succeeds; the socket is always closed.
     *
     * @return number of failed attempts; equals {@code maxAttempts} when the worker was never reachable
     */
    private int probeWorkerSocket(String host, int port, int maxAttempts) throws InterruptedException {
        int failed = 0;
        while (failed < maxAttempts) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), 5000);
                LOGGER.info("socket to {}:{} is reachable, times={}", host, Integer.valueOf(port), Integer.valueOf(failed));
                return failed;
            } catch (Exception e) {
                LOGGER.info("socket to {}:{} is not reachable, times={}", host, Integer.valueOf(port), Integer.valueOf(failed));
                Thread.sleep(5000L);
                failed++;
            }
        }
        return failed;
    }

    /**
     * Pull-thread body: until the instance finishes, pulls pages of persisted tasks that need
     * (re)dispatch and resubmits them to taskBlockingQueue. A pull timeout is logged and retried after
     * a pause; any other error fails the instance.
     */
    private void runFailoverPullLoop(String uniqueId) {
        int pageSize = ConfigUtil.getWorkerConfig().getInt(WorkerConstants.MAP_MASTER_PAGE_SIZE, 100);
        while (!this.instanceStatus.isFinish()) {
            try {
                long startMs = System.currentTimeMillis();
                List<TaskInfo> pulled = this.taskPersistence.pull(this.jobInstanceInfo.getJobInstanceId(), pageSize);
                LOGGER.debug("jobInstanceId={}, pull cost={}ms", Long.valueOf(this.jobInstanceInfo.getJobInstanceId()), Long.valueOf(System.currentTimeMillis() - startMs));
                if (pulled.isEmpty()) {
                    LOGGER.debug("pull task empty of jobInstanceId={}, sleep 10000 ms ...", Long.valueOf(this.jobInstanceInfo.getJobInstanceId()));
                    Thread.sleep(Options.CONNECTION_TIMEOUT);
                    continue;
                }
                LOGGER.info("jobInstanceId={}, failover retry dispatch taskList, size:{} , cost={}ms", Long.valueOf(this.jobInstanceInfo.getJobInstanceId()), Integer.valueOf(pulled.size()), Long.valueOf(System.currentTimeMillis() - startMs));
                for (TaskInfo taskInfo : pulled) {
                    ByteString body = taskInfo.getTaskBody() == null ? null : ByteString.copyFrom(taskInfo.getTaskBody());
                    Worker.MasterStartContainerRequest.Builder builder = convert2StartContainerRequestBuilder(this.jobInstanceInfo, taskInfo.getTaskId(), taskInfo.getTaskName(), body, true);
                    builder.setSerialNum(taskInfo.getBatchNo());
                    this.taskBlockingQueue.submitRequest(builder.build());
                }
            } catch (TimeoutException e) {
                LOGGER.error("pull task timeout, uniqueId:{}", uniqueId, e);
                this.logCollector.collect(this.jobInstanceInfo.getAppGroupId(), uniqueId, ClientLoggerMessage.MAP_INSTANCE_PULL_JOB_FAIL, e);
                try {
                    Thread.sleep(Options.CONNECTION_TIMEOUT);
                } catch (InterruptedException ignored) {
                    // Best-effort back-off only; the loop keeps polling until the instance finishes.
                }
            } catch (Throwable th) {
                updateNewInstanceStatus(getSerialNum(), InstanceStatus.FAILED, ExceptionUtil.getMessage(th));
                this.logCollector.collect(this.jobInstanceInfo.getAppGroupId(), uniqueId, ClientLoggerMessage.MAP_INSTANCE_PULL_JOB_FAIL, th);
                LOGGER.error("pull task error, uniqueId:{}", uniqueId, th);
            }
        }
    }

    /**
     * Starts the instance. It initialises the master, starts the dispatch and status handlers, and creates
     * the produce task. It then starts a batch status-check thread and, for non-second-type jobs, a
     * progress-report thread. Any failure marks the instance FAILED instead of propagating.
     *
     * @param jobInstanceInfo the instance to run
     */
    @Override // com.alibaba.schedulerx.worker.master.TaskMaster
    public void submitInstance(final JobInstanceInfo jobInstanceInfo) throws Exception {
        try {
            init();
            this.taskDispatchReqHandler.start();
            this.taskStatusReqBatchHandler.start();
            createProduceTask();
            final String uniqueId = jobInstanceInfo.getJobId() + IdUtil.SPLITTER_TOKEN + jobInstanceInfo.getJobInstanceId();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    runBatchStatusCheckLoop(jobInstanceInfo, uniqueId);
                }
            }, "Schedulerx-StreamTaskMaster-status-check-thread-" + uniqueId).start();
            if (!JobUtil.isSecondTypeJob(TimeType.parseValue(jobInstanceInfo.getTimeType()))) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        runProgressReportLoop(jobInstanceInfo, uniqueId);
                    }
                }, "Schedulerx-StreamTaskMaster-report-progress-thread-" + uniqueId).start();
            }
        } catch (Throwable th) {
            String uniqueId = jobInstanceInfo.getJobId() + IdUtil.SPLITTER_TOKEN + jobInstanceInfo.getJobInstanceId();
            LOGGER.error("submit instance failed.", th);
            updateNewInstanceStatus(getSerialNum(), InstanceStatus.FAILED, ExceptionUtil.getMessage(th));
            this.logCollector.collect(jobInstanceInfo.getAppGroupId(), uniqueId, ClientLoggerMessage.INSTANCE_INIT_FAIL, th);
        }
    }

    /**
     * Status-check thread body. Periodically completes any in-flight batch whose tasks are all accounted
     * for. If no batch is in flight and the produce thread has died, the instance is failed.
     */
    private void runBatchStatusCheckLoop(JobInstanceInfo jobInstanceInfo, String uniqueId) {
        int intervalMs = ConfigUtil.getWorkerConfig().getInt(WorkerConstants.Map_MASTER_STATUS_CHECK_INTERVAL, 3000);
        while (!this.instanceStatus.isFinish()) {
            try {
                Thread.sleep(intervalMs);
                if (MapUtils.isNotEmpty(this.streamJobProgressMap)) {
                    for (Map.Entry<Long, StreamJobProgressDetail> entry : this.streamJobProgressMap.entrySet()) {
                        completeBatchIfFinished(jobInstanceInfo, entry.getKey(), entry.getValue());
                    }
                } else if (!this.streamProduceThread.isAlive()) {
                    updateNewInstanceStatus(getSerialNum(), jobInstanceInfo.getJobInstanceId(), InstanceStatus.FAILED, SchedulerxWorker.INITED ? "Produce task is stopped." : "Worker master shutdown.");
                }
            } catch (Throwable th) {
                LOGGER.error("status check error, uniqueId:{}", uniqueId, th);
            }
        }
    }

    /**
     * Finishes batch {@code batchNo} if all its tasks were pushed or reported, and persistence also
     * reports it finished. Runs reduce when required (batchNo &gt; 0), then records the final status
     * and moves the batch from the in-flight map to history.
     */
    private void completeBatchIfFinished(JobInstanceInfo jobInstanceInfo, Long batchNo, StreamJobProgressDetail detail) throws Exception {
        boolean allTasksPushed = this.taskDispatchReqHandler.allTasksPushed(batchNo);
        TaskProgressCounter counter = detail.getTaskProgressCounter();
        if (!allTasksPushed && counter.getTotal() > counter.getFailed() + counter.getSuccess()) {
            return;
        }
        if (!this.taskPersistence.checkInstanceStatus(Long.valueOf(jobInstanceInfo.getJobInstanceId()), batchNo).isFinish()) {
            return;
        }
        ProcessResult result = new ProcessResult(true);
        if (batchNo.longValue() > 0 && this.streamJobProcessor.needReduce()) {
            result = reduceBatch(jobInstanceInfo, batchNo);
        }
        boolean reduceFailed = InstanceStatus.FAILED.equals(result.getStatus());
        if (reduceFailed) {
            LOGGER.error("Stream job jobId={} jobInstanceId={} batchNo={} reduce failed. Result:{}", Long.valueOf(jobInstanceInfo.getJobId()), Long.valueOf(jobInstanceInfo.getJobInstanceId()), batchNo, result.getResult());
        }
        InstanceStatus batchStatus = (detail.getTaskProgressCounter().getFailed() > 0 || reduceFailed) ? InstanceStatus.FAILED : InstanceStatus.SUCCESS;
        detail.setStatus(Integer.valueOf(batchStatus.getValue()));
        detail.setEndTime(DateTime.now().getMillis());
        this.streamJobProgressHistory.add(detail);
        this.streamJobProgressMap.remove(batchNo);
        this.taskStatusMap.remove(batchNo);
        this.taskResultMap.remove(batchNo);
    }

    /**
     * Runs the user's reduce over the recorded results of {@code batchNo}. Never throws: an exception,
     * or a null result, is turned into a failed {@link ProcessResult}.
     */
    private ProcessResult reduceBatch(JobInstanceInfo jobInstanceInfo, Long batchNo) {
        try {
            JobContext context = JobContext.newBuilder()
                    .setJobId(jobInstanceInfo.getJobId())
                    .setJobInstanceId(jobInstanceInfo.getJobInstanceId())
                    .setJobType(jobInstanceInfo.getJobType())
                    .setContent(jobInstanceInfo.getContent())
                    .setScheduleTime(jobInstanceInfo.getScheduleTime())
                    .setDataTime(jobInstanceInfo.getDataTime())
                    .setJobParameters(jobInstanceInfo.getParameters())
                    .setInstanceParameters(jobInstanceInfo.getInstanceParameters())
                    .setUser(jobInstanceInfo.getUser())
                    .setTaskResults(this.taskResultMap.get(batchNo))
                    .setTaskStatuses(this.taskStatusMap.get(batchNo))
                    .setSerialNum(batchNo.longValue())
                    .build();
            ProcessResult reduced = this.streamJobProcessor.reduce(context);
            return reduced != null ? reduced : new ProcessResult(false, "Reduce can not return NULL.");
        } catch (Throwable th) {
            LOGGER.error("Stream job jobId={} jobInstanceId={} batchNo={} reduce exception.", Long.valueOf(jobInstanceInfo.getJobId()), Long.valueOf(jobInstanceInfo.getJobInstanceId()), batchNo, th);
            return new ProcessResult(false, th.getMessage());
        }
    }

    /** Progress-report thread body: sends instance progress to the server every 5s until the instance finishes. */
    private void runProgressReportLoop(JobInstanceInfo jobInstanceInfo, String uniqueId) {
        while (!this.instanceStatus.isFinish()) {
            this.SERVER_DISCOVERY.getMapMasterRouter().tell(Worker.WorkerReportJobInstanceProgressRequest.newBuilder()
                    .setJobId(jobInstanceInfo.getJobId())
                    .setJobInstanceId(jobInstanceInfo.getJobInstanceId())
                    .setProgress(getJobInstanceProgress())
                    .build(), null);
            try {
                Thread.sleep(5000L);
            } catch (InterruptedException e) {
                LOGGER.error("report status error, uniqueId={}", uniqueId, e);
                // Preserve the interrupt for whoever owns this thread before exiting.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Creates the root "produce" task of this stream instance and starts the background thread
     * that repeatedly calls {@code streamJobProcessor.produce(...)} to generate sub-tasks.
     *
     * <p>Before the thread starts, the root task is persisted (as a new task) and its pulled
     * progress is recorded against the local worker. The thread loops while the instance is not
     * finished and {@code SchedulerxWorker.INITED} is true. On loop exit it reports the root task
     * as SUCCESS; if anything escapes, it reports the root task as FAILED.
     *
     * @throws Exception propagated from task-progress initialization or from persisting the root task
     */
    protected void createProduceTask() throws Exception {
        // Presumably registers a progress entry holding 1 task for the current serial number.
        initTaskProgress(Long.valueOf(getSerialNum()), WorkerConstants.MAP_TASK_ROOT_NAME, 1);
        final Worker.MasterStartContainerRequest convert2StartContainerRequest = convert2StartContainerRequest(this.jobInstanceInfo, aquireTaskId(), WorkerConstants.MAP_TASK_ROOT_NAME, null);
        // Local worker is encoded as "<workerId>@<workerAddr>".
        String localWorkerIdAddr = getLocalWorkerIdAddr();
        final String str = localWorkerIdAddr.split("@")[0];
        final String str2 = localWorkerIdAddr.split("@")[1];
        // z == false: the root task is created in persistence, not updated.
        batchHandlePersistence(str, str2, Lists.newArrayList(convert2StartContainerRequest), false);
        // Pin the root task to the local worker; the grouping maps are throw-away.
        batchHandlePulledProgress(Lists.newArrayList(convert2StartContainerRequest), Maps.newHashMap(), Maps.newHashMap(), localWorkerIdAddr);
        this.streamProduceThread = new Thread(new Runnable() { // from class: com.alibaba.schedulerx.worker.master.StreamTaskMaster.5
            @Override // java.lang.Runnable
            public void run() {
                try {
                    // Mark the root batch as RUNNING at batch and local-worker level.
                    ((StreamJobProgressDetail) StreamTaskMaster.this.streamJobProgressMap.get(Long.valueOf(StreamTaskMaster.this.getSerialNum()))).setStatus(Integer.valueOf(TaskStatus.RUNNING.getValue()));
                    ((StreamJobProgressDetail) StreamTaskMaster.this.streamJobProgressMap.get(Long.valueOf(StreamTaskMaster.this.getSerialNum()))).getTaskProgressCounter().incrementRunning();
                    ((StreamJobProgressDetail) StreamTaskMaster.this.streamJobProgressMap.get(Long.valueOf(StreamTaskMaster.this.getSerialNum()))).getWorkerProgressMap().get(str2).incrementRunning();
                    JobContext convert2JobContext = ContanerUtil.convert2JobContext(convert2StartContainerRequest);
                    long produceInterval = StreamTaskMaster.this.xAttrs.getProduceInterval();
                    while (!StreamTaskMaster.this.instanceStatus.isFinish() && SchedulerxWorker.INITED) {
                        // Back-pressure: only produce a new batch while fewer than 15 batches are tracked.
                        if (StreamTaskMaster.this.streamJobProgressMap.size() < 15) {
                            // NOTE(review): this refreshes the start time of batch 0 every round, not of the batch about to be produced — confirm intended.
                            ((StreamJobProgressDetail) StreamTaskMaster.this.streamJobProgressMap.get(0L)).setStartTime(DateTime.now().getMillis());
                            // Each produce round gets a fresh serial number (batch number).
                            convert2JobContext.setSerialNum(StreamTaskMaster.this.aquireSerialNum());
                            List<Object> produce = StreamTaskMaster.this.streamJobProcessor.produce(convert2JobContext);
                            if (!CollectionUtils.isEmpty(produce)) {
                                // Presumably getSerialNum() now returns the serial number acquired above.
                                StreamTaskMaster.this.initTaskProgress(Long.valueOf(StreamTaskMaster.this.getSerialNum()), "SubTask", produce.size());
                                Iterator<Object> it = produce.iterator();
                                while (it.hasNext()) {
                                    // Each produced object is Hessian-serialized into a "SubTask" start request and queued for dispatch.
                                    StreamTaskMaster.this.taskBlockingQueue.submitRequest(StreamTaskMaster.this.convert2StartContainerRequest(StreamTaskMaster.this.jobInstanceInfo, StreamTaskMaster.this.aquireTaskId(), "SubTask", ByteString.copyFrom(HessianUtil.toBytes(it.next()))));
                                }
                            }
                        }
                        // Sleep (trackedBatches / 5 + 1) * produceInterval seconds: production slows as more batches are in flight.
                        TimeUnit.SECONDS.sleep(((StreamTaskMaster.this.streamJobProgressMap.size() / 5) + 1) * produceInterval);
                    }
                    // Loop exited normally: report the root produce task as SUCCESS.
                    StreamTaskMaster.this.updateTaskStatus(Worker.ContainerReportTaskStatusRequest.newBuilder().setJobId(StreamTaskMaster.this.jobInstanceInfo.getJobId()).setJobInstanceId(StreamTaskMaster.this.jobInstanceInfo.getJobInstanceId()).setTaskId(convert2StartContainerRequest.getTaskId()).setStatus(TaskStatus.SUCCESS.getValue()).setWorkerId(str).setTaskName(convert2StartContainerRequest.getTaskName()).setWorkerAddr(str2).setSerialNum(convert2StartContainerRequest.getSerialNum()).build());
                } catch (Throwable th) {
                    // Any failure (including an interrupted sleep) ends production and is reported as FAILED.
                    StreamTaskMaster.LOGGER.error("stream job produce running failed.", th);
                    String localWorkerIdAddr2 = StreamTaskMaster.this.getLocalWorkerIdAddr();
                    String str3 = localWorkerIdAddr2.split("@")[0];
                    String str4 = localWorkerIdAddr2.split("@")[1];
                    // NOTE(review): convert2StartContainerRequest is a final local already used above, so this is presumably always true here.
                    if (convert2StartContainerRequest != null) {
                        StreamTaskMaster.this.updateTaskStatus(Worker.ContainerReportTaskStatusRequest.newBuilder().setJobId(StreamTaskMaster.this.jobInstanceInfo.getJobId()).setJobInstanceId(StreamTaskMaster.this.jobInstanceInfo.getJobInstanceId()).setTaskId(convert2StartContainerRequest.getTaskId()).setStatus(TaskStatus.FAILED.getValue()).setWorkerId(str3).setTaskName(convert2StartContainerRequest.getTaskName()).setWorkerAddr(str4).setSerialNum(convert2StartContainerRequest.getSerialNum()).build());
                    } else {
                        // Without a root request, fail the whole instance instead.
                        String str5 = StreamTaskMaster.this.jobInstanceInfo.getJobId() + IdUtil.SPLITTER_TOKEN + StreamTaskMaster.this.jobInstanceInfo.getJobInstanceId();
                        StreamTaskMaster.this.updateNewInstanceStatus(StreamTaskMaster.this.getSerialNum(), InstanceStatus.FAILED, ExceptionUtil.getMessage(th));
                        StreamTaskMaster.this.logCollector.collect(StreamTaskMaster.this.jobInstanceInfo.getAppGroupId(), str5, ClientLoggerMessage.INSTANCE_INIT_FAIL, th);
                    }
                }
            }
        }, "Schedulerx-stream-produce-thread-" + this.jobInstanceInfo.getJobInstanceId());
        this.streamProduceThread.start();
    }

    /**
     * Validates the configured processor class of this job.
     *
     * <p>Only when the job type equals {@code WORKER_STARTER_MODE_DEFAULT} (case-insensitive;
     * presumably the Java job type) is the processor class from the job content checked; it
     * must be a {@code StreamJobProcessor}. Other job types pass without inspection.
     *
     * @throws IOException if the configured class does not extend {@code StreamJobProcessor}
     */
    @Override // com.alibaba.schedulerx.worker.master.TaskMaster
    protected void checkProcessor() throws Exception {
        String jobType = this.jobInstanceInfo.getJobType();
        if (!WorkerConstants.WORKER_STARTER_MODE_DEFAULT.equalsIgnoreCase(jobType)) {
            return;
        }
        JavaProcessorProfile profile =
                (JavaProcessorProfile) JsonUtil.fromJson(this.jobInstanceInfo.getContent(), JavaProcessorProfile.class);
        boolean isStreamProcessor =
                JobProcessorUtil.checkJavaProcessor(profile.getClassName(), StreamJobProcessor.class).booleanValue();
        if (!isStreamProcessor) {
            throw new IOException(profile.getClassName() + " must extends StreamJobProcessor");
        }
    }

    /**
     * Sends a {@code MasterDestroyContainerPoolRequest} for this instance to every worker in
     * {@code jobInstanceInfo.getAllWorkers()}. Requests go through
     * {@code SchedulerxWorker.AtLeastDeliveryRoutingActor} with no sender.
     */
    @Override // com.alibaba.schedulerx.worker.master.TaskMaster
    public void destroyContainerPool() {
        for (String workerIdAddr : this.jobInstanceInfo.getAllWorkers()) {
            Worker.MasterDestroyContainerPoolRequest destroyRequest = Worker.MasterDestroyContainerPoolRequest.newBuilder()
                    .setJobInstanceId(this.jobInstanceInfo.getJobInstanceId())
                    .setSerialNum(getSerialNum())
                    .setJobId(this.jobInstanceInfo.getJobId())
                    .setWorkerIdAddr(workerIdAddr)
                    .build();
            SchedulerxWorker.AtLeastDeliveryRoutingActor.tell(destroyRequest, null);
        }
    }

    /**
     * Releases this master's state.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>run the base-class {@code clear()};</li>
     *   <li>clear the status and dispatch queues and both batch handlers (each only when it
     *       was created);</li>
     *   <li>ask task persistence to clear this instance's tasks.</li>
     * </ol>
     * The final step is best-effort; see {@code clearTasks}.
     */
    @Override // com.alibaba.schedulerx.worker.master.TaskMaster
    public void clear() {
        super.clear();
        if (taskStatusReqQueue != null) {
            taskStatusReqQueue.clear();
        }
        if (taskBlockingQueue != null) {
            taskBlockingQueue.clear();
        }
        if (taskDispatchReqHandler != null) {
            taskDispatchReqHandler.clear();
        }
        if (taskStatusReqBatchHandler != null) {
            taskStatusReqBatchHandler.clear();
        }
        clearTasks(jobInstanceInfo.getJobInstanceId());
    }

    /**
     * Asks task persistence to remove the tasks of a job instance. Best-effort: any failure is
     * logged and never propagated to the caller.
     *
     * @param jobInstanceId id of the job instance whose tasks are cleared
     */
    private void clearTasks(long jobInstanceId) {
        Long boxedId = Long.valueOf(jobInstanceId);
        try {
            this.taskPersistence.clearTasks(jobInstanceId);
            LOGGER.info("jobInstanceId={} clearTasks success.", boxedId);
        } catch (Throwable cause) {
            LOGGER.error("jobInstanceId={} clearTasks error", boxedId, cause);
        }
    }

    /**
     * Stops the task-dispatch handler and the task-status batch handler (each only if present),
     * then logs that the instance master has stopped.
     */
    @Override // com.alibaba.schedulerx.worker.master.TaskMaster
    public void stop() {
        if (taskDispatchReqHandler != null) {
            taskDispatchReqHandler.stop();
        }
        if (taskStatusReqBatchHandler != null) {
            taskStatusReqBatchHandler.stop();
        }
        long instanceId = jobInstanceInfo.getJobInstanceId();
        LOGGER.info("jobInstanceId:{}, instance master successfully stop.", Long.valueOf(instanceId));
    }

    /**
     * Termination hook: stops the task-dispatch handler if one exists. The task-status batch
     * handler is not touched here.
     */
    @Override // com.alibaba.schedulerx.worker.master.TaskMaster
    protected void doTerminate() {
        if (taskDispatchReqHandler == null) {
            return;
        }
        taskDispatchReqHandler.stop();
    }

    /**
     * Serializes the current progress of this stream instance to JSON.
     *
     * <p>The output combines two sources:
     * <ul>
     *   <li>batches tracked in {@code streamJobProgressMap};</li>
     *   <li>finished batches in {@code streamJobProgressHistory}, keyed by batch number. History
     *       entries overwrite live entries with the same number.</li>
     * </ul>
     * The overall status is taken from batch {@code 0} (presumably the root produce task). When
     * that entry is absent, a {@code null} status is passed instead of throwing an NPE.
     *
     * @return JSON of a {@code StreamJobProgress}
     */
    @Override // com.alibaba.schedulerx.worker.master.TaskMaster
    public String getJobInstanceProgress() {
        LinkedHashMap<Long, StreamJobProgressDetail> progressByBatch = new LinkedHashMap<Long, StreamJobProgressDetail>();
        progressByBatch.putAll(this.streamJobProgressMap);
        Iterator<StreamJobProgressDetail> history = this.streamJobProgressHistory.iterator();
        while (history.hasNext()) {
            StreamJobProgressDetail finished = history.next();
            progressByBatch.put(Long.valueOf(finished.getBatchNum()), finished);
        }
        // Batch 0 may be missing (e.g. moved to the bounded history and evicted).
        StreamJobProgressDetail root = progressByBatch.get(0L);
        Integer rootStatus = root == null ? null : root.getStatus();
        return JsonUtil.toJson(new StreamJobProgress(rootStatus, this.totalCounter, this.xAttrs.getQueueSize(), this.taskBlockingQueue.size(), progressByBatch));
    }

    /**
     * Adds a worker to {@code invalidWorkerSet}. When the router is a {@code WorkerLoadRegister},
     * the worker's available size is also set to 0. Any exception is logged as a warning and
     * swallowed.
     *
     * @param str the worker, presumably in "workerId@workerAddr" form
     */
    private void setWorkerInvalid(String str) {
        try {
            invalidWorkerSet.add(str);
            // instanceof is false for null, which also covers the "no router" case.
            if (router instanceof WorkerLoadRegister) {
                WorkerLoadRegister loadRegister = (WorkerLoadRegister) router;
                loadRegister.setAvailableSize(str, 0);
            }
        } catch (Exception e) {
            LOGGER.warn("Set worker load failed.", e);
        }
    }

    /**
     * Handles start requests that could not be delivered to a worker by resetting their tasks
     * to INIT in persistence.
     *
     * <p>The worker counters of the affected batches are then rolled back: for each batch, the
     * pulled/total counter of that worker is decreased by the number of requests from
     * {@code list} that belong to the batch. Batches or worker counters that no longer exist are
     * skipped. If the persistence update fails, the instance is marked FAILED.
     *
     * @param list the undelivered start requests
     * @param str  the failed worker as "workerId@workerAddr"
     */
    private void initTaskFailover(List<Worker.MasterStartContainerRequest> list, String str) {
        long jobInstanceId = this.jobInstanceInfo.getJobInstanceId();
        LOGGER.warn("jobInstanceId={}, worker[{}] is down, try another worker, size:{}", Long.valueOf(jobInstanceId), str, Integer.valueOf(list.size()));
        String workerId = str.split("@")[0];
        String workerAddr = str.split("@")[1];
        List<Long> taskIds = new ArrayList<Long>(list.size());
        // How many of the undelivered requests belong to each batch (serial number).
        Map<Long, Integer> countBySerial = new HashMap<Long, Integer>();
        for (Worker.MasterStartContainerRequest request : list) {
            taskIds.add(Long.valueOf(request.getTaskId()));
            Long serialNum = Long.valueOf(request.getSerialNum());
            Integer count = countBySerial.get(serialNum);
            countBySerial.put(serialNum, Integer.valueOf(count == null ? 1 : count.intValue() + 1));
        }
        try {
            int affected = this.taskPersistence.updateTaskStatus(jobInstanceId, taskIds, TaskStatus.INIT, workerId, workerAddr);
            LOGGER.warn("jobInstanceId={}, worker[{}] is down, reset task status, size:{}", Long.valueOf(jobInstanceId), str, Integer.valueOf(affected));
            for (Map.Entry<Long, Integer> entry : countBySerial.entrySet()) {
                StreamJobProgressDetail detail = this.streamJobProgressMap.get(entry.getKey());
                if (detail == null) {
                    // Batch already finished and left the live map; nothing to roll back.
                    continue;
                }
                WorkerProgressCounter counter = detail.getWorkerProgressMap().get(workerAddr);
                if (counter != null) {
                    // Roll back only this batch's share, not the overall affected count.
                    counter.decPulledAndTotal(entry.getValue().intValue());
                }
            }
        } catch (Exception e) {
            LOGGER.error("jobInstanceId={}, timeout return init error", Long.valueOf(jobInstanceId), e);
            updateNewInstanceStatus(getSerialNum(), InstanceStatus.FAILED, "timeout dispatch return init error");
        }
    }

    private void processDispatchException(String str, List<Worker.MasterStartContainerRequest> list, Throwable th) {
        String str2 = str.split("@")[0];
        String str3 = str.split("@")[1];
        if ((this.xAttrs != null && this.xAttrs.isFailover()) && (th instanceof TimeoutException)) {
            initTaskFailover(list, str);
        } else {
            String uniqueIdWithoutTask = IdUtil.getUniqueIdWithoutTask(this.jobInstanceInfo.getJobId(), this.jobInstanceInfo.getJobInstanceId());
            LOGGER.error("jobInstanceId:{}, batch dispatch Tasks error worker={}, size:{}", Long.valueOf(this.jobInstanceInfo.getJobInstanceId()), str, Integer.valueOf(list.size()), th);
            this.logCollector.collect(this.jobInstanceInfo.getAppGroupId(), uniqueIdWithoutTask, ClientLoggerMessage.MAP_INSTANCE_DISPATCH_JOB_FAIL, th);
            for (Worker.MasterStartContainerRequest masterStartContainerRequest : list) {
                updateTaskStatus(Worker.ContainerReportTaskStatusRequest.newBuilder().setJobId(this.jobInstanceInfo.getJobId()).setJobInstanceId(this.jobInstanceInfo.getJobInstanceId()).setTaskId(masterStartContainerRequest.getTaskId()).setStatus(TaskStatus.FAILED.getValue()).setResult("Dispatch tasks error. Cause by " + th.getMessage()).setWorkerId(str2).setTaskName(masterStartContainerRequest.getTaskName()).setWorkerAddr(str3).setTaskName(masterStartContainerRequest.getTaskName()).setSerialNum(masterStartContainerRequest.getSerialNum()).build());
            }
        }
        setWorkerInvalid(str);
    }

    /**
     * Handles a worker's reply to a batch start request.
     *
     * <p>On success:
     * <ul>
     *   <li>the elapsed time since {@code startMillis} is logged;</li>
     *   <li>the worker is added to {@code aliveCheckWorkerSet};</li>
     *   <li>the reported metrics are forwarded to {@code setWorkerLoad}.</li>
     * </ul>
     * On failure, if failover is enabled and the message contains
     * {@code WORKER_NOT_RUNNING_MESSAGE}, the tasks are handed to {@code initTaskFailover}.
     * Otherwise every task is reported FAILED with the response message. After any failure the
     * worker is marked invalid.
     *
     * @param worker      target worker as "workerId@workerAddr"
     * @param requests    the start requests that were sent
     * @param response    the worker's reply
     * @param startMillis timestamp used to compute the reported cost
     */
    private void processDispatchResponse(String worker, List<Worker.MasterStartContainerRequest> requests, Worker.MasterBatchStartContainersResponse response, long startMillis) {
        String[] idAndAddr = worker.split("@");
        String workerId = idAndAddr[0];
        String workerAddr = idAndAddr[1];
        long jobInstanceId = this.jobInstanceInfo.getJobInstanceId();
        if (response.getSuccess()) {
            LOGGER.info("jobInstanceId={}, batch start containers successfully, size:{} , worker={}, cost={}ms", Long.valueOf(jobInstanceId), Integer.valueOf(requests.size()), worker, Long.valueOf(System.currentTimeMillis() - startMillis));
            this.aliveCheckWorkerSet.add(worker);
            setWorkerLoad(worker, response.getMetricsJson(), Long.valueOf(System.currentTimeMillis() - startMillis));
            return;
        }
        String message = response.getMessage();
        boolean failoverEnabled = this.xAttrs != null && this.xAttrs.isFailover();
        boolean workerNotRunning = message != null && message.contains(WorkerConstants.WORKER_NOT_RUNNING_MESSAGE);
        if (failoverEnabled && workerNotRunning) {
            initTaskFailover(requests, worker);
        } else {
            LOGGER.error("jobInstanceId={}, batch start containers failed, worker={}, response={}, size:{}", Long.valueOf(jobInstanceId), worker, message, Integer.valueOf(requests.size()));
            for (Worker.MasterStartContainerRequest request : requests) {
                Worker.ContainerReportTaskStatusRequest failed = Worker.ContainerReportTaskStatusRequest.newBuilder()
                        .setJobId(this.jobInstanceInfo.getJobId())
                        .setJobInstanceId(jobInstanceId)
                        .setTaskId(request.getTaskId())
                        .setStatus(TaskStatus.FAILED.getValue())
                        .setResult(message)
                        .setWorkerId(workerId)
                        .setTaskName(request.getTaskName())
                        .setWorkerAddr(workerAddr)
                        .setSerialNum(request.getSerialNum())
                        .build();
                updateTaskStatus(failed);
            }
        }
        setWorkerInvalid(worker);
    }

    /**
     * Feeds a worker's reported metrics into the router's load bookkeeping.
     *
     * <p>Values recorded:
     * <ul>
     *   <li>share-pool available size;</li>
     *   <li>remaining CPU ({@code cpuProcessors - cpuLoad1}, truncated to int);</li>
     *   <li>remaining heap ({@code 100 - heap1Usage * 100});</li>
     *   <li>dispatch cost, when given.</li>
     * </ul>
     * Afterwards it calls {@code notifyAll()} on the router. Nothing happens unless the router is
     * a {@code WorkerLoadRegister} and {@code str2} is non-empty. Exceptions are logged as
     * warnings and swallowed.
     *
     * @param str  the worker, presumably "workerId@workerAddr"
     * @param str2 metrics JSON, deserialized into {@code Metrics}
     * @param l    dispatch cost in milliseconds, or {@code null} to leave the cost untouched
     */
    public void setWorkerLoad(String str, String str2, Long l) {
        try {
            boolean tracksLoad = this.router != null && (this.router instanceof WorkerLoadRegister);
            if (!tracksLoad || !StringUtils.isNotEmpty(str2)) {
                return;
            }
            Metrics metrics = (Metrics) JsonUtil.fromJson(str2, Metrics.class);
            if (metrics != null) {
                WorkerLoadRegister loadRegister = (WorkerLoadRegister) this.router;
                LOGGER.info("update worker load, worker={}, sharePoolAvailableSize={}, cost={}", str, Integer.valueOf(metrics.getSharePoolAvailableSize()), l);
                loadRegister.setAvailableSize(str, Integer.valueOf(metrics.getSharePoolAvailableSize()));
                loadRegister.setRemainCpu(str, Integer.valueOf((int) (metrics.getCpuProcessors() - metrics.getCpuLoad1())));
                loadRegister.setRemainMemory(str, Long.valueOf((long) (100.0d - (metrics.getHeap1Usage() * 100.0d))));
                if (l != null) {
                    loadRegister.setCost(str, l);
                }
            }
            // Wake any thread blocked on the router's monitor so it can re-read the loads.
            synchronized (this.router) {
                this.router.notifyAll();
            }
        } catch (Exception e) {
            LOGGER.warn("Set worker load failed.", e);
        }
    }

    /**
     * Persists a batch of start requests for one worker, then sends it to that worker's container
     * router.
     *
     * <p>The reply is handled by {@code processDispatchResponse}; a send failure is handled by
     * {@code processDispatchException}. If persistence fails, or handling a send failure throws,
     * the error is logged and collected and every task in the batch is reported FAILED.
     *
     * @param str              target worker as "workerId@workerAddr"
     * @param list             start requests for that worker
     * @param z                {@code true} to switch existing task rows to RUNNING, {@code false}
     *                         to create new rows ({@code batchDispatchTasks} passes {@code true}
     *                         for failover requests)
     * @param taskDispatchMode not used by this implementation
     */
    private void batchHandleContainers(String str, List<Worker.MasterStartContainerRequest> list, boolean z, TaskDispatchMode taskDispatchMode) {
        String workerId = str.split("@")[0];
        String workerAddr = str.split("@")[1];
        long jobInstanceId = this.jobInstanceInfo.getJobInstanceId();
        LOGGER.debug("jobInstanceId={}, batch dispatch, worker:{}, size:{}", Long.valueOf(jobInstanceId), str, Integer.valueOf(list.size()));
        try {
            batchHandlePersistence(workerId, workerAddr, list, z);
            // Take the start time BEFORE the blocking call. Passing currentTimeMillis() as a later
            // argument of processDispatchResponse evaluated it after awaitResult returned, so the
            // reported cost was always ~0.
            long startTime = System.currentTimeMillis();
            try {
                Worker.MasterBatchStartContainersRequest request = Worker.MasterBatchStartContainersRequest.newBuilder()
                        .setJobInstanceId(jobInstanceId)
                        .setJobId(this.jobInstanceInfo.getJobId())
                        .addAllStartReqs(list)
                        .build();
                Worker.MasterBatchStartContainersResponse response = (Worker.MasterBatchStartContainersResponse) FutureUtils.awaitResult(
                        getActorContext().actorSelection(ActorPathUtil.getContainerRouterPath(str)), request, 3L);
                processDispatchResponse(str, list, response, startTime);
            } catch (Throwable th) {
                processDispatchException(str, list, th);
            }
        } catch (Throwable th2) {
            String uniqueIdWithoutTask = IdUtil.getUniqueIdWithoutTask(this.jobInstanceInfo.getJobId(), jobInstanceId);
            LOGGER.error("jobInstanceId:{}, batch dispatch Tasks error worker={}, size:{}", Long.valueOf(jobInstanceId), str, Integer.valueOf(list.size()), th2);
            this.logCollector.collect(this.jobInstanceInfo.getAppGroupId(), uniqueIdWithoutTask, ClientLoggerMessage.MAP_INSTANCE_DISPATCH_JOB_FAIL, th2);
            for (Worker.MasterStartContainerRequest request : list) {
                updateTaskStatus(Worker.ContainerReportTaskStatusRequest.newBuilder()
                        .setJobId(this.jobInstanceInfo.getJobId())
                        .setJobInstanceId(jobInstanceId)
                        .setTaskId(request.getTaskId())
                        .setStatus(TaskStatus.FAILED.getValue())
                        .setWorkerId(workerId)
                        .setTaskName(request.getTaskName())
                        .setWorkerAddr(workerAddr)
                        .setSerialNum(request.getSerialNum())
                        .build());
            }
        }
    }

    /**
     * Writes a batch of start requests to task persistence for one worker and logs the time spent.
     *
     * <p>When {@code updateExisting} is false, new task rows are created. Otherwise the existing
     * tasks, identified by task id, are switched to RUNNING for this worker.
     *
     * @param workerId       id of the target worker
     * @param workerAddr     address of the target worker
     * @param requests       the start requests to persist
     * @param updateExisting {@code true} to update existing rows instead of creating new ones
     * @throws Exception propagated from task persistence
     */
    private void batchHandlePersistence(String workerId, String workerAddr, List<Worker.MasterStartContainerRequest> requests, boolean updateExisting) throws Exception {
        long begin = System.currentTimeMillis();
        if (!updateExisting) {
            this.taskPersistence.createTasks(requests, workerId, workerAddr);
        } else {
            List<Long> taskIds = Lists.newArrayList();
            for (Worker.MasterStartContainerRequest request : requests) {
                taskIds.add(Long.valueOf(request.getTaskId()));
            }
            this.taskPersistence.updateTaskStatus(this.jobInstanceInfo.getJobInstanceId(), taskIds, TaskStatus.RUNNING, workerId, workerAddr);
        }
        long elapsed = System.currentTimeMillis() - begin;
        LOGGER.debug("jobInstance={}, batch dispatch db cost:{} ms, size:{}", Long.valueOf(this.jobInstanceInfo.getJobInstanceId()), Long.valueOf(elapsed), Integer.valueOf(requests.size()));
    }

    /**
     * Dispatches start requests to workers.
     *
     * <p>{@code batchHandlePulledProgress} assigns each request a worker and groups the requests
     * per worker. Each group is then sent in PUSH mode: fresh requests create new task rows, and
     * failover requests update existing ones.
     *
     * @param list the start requests to dispatch
     */
    public void batchDispatchTasks(List<Worker.MasterStartContainerRequest> list) {
        // Typed maps: with raw HashMap the typed Map.Entry loops below do not type-check.
        Map<String, List<Worker.MasterStartContainerRequest>> freshByWorker = Maps.newHashMap();
        Map<String, List<Worker.MasterStartContainerRequest>> failoverByWorker = Maps.newHashMap();
        batchHandlePulledProgress(list, freshByWorker, failoverByWorker, null);
        for (Map.Entry<String, List<Worker.MasterStartContainerRequest>> entry : freshByWorker.entrySet()) {
            batchHandleContainers(entry.getKey(), entry.getValue(), false, TaskDispatchMode.PUSH);
        }
        for (Map.Entry<String, List<Worker.MasterStartContainerRequest>> entry : failoverByWorker.entrySet()) {
            batchHandleContainers(entry.getKey(), entry.getValue(), true, TaskDispatchMode.PUSH);
        }
    }

    /**
     * Assigns each start request to a worker and records pulled progress for it.
     *
     * <p>Non-failover requests go into {@code map}. They are counted as pulled on their batch,
     * and on {@code totalCounter} when the serial number is positive. Failover requests go into
     * {@code map2} and leave those counters alone. For every request, the batch is marked PULLED
     * and the worker's total and pulled counters on that batch are incremented; the counter is
     * created on first use.
     *
     * <p>If no worker can be selected, the instance is marked FAILED and the remaining requests
     * are not processed.
     *
     * @param list requests to assign
     * @param map  output: worker ("workerId@workerAddr") to its non-failover requests
     * @param map2 output: worker to its failover requests
     * @param str  worker to use for every request, or {@code null} to pick one per request via
     *             {@code selectWorker()}
     */
    protected void batchHandlePulledProgress(List<Worker.MasterStartContainerRequest> list, Map<String, List<Worker.MasterStartContainerRequest>> map, Map<String, List<Worker.MasterStartContainerRequest>> map2, String str) {
        for (Worker.MasterStartContainerRequest request : list) {
            String workerIdAddr = (str != null) ? str : selectWorker();
            if (workerIdAddr == null) {
                updateNewInstanceStatus(getSerialNum(), InstanceStatus.FAILED, "all worker is down!");
                return;
            }
            String workerAddr = workerIdAddr.split("@")[1];
            StreamJobProgressDetail detail = this.streamJobProgressMap.get(Long.valueOf(request.getSerialNum()));
            boolean failover = request.getFailover();
            Map<String, List<Worker.MasterStartContainerRequest>> target = failover ? map2 : map;
            if (target.containsKey(workerIdAddr)) {
                target.get(workerIdAddr).add(request);
            } else {
                target.put(workerIdAddr, Lists.newArrayList(request));
            }
            if (!failover) {
                detail.getTaskProgressCounter().incrementPulled();
                if (request.getSerialNum() > 0) {
                    this.totalCounter.incrementPulled();
                }
            }
            detail.setStatus(Integer.valueOf(TaskStatus.PULLED.getValue()));
            Map<String, WorkerProgressCounter> workerCounters = detail.getWorkerProgressMap();
            // Create the per-worker counter lazily; re-checked under the lock.
            if (workerAddr != null && !workerCounters.containsKey(workerAddr)) {
                synchronized (this) {
                    if (!workerCounters.containsKey(workerAddr)) {
                        workerCounters.put(workerAddr, new WorkerProgressCounter(workerAddr));
                    }
                }
            }
            WorkerProgressCounter counter = workerCounters.get(workerAddr);
            counter.incrementTotal();
            counter.incrementPulled();
        }
    }

    /**
     * Asks the router for the worker of the next task, passing a rotating index.
     *
     * <p>The index is reset to 0 when it is negative or has reached {@code Integer.MAX_VALUE}.
     * Reading and incrementing it happens under this object's monitor.
     *
     * @return the router's choice; callers treat {@code null} as "no worker available"
     */
    private synchronized String selectWorker() {
        if (this.index < 0 || this.index >= Integer.MAX_VALUE) {
            this.index = 0;
        }
        int round = this.index++;
        return this.router.route(
                this.jobInstanceInfo.getAppGroupId(),
                this.jobInstanceInfo.getJobId(),
                this.jobInstanceInfo.getAllWorkers(),
                this.jobInstanceInfo.getTargetWorkerAddrsMap(),
                round,
                getLocalWorkerIdAddr());
    }

    /**
     * Reacts to a worker shutting down.
     *
     * <p>The worker is always recorded as invalid. If it is in {@code aliveCheckWorkerSet}
     * (it acknowledged a dispatch from this master), it is also removed from the instance's
     * workers, and then one of two paths runs:
     * <ul>
     *   <li>If {@code z} is true and failover is enabled, its tasks are reset to INIT.</li>
     *   <li>Otherwise its tasks are marked FAILED.</li>
     * </ul>
     * When any task was affected, the worker's counters are adjusted on every live batch and the
     * affected count is passed to {@code taskDispatchReqHandler.release(...)}. Batches that never
     * had a counter for this worker are skipped.
     *
     * @param str the worker as "workerId@workerAddr"
     * @param z   whether this shutdown may fail tasks over instead of failing them
     */
    @Override // com.alibaba.schedulerx.worker.master.TaskMaster
    public synchronized void handleWorkerShutdown(String str, boolean z) {
        this.existInvalidWorker = true;
        this.invalidWorkerSet.add(str);
        if (!this.aliveCheckWorkerSet.contains(str)) {
            return;
        }
        String[] split = str.split("@");
        String workerAddr = split[1];
        String workerId = split[0];
        this.aliveCheckWorkerSet.remove(str);
        this.jobInstanceInfo.getAllWorkers().remove(str);
        long jobInstanceId = this.jobInstanceInfo.getJobInstanceId();
        if (z && this.xAttrs != null && this.xAttrs.isFailover()) {
            int resetCount = this.taskPersistence.batchUpdateTaskStatus(jobInstanceId, TaskStatus.INIT, workerId, workerAddr);
            LOGGER.warn("jobInstanceId={}, failover task number:{}, workerId:{}, workerAddr:{}", Long.valueOf(jobInstanceId), Integer.valueOf(resetCount), workerId, workerAddr);
            if (resetCount > 0) {
                for (StreamJobProgressDetail detail : this.streamJobProgressMap.values()) {
                    // A batch that was never dispatched to this worker has no counter for it.
                    WorkerProgressCounter counter = detail.getWorkerProgressMap().get(workerAddr);
                    if (counter != null) {
                        counter.decRunningAndTotal(counter.getRunning() + counter.getPulled());
                    }
                }
                if (this.taskDispatchReqHandler != null) {
                    this.taskDispatchReqHandler.release(resetCount);
                }
            }
            return;
        }
        int failedCount = this.taskPersistence.batchUpdateTaskStatus(jobInstanceId, TaskStatus.FAILED, workerId, workerAddr);
        LOGGER.warn("jobInstanceId={}, worker shutdown, failed task number:{}, workerId:{}, workerAddr:{}", Long.valueOf(jobInstanceId), Integer.valueOf(failedCount), workerId, workerAddr);
        if (failedCount > 0) {
            for (StreamJobProgressDetail detail : this.streamJobProgressMap.values()) {
                WorkerProgressCounter counter = detail.getWorkerProgressMap().get(workerAddr);
                if (counter != null) {
                    counter.incrementFailed(counter.getRunning() + counter.getPulled());
                }
            }
            if (this.taskDispatchReqHandler != null) {
                this.taskDispatchReqHandler.release(failedCount);
            }
        }
    }

    /**
     * Post-finish hook: builds a {@code JobContext} for this job and calls
     * {@code StreamJobProcessor.postProcess} on the configured processor.
     *
     * @param j the job instance id placed into the context
     * @return the processor's result, or {@code null} when the configured processor is not a
     *         {@code StreamJobProcessor}. If anything throws, returns a failed result whose
     *         message is truncated to 800 characters.
     */
    @Override // com.alibaba.schedulerx.worker.master.TaskMaster
    public ProcessResult postFinish(long j) {
        try {
            JobContext context = JobContext.newBuilder()
                    .setJobId(this.jobInstanceInfo.getJobId())
                    .setJobInstanceId(j)
                    .setJobType(this.jobInstanceInfo.getJobType())
                    .setContent(this.jobInstanceInfo.getContent())
                    .setScheduleTime(this.jobInstanceInfo.getScheduleTime())
                    .setDataTime(this.jobInstanceInfo.getDataTime())
                    .setJobParameters(this.jobInstanceInfo.getParameters())
                    .setInstanceParameters(this.jobInstanceInfo.getInstanceParameters())
                    .setUser(this.jobInstanceInfo.getUser())
                    .setSerialNum(getSerialNum())
                    .build();
            JobProcessor processor = JobProcessorUtil.getJavaProcessor(context.getContent());
            if (!(processor instanceof StreamJobProcessor)) {
                return null;
            }
            return ((StreamJobProcessor) processor).postProcess(context);
        } catch (Throwable th) {
            // A failed post-process is an error, not routine information (was logged at INFO).
            LOGGER.error("Stream job post finish failed.", th);
            return new ProcessResult(false, "Stream job post finish failed:" + ExceptionUtil.getFixedErrMsgByThrowable(th, 800));
        }
    }

    /**
     * Kills this instance in three steps:
     * <ol>
     *   <li>run the base-class kill;</li>
     *   <li>send kill-container requests to all of the instance's workers;</li>
     *   <li>mark the instance FAILED with {@code str} as the reason.</li>
     * </ol>
     *
     * @param z   forwarded to the base class and to {@code sendKillContainerRequest}
     * @param str the kill reason recorded on the instance
     */
    @Override // com.alibaba.schedulerx.worker.master.TaskMaster
    public void killInstance(boolean z, String str) {
        super.killInstance(z, str);
        JobInstanceInfo instance = this.jobInstanceInfo;
        sendKillContainerRequest(z, instance.getAllWorkers());
        long instanceId = instance.getJobInstanceId();
        updateNewInstanceStatus(getSerialNum(), instanceId, InstanceStatus.FAILED, str);
    }
}
