package com.alibaba.schedulerx.worker.container;

import akka.actor.ActorSelection;
import akka.actor.Address;
import com.alibaba.schedulerx.common.constants.CommonConstants;
import com.alibaba.schedulerx.common.domain.InstanceStatus;
import com.alibaba.schedulerx.common.util.ExceptionUtil;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.shade.org.apache.commons.collections.CollectionUtils;
import com.alibaba.schedulerx.shade.org.apache.commons.lang.StringUtils;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.batch.ContainerStatusReqHandlerPool;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.domain.WorkerConstants;
import com.alibaba.schedulerx.worker.listener.ListenerServiceLoader;
import com.alibaba.schedulerx.worker.listener.ThreadContainerListener;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.logcollector.ClientLoggerMessage;
import com.alibaba.schedulerx.worker.logcollector.LogCollector;
import com.alibaba.schedulerx.worker.logcollector.LogCollectorFactory;
import com.alibaba.schedulerx.worker.processor.JobProcessor;
import com.alibaba.schedulerx.worker.processor.JobProcessorEx;
import com.alibaba.schedulerx.worker.processor.MapJobProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
import com.alibaba.schedulerx.worker.util.JobProcessorUtil;
import com.alibaba.schedulerx.worker.util.WorkerConfigUtil;
import com.alibaba.schedulerx.worker.util.WorkerIdGenerator;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.Future;

/* loaded from: input_file:com/alibaba/schedulerx/worker/container/ThreadContainer.class */
/**
 * Runs a single task on the calling thread: resolves the {@link JobProcessor} for the
 * {@link JobContext}, invokes it, optionally retries failed attempts, and reports the
 * task status through {@link ContainerStatusReqHandlerPool} (falling back to a direct
 * message to the master actor when the request cannot be submitted).
 */
public class ThreadContainer implements MarkedRunnable<Long>, Container {
    private static final Logger LOGGER = LogFactory.getLogger(ThreadContainer.class);

    /** Maximum length of the error message attached to a failed {@link ProcessResult}. */
    private static final int RESULT_SIZE_MAX = 1000;

    private final LogCollector logCollector = LogCollectorFactory.get();

    private JobContext context;
    private JobProcessor jobProcessor;
    protected ContainerPool containerPool;
    protected ActorSelection masterActorSelection;
    private Future future;

    /**
     * Creates an unconfigured container; no context, pool or master actor selection is set.
     */
    public ThreadContainer() {
    }

    /**
     * Creates a container bound to the given context and pool and resolves the actor
     * selection of the instance master from the context's actor path.
     *
     * @param jobContext    context of the task to run
     * @param containerPool pool this container registers its context with and is removed from
     * @throws Exception an {@link IOException} if no actor selection could be obtained
     */
    public ThreadContainer(JobContext jobContext, ContainerPool containerPool) throws Exception {
        this.context = jobContext;
        this.containerPool = containerPool;
        this.masterActorSelection = SchedulerxWorker.actorSystem.actorSelection(jobContext.getInstanceMasterActorPath());
        if (this.masterActorSelection == null) {
            String message = "get taskMaster akka path error, path=" + jobContext.getInstanceMasterActorPath();
            LOGGER.error(message);
            throw new IOException(message);
        }
    }

    /**
     * Entry point when executed as a {@link Runnable}. Delegates to {@link #start()}; if that
     * throws, the container is removed from the pool and a failed status is reported.
     */
    @Override
    public void run() {
        try {
            start();
        } catch (Throwable th) {
            LOGGER.error("start processor thread fail uniqueId={}, serialNum={}", this.context.getUniqueId(), Long.valueOf(this.context.getSerialNum()), th);
            try {
                this.containerPool.remove(this.context.getUniqueId());
                reportTaskStatus(new ProcessResult(false, "start processor thread failed, uniqueId=" + this.context.getUniqueId()), localWorkerAddress());
            } finally {
                // Always clear the pool's context, even when removal or reporting fails.
                this.containerPool.removeContext();
            }
        }
    }

