/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.actor;

import akka.actor.UntypedActor;
import com.alibaba.schedulerx.common.domain.InstanceStatus;
import com.alibaba.schedulerx.common.domain.JobInstanceData;
import com.alibaba.schedulerx.common.domain.JobInstanceInfo;
import com.alibaba.schedulerx.common.util.ConfigUtil;
import com.alibaba.schedulerx.common.util.ExceptionUtil;
import com.alibaba.schedulerx.common.util.IdUtil;
import com.alibaba.schedulerx.common.util.ManagementUtil;
import com.alibaba.schedulerx.common.util.ReflectionUtil;
import com.alibaba.schedulerx.protocol.Common;
import com.alibaba.schedulerx.protocol.Server;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.shade.com.google.common.collect.Lists;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.container.ThreadContainerPool;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.logcollector.ClientLoggerMessage;
import com.alibaba.schedulerx.worker.logcollector.LogCollector;
import com.alibaba.schedulerx.worker.logcollector.LogCollectorFactory;
import com.alibaba.schedulerx.worker.master.TaskMaster;
import com.alibaba.schedulerx.worker.master.TaskMasterFactory;
import com.alibaba.schedulerx.worker.master.TaskMasterPool;
import com.alibaba.schedulerx.worker.processor.CalendarProcessor;
import com.alibaba.schedulerx.worker.processor.JobProcessor;
import com.alibaba.schedulerx.worker.processor.JobProcessorEx;
import com.alibaba.schedulerx.worker.pull.PullManager;
import com.alibaba.schedulerx.worker.service.SyncK8sTaskService;
import com.alibaba.schedulerx.worker.util.JobProcessorUtil;
import com.alibaba.schedulerx.worker.util.SpringContext;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.joda.time.DateTime;

