/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.master;

import akka.actor.ActorContext;
import akka.actor.ActorSelection;
import com.alibaba.schedulerx.common.domain.InstanceStatus;
import com.alibaba.schedulerx.common.domain.JobInstanceInfo;
import com.alibaba.schedulerx.common.domain.MapTaskProgress;
import com.alibaba.schedulerx.common.domain.TaskStatus;
import com.alibaba.schedulerx.common.domain.TimeType;
import com.alibaba.schedulerx.common.domain.WorkerProgressCounter;
import com.alibaba.schedulerx.common.util.ConfigUtil;
import com.alibaba.schedulerx.common.util.ExceptionUtil;
import com.alibaba.schedulerx.common.util.IdUtil;
import com.alibaba.schedulerx.common.util.JobUtil;
import com.alibaba.schedulerx.common.util.JsonUtil;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.protocol.utils.FutureUtils;
import com.alibaba.schedulerx.shade.com.google.common.collect.Lists;
import com.alibaba.schedulerx.shade.com.google.common.collect.Maps;
import com.alibaba.schedulerx.shade.org.apache.commons.lang.StringUtils;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.domain.JavaProcessorProfile;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.logcollector.ClientLoggerMessage;
import com.alibaba.schedulerx.worker.logcollector.LogCollector;
import com.alibaba.schedulerx.worker.logcollector.LogCollectorFactory;
import com.alibaba.schedulerx.worker.master.TaskMaster;
import com.alibaba.schedulerx.worker.processor.JobProcessor;
import com.alibaba.schedulerx.worker.processor.JobProcessorEx;
import com.alibaba.schedulerx.worker.processor.MapJobProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
import com.alibaba.schedulerx.worker.util.ActorPathUtil;
import com.alibaba.schedulerx.worker.util.JobProcessorUtil;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class BroadcastTaskMaster
extends TaskMaster {
    /** Logger shared by every instance of this class. */
    private static final Logger LOGGER = LogFactory.getLogger(BroadcastTaskMaster.class);
    /** "workerId@workerAddr" -> uniqueId of the task started on that worker; an entry is added once the worker accepts the start-container request. */
    private Map<String, String> worker2uniqueIdMap = Maps.newConcurrentMap();
    /** workerAddr -> progress counter (total/running/success/failed) serialized by getJobInstanceProgress(). */
    private Map<String, WorkerProgressCounter> workerProgressMap = Maps.newConcurrentMap();
    /** Collector that receives the per-task init success/failure messages. */
    private LogCollector logCollector = LogCollectorFactory.get();
    /** Set to true once the monitor threads have been started, so they are started only once per instance. */
    private volatile boolean running = false;
    /** Enables the alive-check and status-check loops; switched on by startMonitorThreads() and off by clear(). */
    private volatile boolean monitor = false;
    /** taskId -> last result string reported for that task; handed to the job context in postFinish(). */
    private Map<Long, String> taskIdResultMap = Maps.newHashMap();
    /**
     * taskId -> last known task status; handed to the job context in postFinish().
     * NOTE(review): this is a plain HashMap that is written from dispatchTask() and
     * updateTaskStatus(); confirm every access is adequately synchronized.
     */
    private Map<Long, TaskStatus> taskIdStatusMap = Maps.newHashMap();
    /** Optional pool used to dispatch tasks in parallel; created by the constructor only when the worker config enables it, otherwise null. */
    private static ThreadPoolExecutor dispatchThreadPool;
    /** "workerId@workerAddr" entries this instance broadcasts to; replaced by info.getAllWorkers() in submitInstance(). */
    private List<String> allWorkers = Lists.newArrayList();

    /**
     * Creates the master for one broadcast job instance.
     *
     * <p>If the worker config flag {@code broadcast.dispatch.thread.enable} is true and no
     * dispatch pool exists yet, a pool shared by all instances is created under the class lock.
     * Its core size comes from {@code broadcast.dispatch.thread.num} (default 4), the maximum
     * size is twice that, the queue holds 1024 entries and overflow runs on the caller.
     *
     * @param jobInstanceInfo the job instance handed to the parent {@link TaskMaster}
     * @param actorContext    the actor context handed to the parent {@link TaskMaster}
     * @throws Exception whatever the parent constructor or the config lookup throws
     */
    public BroadcastTaskMaster(JobInstanceInfo jobInstanceInfo, ActorContext actorContext) throws Exception {
        super(jobInstanceInfo, actorContext);
        boolean poolEnabled = ConfigUtil.getWorkerConfig().getBoolean("broadcast.dispatch.thread.enable", false);
        if (poolEnabled && dispatchThreadPool == null) {
            synchronized (BroadcastTaskMaster.class) {
                // Re-check under the lock: another instance may have created the pool meanwhile.
                if (dispatchThreadPool == null) {
                    final int coreThreads = ConfigUtil.getWorkerConfig().getInt("broadcast.dispatch.thread.num", 4);
                    ThreadFactory namedThreads = new ThreadFactory() {
                        private final AtomicInteger nextId = new AtomicInteger(1);
                        private final String namePrefix = "Schedulerx-BroadcastTaskMaster-dispatch-thread-";

                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, this.namePrefix + this.nextId.getAndIncrement());
                        }
                    };
                    dispatchThreadPool = new ThreadPoolExecutor(
                            coreThreads,
                            coreThreads * 2,
                            30L,
                            TimeUnit.SECONDS,
                            new LinkedBlockingQueue<Runnable>(1024),
                            namedThreads,
                            new ThreadPoolExecutor.CallerRunsPolicy());
                }
            }
        }
    }

    /**
     * Starts one task on every worker of the instance.
     *
     * <p>For "java" jobs the processor's pre-processing hook runs first; if it throws, the
     * instance is reported as FAILED and nothing is dispatched. Otherwise the worker list is
     * taken from {@code info}, the local worker is removed when
     * {@code broadcast.master.exec.enable} is false, the list is sorted, a task id is acquired
     * and registered for each worker, and the tasks are dispatched, through the dispatch pool
     * when one exists or inline otherwise. Finally the monitor threads are started.
     *
     * @param info the job instance to broadcast; its worker list is sorted (and possibly
     *             shortened) in place
     */
    @Override
    public synchronized void submitInstance(final JobInstanceInfo info) {
        final boolean javaJob = "java".equalsIgnoreCase(info.getJobType());
        if (javaJob) {
            try {
                this.preProcess(info);
            } catch (Exception e) {
                LOGGER.error("BroadcastTaskMaster.preProcess failed, jobInstanceId={}", info.getJobInstanceId(), e);
                String rootTaskUniqueId = IdUtil.getUniqueId(info.getJobId(), info.getJobInstanceId(), 0L);
                this.logCollector.collect(
                        info.getAppGroupId(),
                        rootTaskUniqueId,
                        ClientLoggerMessage.appendMessage("broadcast taskMaster init fail worker addr is ", SchedulerxWorker.WORKER_ADDR, ExceptionUtil.getMessage(e)));
                this.updateNewInstanceStatus(
                        this.getSerialNum(),
                        this.jobInstanceInfo.getJobInstanceId(),
                        InstanceStatus.FAILED,
                        "Preprocess failed. " + e.getMessage());
                return;
            }
        }

        super.init();
        this.allWorkers = info.getAllWorkers();
        boolean masterExecutes = ConfigUtil.getWorkerConfig().getBoolean("broadcast.master.exec.enable", true);
        if (!masterExecutes) {
            this.allWorkers.remove(this.getLocalWorkerIdAddr());
        }
        final Map<String, Long> taskIdMap = new HashMap<String, Long>();
        Collections.sort(this.allWorkers);

        // Pass 1: register every task and its progress counter before anything is dispatched.
        for (String workerIdAddr : this.allWorkers) {
            final String workerAddr = workerIdAddr.split("@")[1];
            final long taskId = this.aquireTaskId();
            this.taskStatusMap.put(IdUtil.getUniqueId(info.getJobId(), info.getJobInstanceId(), taskId), TaskStatus.INIT);
            WorkerProgressCounter counter = this.workerProgressMap.get(workerAddr);
            if (counter == null) {
                counter = new WorkerProgressCounter(workerAddr);
                this.workerProgressMap.put(workerAddr, counter);
            }
            counter.incrementTotal();
            taskIdMap.put(workerIdAddr, taskId);
        }

        // Pass 2: send the start-container request to each worker.
        for (final String workerIdAddr : this.allWorkers) {
            if (dispatchThreadPool == null) {
                this.dispatchTask(info, workerIdAddr, taskIdMap);
            } else {
                dispatchThreadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        dispatchTask(info, workerIdAddr, taskIdMap);
                    }
                });
            }
        }
        this.startMonitorThreads();
    }

    /**
     * Sends the start-container request for one worker and records the outcome.
     *
     * <p>The request is attempted up to {@code broadcast.dispatch.retry.times} times (default 1),
     * each waiting up to 5 for the response (the time unit is defined by
     * {@code FutureUtils.awaitResult}). On success the worker is remembered in
     * {@code worker2uniqueIdMap}. If every attempt fails, the worker is flagged as invalid and a
     * FAILED task status carrying the last failure reason is fed into
     * {@link #updateTaskStatus}.
     *
     * @param info         the job instance being dispatched
     * @param workerIdAddr target worker in "workerId@workerAddr" form
     * @param taskIdMap    workerIdAddr -> task id acquired for that worker; must contain
     *                     {@code workerIdAddr}
     */
    private void dispatchTask(JobInstanceInfo info, String workerIdAddr, Map<String, Long> taskIdMap) {
        String[] workerInfo = workerIdAddr.split("@");
        String workerId = workerInfo[0];
        String workerAddr = workerInfo[1];
        ActorSelection selection = this.getActorContext().actorSelection(ActorPathUtil.getContainerRouterPath(workerIdAddr));
        long taskId = taskIdMap.get(workerIdAddr);
        String uniqueId = IdUtil.getUniqueId(info.getJobId(), info.getJobInstanceId(), taskId);
        Worker.MasterStartContainerRequest.Builder builder = this.convert2StartContainerRequestBuilder(info, taskId);
        builder.setShardingNum(this.allWorkers.size());
        Worker.MasterStartContainerRequest request = builder.build();
        // taskIdStatusMap is a plain HashMap and this method may run on several dispatch pool
        // threads at once; take the same lock updateTaskStatus() holds when it writes the map.
        synchronized (this) {
            this.taskIdStatusMap.put(taskId, TaskStatus.INIT);
        }
        String failureReason = "";
        int maxRetryTimes = ConfigUtil.getWorkerConfig().getInt("broadcast.dispatch.retry.times", 1);
        for (int attempt = 1; attempt <= maxRetryTimes; attempt++) {
            try {
                Worker.MasterStartContainerResponse response = (Worker.MasterStartContainerResponse)FutureUtils.awaitResult(selection, (Object)request, 5L);
                if (response.getSuccess()) {
                    this.worker2uniqueIdMap.put(workerIdAddr, uniqueId);
                    this.logCollector.collect(info.getAppGroupId(), uniqueId, ClientLoggerMessage.appendMessage("broadcast taskMaster init success worker addr is ", workerAddr));
                    return;
                }
                failureReason = response.getMessage();
                // One placeholder per argument, so the worker's error message is actually logged.
                LOGGER.error("submitTask[{}] serialNum={} to worker {} error, {}", uniqueId, this.getSerialNum(), workerAddr, response.getMessage());
                this.logCollector.collect(info.getAppGroupId(), uniqueId, ClientLoggerMessage.appendMessage("broadcast taskMaster init fail worker addr is ", workerAddr, response.getMessage()));
                // Back off only when another attempt will follow.
                if (attempt < maxRetryTimes) {
                    TimeUnit.SECONDS.sleep(2L);
                }
            }
            catch (Throwable e) {
                // getMessage() may be null; fall back to toString() so the failure report
                // always carries a non-null result.
                failureReason = e.getMessage() != null ? e.getMessage() : e.toString();
                LOGGER.error("start container failed, worker:{}, uniqueId:{}, serialNum={}", workerAddr, uniqueId, this.getSerialNum(), e);
                this.logCollector.collect(info.getAppGroupId(), uniqueId, ClientLoggerMessage.appendMessage("broadcast taskMaster init fail worker addr is ", workerAddr), e);
                if (e instanceof InterruptedException) {
                    // Keep the interrupt visible to the caller and stop retrying.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        this.existInvalidWorker = true;
        Worker.ContainerReportTaskStatusRequest failedRequest = Worker.ContainerReportTaskStatusRequest.newBuilder()
                .setJobId(info.getJobId())
                .setJobInstanceId(info.getJobInstanceId())
                .setTaskId(taskId)
                .setStatus(TaskStatus.FAILED.getValue())
                .setResult(failureReason)
                .setWorkerId(workerId)
                .setWorkerAddr(workerAddr)
                .setSerialNum(this.getSerialNum())
                .build();
        this.updateTaskStatus(failedRequest);
    }

    /**
     * Kills the instance: runs the parent's kill logic, sends a kill-container request to all
     * workers, reports the instance as FAILED with {@code reason}, and forgets all task statuses.
     *
     * @param mayInterruptIfRunning forwarded to the parent and to the kill-container request
     * @param reason                text reported as the instance result
     */
    @Override
    public void killInstance(boolean mayInterruptIfRunning, String reason) {
        super.killInstance(mayInterruptIfRunning, reason);
        this.sendKillContainerRequest(mayInterruptIfRunning, this.allWorkers);

        final long currentSerialNum = this.getSerialNum();
        final long instanceId = this.jobInstanceInfo.getJobInstanceId();
        this.updateNewInstanceStatus(currentSerialNum, instanceId, InstanceStatus.FAILED, reason);

        this.taskStatusMap.clear();
    }

    /**
     * Sends one destroy-container-pool request per worker of this instance through the
     * at-least-once delivery routing actor.
     */
    @Override
    public void destroyContainerPool() {
        for (String target : this.allWorkers) {
            Worker.MasterDestroyContainerPoolRequest.Builder destroy = Worker.MasterDestroyContainerPoolRequest.newBuilder();
            destroy.setJobInstanceId(this.jobInstanceInfo.getJobInstanceId());
            destroy.setJobId(this.jobInstanceInfo.getJobId());
            destroy.setWorkerIdAddr(target);
            destroy.setSerialNum(this.getSerialNum());
            SchedulerxWorker.AtLeastDeliveryRoutingActor.tell(destroy.build(), null);
        }
    }

    /**
     * Applies a task status report to the master's bookkeeping.
     *
     * <p>Reports with a serial number other than the current one, reports for tasks that are not
     * tracked in {@code taskStatusMap}, and reports repeating the already recorded status are
     * ignored (the first and last case are logged). Otherwise the tracked status is updated
     * (SUCCESS removes the task from tracking), the worker's progress counter and trace id are
     * updated, the result and status are stored per task id, and the instance status is
     * re-evaluated.
     *
     * @param request the status report to apply
     */
    @Override
    public synchronized void updateTaskStatus(Worker.ContainerReportTaskStatusRequest request) {
        if (request.getSerialNum() != this.getSerialNum()) {
            LOGGER.warn("ignore ContainerReportTaskStatusRequest, current serialNum={}, but request serialNum={}.", this.getSerialNum(), request.getSerialNum());
            return;
        }

        final long jobId = request.getJobId();
        final long jobInstanceId = request.getJobInstanceId();
        final long taskId = request.getTaskId();
        final String workerAddr = request.getWorkerAddr();
        final TaskStatus reported = TaskStatus.parseValue(request.getStatus());
        final String uniqueId = IdUtil.getUniqueId(jobId, jobInstanceId, taskId);
        LOGGER.info("update task status serialNum={}, uniqueId={}, status={}, workerAddr={}", request.getSerialNum(), uniqueId, reported.getDescription(), workerAddr);

        if (!this.taskStatusMap.containsKey(uniqueId)) {
            return;
        }
        if (((TaskStatus)this.taskStatusMap.get(uniqueId)).equals(reported)) {
            LOGGER.warn("duplicated ContainerReportTaskStatusRequest, uniqueId={}, taskStatus={}", uniqueId, reported);
            return;
        }

        // Successful tasks stop being tracked; anything else overwrites the tracked status.
        if (reported.equals(TaskStatus.SUCCESS)) {
            this.taskStatusMap.remove(uniqueId);
        } else {
            this.taskStatusMap.put(uniqueId, reported);
        }

        WorkerProgressCounter counter = this.workerProgressMap.get(workerAddr);
        if (counter == null) {
            counter = new WorkerProgressCounter(workerAddr);
            this.workerProgressMap.put(workerAddr, counter);
        }
        if (reported.equals(TaskStatus.RUNNING)) {
            counter.incrementRunning();
        } else if (reported.equals(TaskStatus.SUCCESS)) {
            counter.incrementSuccess();
        } else if (reported.equals(TaskStatus.FAILED)) {
            counter.incrementFailed();
        }
        if (StringUtils.isNotBlank(request.getTraceId())) {
            counter.setTraceId(request.getTraceId());
        }

        this.taskIdResultMap.put(request.getTaskId(), request.getResult());
        this.taskIdStatusMap.put(request.getTaskId(), reported);
        this.updateNewInstanceStatus(request.getSerialNum(), jobInstanceId, request.getResult());
    }

    /**
     * Derives the instance status from the tracked task statuses and reports it.
     *
     * <ul>
     *   <li>No tracked tasks: FAILED if the instance was killed, otherwise SUCCESS.</li>
     *   <li>Tracked tasks and the instance is not finished: RUNNING.</li>
     *   <li>Tracked tasks and the instance is finished: FAILED if any tracked task is FAILED,
     *       otherwise SUCCESS.</li>
     * </ul>
     *
     * @param serialNum     serial number forwarded with the status
     * @param jobInstanceId instance whose status is reported
     * @param result2       result text forwarded with the status
     */
    private synchronized void updateNewInstanceStatus(long serialNum, long jobInstanceId, String result2) {
        InstanceStatus resolved = this.killed ? InstanceStatus.FAILED : InstanceStatus.SUCCESS;
        final boolean hasTrackedTasks = this.taskStatusMap.size() > 0;
        if (hasTrackedTasks) {
            if (this.isJobInstanceFinished()) {
                boolean anyFailed = false;
                for (TaskStatus tracked : this.taskStatusMap.values()) {
                    if (tracked.equals(TaskStatus.FAILED)) {
                        anyFailed = true;
                        break;
                    }
                }
                resolved = anyFailed ? InstanceStatus.FAILED : InstanceStatus.SUCCESS;
            } else {
                resolved = InstanceStatus.RUNNING;
            }
        }
        LOGGER.info("update serialNum={}, jobInstanceId={} status={}", serialNum, jobInstanceId, resolved.getDescription());
        this.updateNewInstanceStatus(serialNum, jobInstanceId, resolved, result2);
    }

    /**
     * Returns the current per-worker progress counters serialized as JSON.
     *
     * @return JSON form of a {@link MapTaskProgress} holding every worker's counter
     */
    @Override
    public String getJobInstanceProgress() {
        final MapTaskProgress snapshot = new MapTaskProgress();
        snapshot.setWorkerProgress(this.workerProgressMap.values());
        final String json = JsonUtil.toJson(snapshot);
        return json;
    }

    /**
     * Enables monitoring and, on the first call only, starts the background threads of this
     * instance. Each thread loops until {@code isFinished()} returns true, pausing 5 seconds
     * per round:
     * <ol>
     *   <li>alive check: pings every worker that accepted a task and treats a timeout as a
     *       worker shutdown;</li>
     *   <li>progress report: sends the progress JSON to the server (not started for
     *       second-type jobs);</li>
     *   <li>status check: re-evaluates the instance status while holding this object's lock.</li>
     * </ol>
     */
    private synchronized void startMonitorThreads() {
        this.monitor = true;
        if (this.running) {
            return;
        }
        final String jobIdAndInstanceId = this.jobInstanceInfo.getJobId() + "_" + this.jobInstanceInfo.getJobInstanceId();

        Runnable aliveCheck = new Runnable() {
            @Override
            public void run() {
                while (!isFinished()) {
                    if (monitor) {
                        aliveCheckWorkerSet.addAll(worker2uniqueIdMap.keySet());
                        for (String workerIdAddr : aliveCheckWorkerSet) {
                            try {
                                ActorSelection heartbeat = getActorContext().actorSelection(ActorPathUtil.getWorkerHeartbeatRouterPath(workerIdAddr));
                                Worker.MasterCheckWorkerAliveRequest ping = Worker.MasterCheckWorkerAliveRequest.newBuilder()
                                        .setJobInstanceId(jobInstanceInfo.getJobInstanceId())
                                        .build();
                                FutureUtils.awaitResult(heartbeat, (Object)ping, 5L);
                            } catch (TimeoutException e) {
                                // No answer in time: treat the worker as gone.
                                handleWorkerShutdown(workerIdAddr, true);
                            } catch (Throwable e) {
                                LOGGER.error("check worker alive failed.", e);
                            }
                        }
                    }
                    try {
                        Thread.sleep(5000L);
                    } catch (InterruptedException e) {
                        LOGGER.error("", e);
                        break;
                    }
                }
            }
        };
        new Thread(aliveCheck, "Schedulerx-BroadcastTaskMaster-check-worker-alive-thread-" + jobIdAndInstanceId).start();

        if (!JobUtil.isSecondTypeJob(TimeType.parseValue(this.jobInstanceInfo.getTimeType()))) {
            Runnable progressReport = new Runnable() {
                @Override
                public void run() {
                    while (!isFinished()) {
                        Worker.WorkerReportJobInstanceProgressRequest report = Worker.WorkerReportJobInstanceProgressRequest.newBuilder()
                                .setJobId(jobInstanceInfo.getJobId())
                                .setJobInstanceId(jobInstanceInfo.getJobInstanceId())
                                .setProgress(getJobInstanceProgress())
                                .build();
                        SERVER_DISCOVERY.getMapMasterRouter().tell(report, null);
                        try {
                            Thread.sleep(5000L);
                        } catch (InterruptedException e) {
                            LOGGER.error("report status error, uniqueId={}", jobIdAndInstanceId, e);
                            break;
                        }
                    }
                }
            };
            new Thread(progressReport, "Schedulerx-BroadcastTaskMaster-report-progress-thread-" + jobIdAndInstanceId).start();
        }

        Runnable statusCheck = new Runnable() {
            @Override
            public void run() {
                while (!isFinished()) {
                    try {
                        Thread.sleep(5000L);
                        synchronized (BroadcastTaskMaster.this) {
                            if (monitor) {
                                if (taskStatusMap.size() < 10) {
                                    LOGGER.info("taskStatusMap=" + taskStatusMap);
                                }
                                updateNewInstanceStatus(getSerialNum(), jobInstanceInfo.getJobInstanceId(), "");
                            }
                        }
                    } catch (Throwable e) {
                        // Includes interruption of the sleep: it is logged and the loop goes on.
                        LOGGER.error("status check error, uniqueId:{}", jobIdAndInstanceId, e);
                    }
                }
            }
        };
        new Thread(statusCheck, "Schedulerx-BroadcastTaskMaster-status-check-thread-" + jobIdAndInstanceId).start();

        this.running = true;
    }

    /**
     * Exposes the live per-worker progress counters.
     *
     * @return the internal workerAddr -> counter map itself, not a copy
     */
    public Map<String, WorkerProgressCounter> getWorkerProgressMap() {
        return workerProgressMap;
    }

    /**
     * Rejects "java" jobs whose configured processor class passes the
     * {@code JobProcessorUtil.checkJavaProcessor} check against {@link MapJobProcessor}.
     * Jobs of any other type are not checked.
     *
     * @throws IOException if the configured class passes that check
     * @throws Exception   whatever parsing the job content or the check itself throws
     */
    @Override
    protected void checkProcessor() throws Exception {
        if (!"java".equalsIgnoreCase(this.jobInstanceInfo.getJobType())) {
            return;
        }
        JavaProcessorProfile profile = JsonUtil.fromJson(this.jobInstanceInfo.getContent(), JavaProcessorProfile.class);
        if (!JobProcessorUtil.checkJavaProcessor(profile.getClassName(), MapJobProcessor.class)) {
            return;
        }
        throw new IOException(profile.getClassName() + " shouldn't extends MapJobProcessor or MapReduceJobProcessor");
    }

    /**
     * Runs the processor's post-processing hook after the instance has finished.
     *
     * <p>Only "java" jobs whose processor implements {@link JobProcessorEx} have a hook; for
     * everything else a successful result is returned. The hook receives the collected per-task
     * results and statuses through the job context.
     *
     * <p>NOTE(review): if building the context or running the hook throws, the failure is only
     * logged and the default successful result is returned. This looks like deliberate
     * best-effort behaviour and is kept as is; confirm with callers.
     *
     * @param jobInstanceId id of the finished job instance
     * @return the hook's result, or a successful result when no hook ran or it threw
     */
    @Override
    public ProcessResult postFinish(long jobInstanceId) {
        ProcessResult postResult = new ProcessResult(true);
        if ("java".equalsIgnoreCase(this.jobInstanceInfo.getJobType())) {
            try {
                JobContext context = JobContext.newBuilder()
                        .setJobId(this.jobInstanceInfo.getJobId())
                        .setJobInstanceId(jobInstanceId)
                        .setJobType(this.jobInstanceInfo.getJobType())
                        .setContent(this.jobInstanceInfo.getContent())
                        .setScheduleTime(this.jobInstanceInfo.getScheduleTime())
                        .setDataTime(this.jobInstanceInfo.getDataTime())
                        .setJobParameters(this.jobInstanceInfo.getParameters())
                        .setInstanceParameters(this.jobInstanceInfo.getInstanceParameters())
                        .setUser(this.jobInstanceInfo.getUser())
                        .setTaskResults(this.taskIdResultMap)
                        .setTaskStatuses(this.taskIdStatusMap)
                        .setSerialNum(this.getSerialNum())
                        .build();
                JobProcessor jobProcessor = JobProcessorUtil.getJavaProcessor(context.getContent());
                if (jobProcessor instanceof JobProcessorEx) {
                    postResult = ((JobProcessorEx)jobProcessor).postProcess(context);
                }
            }
            catch (Throwable e) {
                // Say what failed and for which instance instead of logging an empty message.
                LOGGER.error("BroadcastTaskMaster.postFinish failed, jobInstanceId={}", jobInstanceId, e);
            }
        }
        return postResult;
    }

    /**
     * Runs the processor's pre-processing hook before any task is dispatched. Processors that do
     * not implement {@link JobProcessorEx} have no hook and nothing happens.
     *
     * @param jobInstanceInfo the instance whose data is copied into the job context
     * @throws Exception whatever loading the processor or the hook itself throws
     */
    private void preProcess(JobInstanceInfo jobInstanceInfo) throws Exception {
        JobContext hookContext = JobContext.newBuilder()
                .setJobId(jobInstanceInfo.getJobId())
                .setJobInstanceId(jobInstanceInfo.getJobInstanceId())
                .setJobType(jobInstanceInfo.getJobType())
                .setContent(jobInstanceInfo.getContent())
                .setScheduleTime(jobInstanceInfo.getScheduleTime())
                .setDataTime(jobInstanceInfo.getDataTime())
                .setJobParameters(jobInstanceInfo.getParameters())
                .setInstanceParameters(jobInstanceInfo.getInstanceParameters())
                .setUser(jobInstanceInfo.getUser())
                .setSerialNum(this.getSerialNum())
                .build();
        JobProcessor processor = JobProcessorUtil.getJavaProcessor(hookContext.getContent());
        if (!(processor instanceof JobProcessorEx)) {
            return;
        }
        ((JobProcessorEx)processor).preProcess(hookContext);
    }

    /**
     * Resets this master: runs the parent's clear logic, empties the worker and progress maps,
     * switches monitoring off, and empties the per-task result and status maps.
     */
    @Override
    public void clear() {
        super.clear();
        this.worker2uniqueIdMap.clear();
        this.workerProgressMap.clear();
        this.monitor = false;

        final Map<Long, String> results = this.taskIdResultMap;
        if (results != null) {
            results.clear();
        }
        final Map<Long, TaskStatus> statuses = this.taskIdStatusMap;
        if (statuses != null) {
            statuses.clear();
        }
    }

    /**
     * Marks the task running on a worker that went away as FAILED.
     *
     * <p>The instance is always flagged as having an invalid worker. If a task was successfully
     * dispatched to the worker, a FAILED status report for that task is built from its uniqueId
     * (split on "_" into job id, instance id and task id) and applied through
     * {@link #updateTaskStatus}. Otherwise only an error is logged.
     *
     * @param workerIdAddr the worker in "workerId@workerAddr" form
     * @param withFailover not used by this implementation
     */
    @Override
    public void handleWorkerShutdown(String workerIdAddr, boolean withFailover) {
        this.existInvalidWorker = true;
        String uniqueId = this.worker2uniqueIdMap.get(workerIdAddr);
        if (uniqueId == null) {
            // uniqueId is null on this path, so log the worker that could not be resolved.
            LOGGER.error("can't found uniqueId of workerIdAddr={}", workerIdAddr);
            return;
        }
        String[] workerInfo = workerIdAddr.split("@");
        String workerId = workerInfo[0];
        String workerAddr = workerInfo[1];
        String[] tokens = uniqueId.split("_");
        Worker.ContainerReportTaskStatusRequest request = Worker.ContainerReportTaskStatusRequest.newBuilder()
                .setJobId(Long.parseLong(tokens[0]))
                .setJobInstanceId(Long.parseLong(tokens[1]))
                .setTaskId(Long.parseLong(tokens[2]))
                .setStatus(TaskStatus.FAILED.getValue())
                .setWorkerAddr(workerAddr)
                .setWorkerId(workerId)
                .setSerialNum(this.getSerialNum())
                .build();
        this.updateTaskStatus(request);
        LOGGER.warn("worker[{}] is down, set {} to failed", workerAddr, uniqueId);
    }
}