    /**
     * Invokes {@code before} on every registered {@link ThreadContainerListener}.
     * A failing listener is logged and does not prevent the remaining listeners from running.
     *
     * @param jobContext context passed to each listener
     */
    public void executeBeforeListener(JobContext jobContext) {
        Collection<ThreadContainerListener> listeners = ListenerServiceLoader.INSTANCE.getListeners(ThreadContainerListener.class);
        if (CollectionUtils.isNotEmpty(listeners)) {
            for (ThreadContainerListener listener : listeners) {
                try {
                    listener.before(jobContext);
                } catch (Throwable th) {
                    LOGGER.warn("ThreadContainerListener<{}> before exec failed.", listener.getClass().getSimpleName(), th);
                }
            }
        }
    }

    /**
     * Invokes {@code after} on every registered {@link ThreadContainerListener}.
     * Neither a failing listener nor a failure to load the listeners propagates to the caller;
     * both are logged.
     *
     * @param jobContext    context passed to each listener
     * @param processResult final result of the task, passed to each listener
     */
    public void executeAfterListener(JobContext jobContext, ProcessResult processResult) {
        try {
            Collection<ThreadContainerListener> listeners = ListenerServiceLoader.INSTANCE.getListeners(ThreadContainerListener.class);
            if (CollectionUtils.isNotEmpty(listeners)) {
                for (ThreadContainerListener listener : listeners) {
                    try {
                        listener.after(jobContext, processResult);
                    } catch (Throwable th) {
                        LOGGER.warn("ThreadContainerListener<{}> after exec failed.", listener.getClass().getSimpleName(), th);
                    }
                }
            }
        } catch (Throwable th2) {
            LOGGER.warn("ThreadContainerListener after exec failed. ", th2);
        }
    }