public class JobInstanceActor
extends UntypedActor {
    private TaskMasterPool masterPool = TaskMasterPool.INSTANCE;
    private LogCollector logCollector = LogCollectorFactory.get();
    private static final Logger LOGGER = LogFactory.getLogger(JobInstanceActor.class);

    @Override
    public void onReceive(Object obj) throws Throwable {
        if (obj instanceof Server.ServerSubmitJobInstanceRequest) {
            this.handleSubmitJobInstance((Server.ServerSubmitJobInstanceRequest)obj);
        } else if (obj instanceof Server.ServerKillJobInstanceRequest) {
            this.handleKillJobInstance((Server.ServerKillJobInstanceRequest)obj);
        } else if (obj instanceof Server.ServerRetryTasksRequest) {
            this.handleRetryTasks((Server.ServerRetryTasksRequest)obj);
        } else if (obj instanceof Server.ServerKillTaskRequest) {
            this.handleKillTask((Server.ServerKillTaskRequest)obj);
        } else if (obj instanceof Server.ServerCheckTaskMasterRequest) {
            this.handCheckTaskMaster((Server.ServerCheckTaskMasterRequest)obj);
        } else if (obj instanceof Worker.MasterNotifyWorkerPullRequest) {
            this.handleInitPull((Worker.MasterNotifyWorkerPullRequest)obj);
        } else if (obj instanceof Server.ServerThreadDumpRequest) {
            this.handleThreadDump((Server.ServerThreadDumpRequest)obj);
        } else if (obj instanceof Server.ServerSyncK8sJobRequest) {
            this.syncK8sJob((Server.ServerSyncK8sJobRequest)obj);
        } else if (obj instanceof Server.ServerSyncK8sCronJobRequest) {
            this.syncK8sCronJob((Server.ServerSyncK8sCronJobRequest)obj);
        } else if (obj instanceof Server.ServerCallbackCalendarRequest) {
            this.handleCallbackCalendar((Server.ServerCallbackCalendarRequest)obj);
        }
    }

    private void handleSubmitJobInstance(Server.ServerSubmitJobInstanceRequest request) {
        LOGGER.info("handleSubmitJobInstance, jobInstanceId=" + request.getJobInstanceId());
        Server.ServerSubmitJobInstanceResponse response = null;
        if (!SchedulerxWorker.INITED) {
            LOGGER.warn("worker not ready for running.");
            response = Server.ServerSubmitJobInstanceResponse.newBuilder().setSuccess(false).setMessage("WORKER_NOT_RUNNING:worker not ready for running.").build();
            this.getSender().tell(response, this.getSelf());
            return;
        }
        if (this.masterPool.contains(request.getJobInstanceId())) {
            String errMsg = "jobInstanceId=" + request.getJobInstanceId() + " is still running!";
            LOGGER.debug(errMsg);
            this.logCollector.collect(request.getAppGroupId(), IdUtil.getUniqueIdWithoutTask(request.getJobId(), request.getJobInstanceId()), ClientLoggerMessage.appendMessage("server trigger client fail:", errMsg));
            response = Server.ServerSubmitJobInstanceResponse.newBuilder().setSuccess(false).setMessage(errMsg).build();
            this.getSender().tell(response, this.getSelf());
        } else {
            response = Server.ServerSubmitJobInstanceResponse.newBuilder().setSuccess(true).build();
            this.getSender().tell(response, this.getSelf());
            try {
                JobInstanceInfo jobInstanceInfo = this.convet2JobInstanceInfo(request);
                TaskMaster taskMaster = TaskMasterFactory.create(jobInstanceInfo, this.getContext());
                this.masterPool.put(jobInstanceInfo.getJobInstanceId(), taskMaster);
                taskMaster.submitInstance(jobInstanceInfo);
                LOGGER.debug("submit jobInstanceId={} successfully", request.getJobInstanceId());
                this.logCollector.collect(request.getAppGroupId(), IdUtil.getUniqueIdWithoutTask(request.getJobId(), request.getJobInstanceId()), "submit jobInstance success:");
            }
            catch (Throwable e) {
                LOGGER.error("handleSubmitJobInstance error, jobInstanceId={}, ", request.getJobInstanceId(), e);
                this.logCollector.collect(request.getAppGroupId(), IdUtil.getUniqueIdWithoutTask(request.getJobId(), request.getJobInstanceId()), "server trigger client fail:", e);
                Worker.WorkerReportJobInstanceStatusRequest req = Worker.WorkerReportJobInstanceStatusRequest.newBuilder().setJobId(request.getJobId()).setJobInstanceId(request.getJobInstanceId()).setStatus(InstanceStatus.FAILED.getValue()).setResult(ExceptionUtil.getMessage(e)).setGroupId(request.getGroupId()).build();
                SchedulerxWorker.AtLeastDeliveryRoutingActor.tell(req, null);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleKillJobInstance(Server.ServerKillJobInstanceRequest request) {
        LOGGER.info("handleKillJobInstance, jobInstanceId=" + request.getJobInstanceId());
        Server.ServerKillJobInstanceResponse response = null;
        String uniqueId = IdUtil.getUniqueIdWithoutTask(request.getJobId(), request.getJobInstanceId());
        try {
            long jobInstanceId = request.getJobInstanceId();
            if (request.hasDestoryJob() && request.getDestoryJob()) {
                JobContext context = JobContext.newBuilder().setJobType(request.getJobType()).setContent(request.getContent()).setXAttrs(request.getXattrs()).setExecuteMode(request.getExecuteMode()).build();
                JobProcessor jobProcessor = JobProcessorUtil.getJobProcessor(context);
                if (jobProcessor instanceof JobProcessorEx) {
                    ((JobProcessorEx)jobProcessor).kill(context);
                }
            } else if (!this.masterPool.contains(jobInstanceId)) {
                response = Server.ServerKillJobInstanceResponse.newBuilder().setSuccess(false).setMessage(jobInstanceId + " is not exist").build();
                if (request.hasAppGroupId()) {
                    this.logCollector.collect(request.getAppGroupId(), uniqueId, ClientLoggerMessage.appendMessage("server kill instance start fail:", response.getMessage()));
                }
            } else {
                this.masterPool.get(jobInstanceId).killInstance(true, "killed from server");
                response = Server.ServerKillJobInstanceResponse.newBuilder().setSuccess(true).build();
                if (request.hasAppGroupId()) {
                    this.logCollector.collect(request.getAppGroupId(), uniqueId, "server kill instance start success:");
                }
            }
            this.getSender().tell(response, this.getSelf());
        }
        catch (Throwable e) {
            try {
                LOGGER.error("[JobInstanceActor]handleKillJobInstance error, uniqueId:{}", uniqueId, e);
                response = Server.ServerKillJobInstanceResponse.newBuilder().setSuccess(false).setMessage(ExceptionUtil.getMessage(e)).build();
                this.getSender().tell(response, this.getSelf());
            }
            catch (Throwable throwable) {
                this.getSender().tell(response, this.getSelf());
                throw throwable;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleKillTask(Server.ServerKillTaskRequest request) {
        LOGGER.info("handleKillTask, jobInstanceId=" + request.getJobInstanceId());
        Server.ServerKillTaskResponse response = null;
        try {
            long jobInstanceId = request.getJobInstanceId();
            if (!this.masterPool.contains(jobInstanceId)) {
                response = Server.ServerKillTaskResponse.newBuilder().setSuccess(false).setMessage(jobInstanceId + " is not exist").build();
            } else {
                this.masterPool.get(jobInstanceId).killTask(IdUtil.getUniqueId(request.getJobId(), request.getJobInstanceId(), request.getTaskId()), request.getWorkerId(), request.getWorkerAddr());
                response = Server.ServerKillTaskResponse.newBuilder().setSuccess(true).build();
            }
            this.getSender().tell(response, this.getSelf());
        }
        catch (Throwable e) {
            try {
                LOGGER.error("", e);
                response = Server.ServerKillTaskResponse.newBuilder().setSuccess(false).setMessage(ExceptionUtil.getMessage(e)).build();
                this.getSender().tell(response, this.getSelf());
            }
            catch (Throwable throwable) {
                this.getSender().tell(response, this.getSelf());
                throw throwable;
            }
        }
    }

    private void handleRetryTasks(Server.ServerRetryTasksRequest request) {
        JobInstanceInfo jobInstanceInfo = this.convet2JobInstanceInfo(request);
        Server.ServerRetryTasksResponse response = null;
        TaskMaster taskMaster = this.masterPool.get(jobInstanceInfo.getJobInstanceId());
        if (taskMaster == null) {
            try {
                taskMaster = TaskMasterFactory.create(jobInstanceInfo, this.getContext());
                this.masterPool.put(jobInstanceInfo.getJobInstanceId(), taskMaster);
            }
            catch (Exception e) {
                LOGGER.error("", e);
                response = Server.ServerRetryTasksResponse.newBuilder().setSuccess(false).setMessage(ExceptionUtil.getMessage(e)).build();
            }
        }
        if (taskMaster != null) {
            taskMaster.retryTasks(request.getRetryTaskEntityList());
            response = Server.ServerRetryTasksResponse.newBuilder().setSuccess(true).build();
        }
        this.getSender().tell(response, this.getSelf());
    }

    private void handCheckTaskMaster(Server.ServerCheckTaskMasterRequest request) {
        long jobInstanceId = request.getJobInstanceId();
        Server.ServerCheckTaskMasterResponse response = null;
        response = !this.masterPool.contains(jobInstanceId) ? Server.ServerCheckTaskMasterResponse.newBuilder().setSuccess(false).setMessage("TaskMaster is not existed of jobInstance=" + jobInstanceId).build() : Server.ServerCheckTaskMasterResponse.newBuilder().setSuccess(true).build();
        this.getSender().tell(response, this.getSelf());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleInitPull(Worker.MasterNotifyWorkerPullRequest request) {
        Worker.MasterNotifyWorkerPullResponse response;
        block5: {
            response = null;
            if (SchedulerxWorker.INITED) break block5;
            LOGGER.warn("worker not ready for running.");
            response = Worker.MasterNotifyWorkerPullResponse.newBuilder().setSuccess(false).setMessage("WORKER_NOT_RUNNING:worker not ready for running.").build();
            this.getSender().tell(response, this.getSelf());
            this.getSender().tell(response, this.getSelf());
            return;
        }
        try {
            PullManager.INSTANCE.init(request.getJobInstanceId(), request.getSerialNum(), request.getPageSize(), request.getQueueSize(), request.getConsumerSize(), request.getTaskMasterAkkaPath());
            response = Worker.MasterNotifyWorkerPullResponse.newBuilder().setSuccess(true).build();
            this.getSender().tell(response, this.getSelf());
        }
        catch (Exception e) {
            try {
                LOGGER.error("", e);
                response = Worker.MasterNotifyWorkerPullResponse.newBuilder().setSuccess(false).setMessage(ExceptionUtil.getMessage(e)).build();
                this.getSender().tell(response, this.getSelf());
            }
            catch (Throwable throwable) {
                this.getSender().tell(response, this.getSelf());
                throw throwable;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleThreadDump(Server.ServerThreadDumpRequest request) {
        Server.ServerThreadDumpResponse response = null;
        try {
            long jobId = request.getJobId();
            long jobInstanceId = request.getJobInstanceId();
            String threadName = ThreadContainerPool.getInstance().genThreadName(jobId, jobInstanceId, null);
            List<String> resultLines = ManagementUtil.getStackTraces(threadName);
            response = Server.ServerThreadDumpResponse.newBuilder().setSuccess(true).addAllLine(resultLines).build();
            this.getSender().tell(response, this.getSelf());
        }
        catch (Exception e) {
            try {
                LOGGER.error("", e);
                response = Server.ServerThreadDumpResponse.newBuilder().setSuccess(false).setMessage(ExceptionUtil.getMessage(e)).build();
                this.getSender().tell(response, this.getSelf());
            }
            catch (Throwable throwable) {
                this.getSender().tell(response, this.getSelf());
                throw throwable;
            }
        }
    }

    private JobInstanceInfo convet2JobInstanceInfo(Server.ServerSubmitJobInstanceRequest request) {
        CopyOnWriteArrayList<String> workers = Lists.newCopyOnWriteArrayList(request.getWorkersList());
        Collections.shuffle(workers);
        JobInstanceInfo.JobInstanceInfoBuilder builder = JobInstanceInfo.newBuilder();
        builder.setJobId(request.getJobId());
        builder.setJobInstanceId(request.getJobInstanceId());
        builder.setExecuteMode(request.getExecuteMode());
        builder.setJobType(request.getJobType());
        builder.setContent(request.getContent());
        builder.setUser(request.getUser());
        builder.setScheduleTime(new DateTime(request.getScheduleTime()));
        builder.setDataTime(new DateTime(request.getDataTime()));
        builder.setAllWorkers(workers);
        builder.setJobConcurrency(request.getJobConcurrency());
        builder.setRegionId(request.getRegionId());
        builder.setAppGroupId(request.getAppGroupId());
        builder.setTimeType(request.hasTimeType() ? request.getTimeType() : 0);
        builder.setTimeExpression(request.hasTimeExpression() ? request.getTimeExpression() : null);
        builder.setGroupId(request.getGroupId());
        builder.setTriggerType(request.getTriggerType());
        if (request.hasParameters()) {
            builder.setParameters(request.getParameters());
        }
        if (request.hasXattrs()) {
            builder.setXattrs(request.getXattrs());
        }
        if (request.hasInstanceParameters()) {
            builder.setInstanceParameters(request.getInstanceParameters());
        }
        List<Common.UpstreamData> upstreamDataList = request.getUpstreamDataList();
        ArrayList<JobInstanceData> upstreamDataPairList = Lists.newArrayList();
        for (Common.UpstreamData upstreamData : upstreamDataList) {
            upstreamDataPairList.add(new JobInstanceData(upstreamData.getJobName(), upstreamData.getData()));
        }
        builder.setUpstreamData(upstreamDataPairList);
        if (request.hasMaxAttempt()) {
            builder.setMaxAttempt(request.getMaxAttempt());
        }
        if (request.hasAttempt()) {
            builder.setAttempt(request.getAttempt());
        }
        if (request.hasWfInstanceId()) {
            builder.setWfInstanceId(request.getWfInstanceId());
        }
        if (request.hasJobName()) {
            builder.setJobName(request.getJobName());
        }
        if (request.hasNamespace()) {
            builder.setNamespace(request.getNamespace());
        }
        if (request.hasRouteStrategyType()) {
            builder.setRouteStrategyType(request.getRouteStrategyType());
        }
        if (request.hasRouteStrategyContent()) {
            builder.setRouteStrategyContent(request.getRouteStrategyContent());
        }
        List<Common.LabelWorkerAddrsEntry> labelWorkerAddrsEntryList = request.getLabelWorkerAddrsMapList();
        HashMap<String, List<String>> labelWorkerAddrsMap = new HashMap<String, List<String>>();
        for (Common.LabelWorkerAddrsEntry entry : labelWorkerAddrsEntryList) {
            labelWorkerAddrsMap.put(entry.getLabel(), entry.getWorkerAddrsList());
        }
        builder.setTargetWorkerAddrsMap(labelWorkerAddrsMap);
        builder.setTemplate(request.getTemplate());
        if (request.hasWorkflowId()) {
            builder.setWorkflowId(request.getWorkflowId());
        }
        return builder.build();
    }

    private JobInstanceInfo convet2JobInstanceInfo(Server.ServerRetryTasksRequest request) {
        CopyOnWriteArrayList<String> workers = Lists.newCopyOnWriteArrayList(request.getWorkersList());
        Collections.shuffle(workers);
        JobInstanceInfo.JobInstanceInfoBuilder builder = JobInstanceInfo.newBuilder();
        builder.setJobId(request.getJobId());
        builder.setJobInstanceId(request.getJobInstanceId());
        builder.setExecuteMode(request.getExecuteMode());
        builder.setJobType(request.getJobType());
        builder.setContent(request.getContent());
        builder.setUser(request.getUser());
        builder.setScheduleTime(new DateTime(request.getScheduleTime()));
        builder.setDataTime(new DateTime(request.getDataTime()));
        builder.setAllWorkers(workers);
        builder.setJobConcurrency(request.getJobConcurrency());
        builder.setRegionId(request.getRegionId());
        builder.setAppGroupId(request.getAppGroupId());
        builder.setTimeType(request.hasTimeType() ? request.getTimeType() : 0);
        builder.setTimeExpression(request.hasTimeExpression() ? request.getTimeExpression() : null);
        builder.setGroupId(request.getGroupId());
        if (request.hasParameters()) {
            builder.setParameters(request.getParameters());
        }
        if (request.hasXattrs()) {
            builder.setXattrs(request.getXattrs());
        }
        if (request.hasInstanceParameters()) {
            builder.setInstanceParameters(request.getInstanceParameters());
        }
        List<Common.UpstreamData> upstreamDataList = request.getUpstreamDataList();
        ArrayList<JobInstanceData> upstreamDataPairList = Lists.newArrayList();
        for (Common.UpstreamData upstreamData : upstreamDataList) {
            upstreamDataPairList.add(new JobInstanceData(upstreamData.getJobName(), upstreamData.getData()));
        }
        builder.setUpstreamData(upstreamDataPairList);
        if (request.hasMaxAttempt()) {
            builder.setMaxAttempt(request.getMaxAttempt());
        }
        if (request.hasAttempt()) {
            builder.setAttempt(request.getAttempt());
        }
        if (request.hasWfInstanceId()) {
            builder.setWfInstanceId(request.getWfInstanceId());
        }
        if (request.hasJobName()) {
            builder.setJobName(request.getJobName());
        }
        return builder.build();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void syncK8sJob(Server.ServerSyncK8sJobRequest obj) throws Throwable {
        Server.ServerSyncK8sJobResponse response = null;
        try {
            String className = ConfigUtil.getWorkerConfig().getString("sync.k8s.task.service.impl");
            if (className == null || className.length() == 0) {
                response = Server.ServerSyncK8sJobResponse.newBuilder().setSuccess(false).setMessage("schedulerx2-plugin-kubernetes needs to be upgraded to above 1.0.3").build();
                this.getSender().tell(response, this.getSelf());
            } else {
                SyncK8sTaskService syncK8sTaskService = (SyncK8sTaskService)Class.forName(className).newInstance();
                response = syncK8sTaskService.syncK8sJob(obj);
            }
            this.getSender().tell(response, this.getSelf());
        }
        catch (Exception e) {
            try {
                LOGGER.error("sync k8s  job error", e);
                response = Server.ServerSyncK8sJobResponse.newBuilder().setSuccess(false).setMessage(ExceptionUtil.getMessage(e)).build();
                this.getSender().tell(response, this.getSelf());
            }
            catch (Throwable throwable) {
                this.getSender().tell(response, this.getSelf());
                throw throwable;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void syncK8sCronJob(Server.ServerSyncK8sCronJobRequest obj) throws Throwable {
        Server.ServerSyncK8sCronJobResponse response = null;
        try {
            String className = ConfigUtil.getWorkerConfig().getString("sync.k8s.task.service.impl");
            if (className == null || className.length() == 0) {
                response = Server.ServerSyncK8sCronJobResponse.newBuilder().setSuccess(false).setMessage("schedulerx2-plugin-kubernetes needs to be upgraded to above 1.0.3").build();
            } else {
                SyncK8sTaskService syncK8sTaskService = (SyncK8sTaskService)Class.forName(className).newInstance();
                response = syncK8sTaskService.syncK8sCronJob(obj);
            }
            this.getSender().tell(response, this.getSelf());
        }
        catch (Exception e) {
            try {
                LOGGER.error("sync k8s cron job error", e);
                response = Server.ServerSyncK8sCronJobResponse.newBuilder().setSuccess(false).setMessage(ExceptionUtil.getMessage(e)).build();
                this.getSender().tell(response, this.getSelf());
            }
            catch (Throwable throwable) {
                this.getSender().tell(response, this.getSelf());
                throw throwable;
            }
        }
    }

    private void handleCallbackCalendar(Server.ServerCallbackCalendarRequest request) {
        Server.ServerCallbackCalendarResponse response;
        String className = request.getProcessor();
        CalendarProcessor processor = null;
        try {
            if (SpringContext.context != null) {
                processor = (CalendarProcessor)SpringContext.getBean(className, SchedulerxWorker.CUSTOMER_CLASS_LOADER);
            }
            if (processor == null) {
                processor = (CalendarProcessor)ReflectionUtil.getInstanceByClassName(className, SchedulerxWorker.CUSTOMER_CLASS_LOADER);
            }
            if (processor != null) {
                boolean result2 = processor.isTimeIncluded(request.getTimestamp());
                response = Server.ServerCallbackCalendarResponse.newBuilder().setSuccess(true).setResult(result2).build();
            } else {
                response = Server.ServerCallbackCalendarResponse.newBuilder().setSuccess(false).setMessage("can't found className=" + className).build();
            }
        }
        catch (Exception e) {
            response = Server.ServerCallbackCalendarResponse.newBuilder().setSuccess(false).setMessage(ExceptionUtil.getMessage(e)).build();
        }
        this.getSender().tell(response, this.getSelf());
    }
}