    /**
     * Runs one attempt of the task on the current thread.
     *
     * <p>Renames the thread, runs the "before" listeners, registers the context with the pool,
     * reports {@code RUNNING} on the first attempt, invokes the processor and then either
     * retries (by calling this method again after sleeping for the attempt interval) or
     * removes the container from the pool, reports the final status, clears the pool context
     * and runs the "after" listeners.
     *
     * <p>If the attempt was interrupted, the thread's interrupt status is restored once
     * reporting and cleanup have completed.
     */
    @Override
    public void start() {
        Thread.currentThread().setName(ThreadContainerPool.getInstance().genThreadName(
                Long.valueOf(this.context.getJobId()),
                Long.valueOf(this.context.getJobInstanceId()),
                Long.valueOf(this.context.getTaskId())));
        executeBeforeListener(this.context);
        this.containerPool.setContext(this.context);
        long startMillis = System.currentTimeMillis();
        LOGGER.debug("start run container, uniqueId={}, cost={}ms", this.context.getUniqueId(), Long.valueOf(startMillis - this.context.getScheduleTime().getMillis()));
        String workerAddr = localWorkerAddress();
        String uniqueId = this.context.getUniqueId();
        ProcessResult processResult;
        boolean interrupted = false;
        try {
            if (this.context.getTaskAttempt() == 0) {
                reportTaskStatus(new ProcessResult(InstanceStatus.RUNNING), workerAddr);
            }
            this.jobProcessor = JobProcessorUtil.getJobProcessor(this.context);
            if (this.jobProcessor != null) {
                processResult = invokeProcessor(uniqueId);
                LOGGER.debug("container run finished, uniqueId={}, cost={}ms", uniqueId, Long.valueOf(System.currentTimeMillis() - startMillis));
                if (processResult == null) {
                    processResult = new ProcessResult(InstanceStatus.FAILED, "result can't be null");
                }
            } else {
                processResult = new ProcessResult(InstanceStatus.FAILED, "jobProcessor is null");
                this.logCollector.collect(this.context.getAppGroupId(), uniqueId, ClientLoggerMessage.appendMessage(ClientLoggerMessage.JOB_PROCESSOR_EXEC_FAIL, processResult.getResult()));
            }
            // NOTE: Thread.currentThread().isAlive() is always true for the running thread;
            // it is kept only to leave the original retry condition untouched.
            if (this.context.getTaskMaxAttempt() > 0 && this.context.getTaskId() > 0 && processResult.getStatus().equals(InstanceStatus.FAILED) && Thread.currentThread().isAlive()) {
                int taskAttempt = this.context.getTaskAttempt();
                if (taskAttempt < this.context.getTaskMaxAttempt()) {
                    // 1000L forces long arithmetic so a large interval cannot overflow.
                    Thread.sleep(1000L * this.context.getTaskAttemptInterval());
                    this.context.setTaskAttempt(taskAttempt + 1);
                    // The retry re-enters start(); the nested call does the final reporting.
                    start();
                    return;
                }
            }
        } catch (Throwable th) {
            interrupted = th instanceof InterruptedException;
            LOGGER.error("run fail uniqueId={}, serialNum={}", uniqueId, Long.valueOf(this.context.getSerialNum()), th);
            processResult = new ProcessResult(InstanceStatus.FAILED, ExceptionUtil.getFixedErrMsgByThrowable(th, RESULT_SIZE_MAX));
            this.logCollector.collect(this.context.getAppGroupId(), uniqueId, ClientLoggerMessage.JOB_PROCESSOR_EXEC_FAIL, th);
        }
        try {
            this.containerPool.remove(this.context.getUniqueId());
            reportTaskStatus(processResult, workerAddr);
            this.containerPool.removeContext();
            executeAfterListener(this.context, processResult);
        } finally {
            if (interrupted) {
                // Restore the interrupt status that was cleared when InterruptedException was thrown.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Invokes the resolved processor for the current context.
     *
     * <p>For a {@link JobProcessorEx} (outside the map/broadcast branch) {@code preProcess} and
     * {@code postProcess} are called around {@code process}. Any failure other than an
     * interruption is logged, collected and converted into a {@code FAILED} result.
     *
     * @param uniqueId unique id of the task, used for logging
     * @return the processor's result (possibly {@code null}) or a {@code FAILED} result on error
     * @throws InterruptedException if the processor was interrupted
     */
    private ProcessResult invokeProcessor(String uniqueId) throws InterruptedException {
        try {
            // "broadcast".equals(...) keeps a null execute mode from raising an NPE.
            if (WorkerConstants.WORKER_STARTER_MODE_DEFAULT.equalsIgnoreCase(this.context.getJobType())
                    && ((this.jobProcessor instanceof MapJobProcessor) || "broadcast".equals(this.context.getExecuteMode()))) {
                return this.jobProcessor.process(this.context);
            }
            if (this.jobProcessor instanceof JobProcessorEx) {
                JobProcessorEx processorEx = (JobProcessorEx) this.jobProcessor;
                processorEx.preProcess(this.context);
                ProcessResult result = this.jobProcessor.process(this.context);
                processorEx.postProcess(this.context);
                return result;
            }
            return this.jobProcessor.process(this.context);
        } catch (InterruptedException e) {
            // Let interruption propagate so start() can report it and restore the flag.
            throw e;
        } catch (Throwable th) {
            LOGGER.error("run fail uniqueId={}, serialNum={}", uniqueId, Long.valueOf(this.context.getSerialNum()), th);
            this.logCollector.collect(this.context.getAppGroupId(), uniqueId, ClientLoggerMessage.JOB_PROCESSOR_EXEC_FAIL, th);
            return new ProcessResult(InstanceStatus.FAILED, ExceptionUtil.getFixedErrMsgByThrowable(th, RESULT_SIZE_MAX));
        }
    }

    /**
     * Kills this container: asks a {@link JobProcessorEx} to kill the task, cancels the
     * associated future (if any) and removes the container from the pool. The cancel and
     * removal steps run even when the processor's {@code kill} throws.
     *
     * @param mayInterruptIfRunning forwarded to {@link Future#cancel(boolean)}
     */
    @Override
    public void kill(boolean mayInterruptIfRunning) {
        LOGGER.info("kill container, jobInstanceId={}", Long.valueOf(this.context.getJobInstanceId()));
        try {
            if (this.jobProcessor instanceof JobProcessorEx) {
                ((JobProcessorEx) this.jobProcessor).kill(this.context);
            }
        } finally {
            if (this.future != null) {
                this.future.cancel(mayInterruptIfRunning);
            }
            this.containerPool.remove(this.context.getUniqueId());
        }
    }

    /**
     * Builds the "host + separator + port" string from the actor system's default address.
     *
     * @return address string of this worker as sent in status reports
     */
    private String localWorkerAddress() {
        Address defaultAddress = SchedulerxWorker.actorSystem.provider().getDefaultAddress();
        return defaultAddress.host().get() + CommonConstants.ADDRESS_SEPARATOR + defaultAddress.port().get();
    }

    /**
     * Builds a status report for the current task and submits it to
     * {@link ContainerStatusReqHandlerPool}; if the submission is rejected, the same request
     * is sent directly to the master actor selection.
     *
     * @param processResult result whose status (and optional result text) is reported
     * @param workerAddr    address string of this worker
     */
    private void reportTaskStatus(ProcessResult processResult, String workerAddr) {
        Worker.ContainerReportTaskStatusRequest.Builder builder = Worker.ContainerReportTaskStatusRequest.newBuilder();
        builder.setJobId(this.context.getJobId());
        builder.setJobInstanceId(this.context.getJobInstanceId());
        builder.setTaskId(this.context.getTaskId());
        builder.setStatus(processResult.getStatus().getValue());
        builder.setWorkerAddr(workerAddr);
        builder.setWorkerId(WorkerIdGenerator.get());
        builder.setSerialNum(this.context.getSerialNum());
        if (StringUtils.isNotBlank(this.context.getTraceId())) {
            builder.setTraceId(this.context.getTraceId());
        }
        builder.setInstanceMasterActorPath(this.context.getInstanceMasterActorPath());
        if (this.context.getTaskName() != null) {
            builder.setTaskName(this.context.getTaskName());
        }
        if (processResult.getResult() != null) {
            builder.setResult(processResult.getResult());
        }
        // Build once and reuse the same message for the pool submission and the fallback.
        Worker.ContainerReportTaskStatusRequest request = builder.build();
        boolean submitted = WorkerConfigUtil.isEnableShareContainerPool()
                ? ContainerStatusReqHandlerPool.INSTANCE.submitReq(0L, request)
                : ContainerStatusReqHandlerPool.INSTANCE.submitReq(this.context.getJobInstanceId(), request);
        LOGGER.info("reportTaskStatus instanceId={}, serialNum={}, submitResult={}, processResult={}", this.context.getUniqueId(), Long.valueOf(this.context.getSerialNum()), Boolean.valueOf(submitted), processResult);
        if (!submitted) {
            this.masterActorSelection.tell(request, null);
        }
    }

    /** @return the context of the task this container runs */
    public JobContext getContext() {
        return this.context;
    }

    /** @param jobContext the context of the task this container runs */
    public void setContext(JobContext jobContext) {
        this.context = jobContext;
    }

    /** @return the future associated with this container, or {@code null} if none was set */
    public Future getFuture() {
        return this.future;
    }

    /** @param future the future that {@link #kill(boolean)} cancels */
    public void setFuture(Future future) {
        this.future = future;
    }

    /** @return the processor resolved by the last {@link #start()}, or {@code null} */
    public JobProcessor getJobProcessor() {
        return this.jobProcessor;
    }

    /**
     * Identifies this runnable by the job instance id of its context.
     *
     * @return the job instance id
     */
    @Override
    public Long identify() {
        return Long.valueOf(this.context.getJobInstanceId());
    }
}
