/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.actor;

import akka.actor.ActorSelection;
import akka.actor.Address;
import akka.actor.Props;
import akka.actor.UntypedActor;
import com.alibaba.fastjson.JSON;
import com.alibaba.schedulerx.common.domain.Metrics;
import com.alibaba.schedulerx.common.domain.TaskStatus;
import com.alibaba.schedulerx.common.monitor.MetricsCollector;
import com.alibaba.schedulerx.common.util.ConfigUtil;
import com.alibaba.schedulerx.common.util.ExceptionUtil;
import com.alibaba.schedulerx.common.util.IdUtil;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.batch.ContainerStatusReqHandler;
import com.alibaba.schedulerx.worker.batch.ContainerStatusReqHandlerPool;
import com.alibaba.schedulerx.worker.batch.ReqQueue;
import com.alibaba.schedulerx.worker.container.Container;
import com.alibaba.schedulerx.worker.container.ContainerFactory;
import com.alibaba.schedulerx.worker.container.ContainerPool;
import com.alibaba.schedulerx.worker.container.ThreadContainerPool;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.logcollector.LogCollector;
import com.alibaba.schedulerx.worker.logcollector.LogCollectorFactory;
import com.alibaba.schedulerx.worker.pull.PullManager;
import com.alibaba.schedulerx.worker.util.ContanerUtil;
import com.alibaba.schedulerx.worker.util.WorkerConfigUtil;
import com.alibaba.schedulerx.worker.util.WorkerIdGenerator;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class ContainerActor
extends UntypedActor {
    /** Pool that holds and runs the containers started through this actor. */
    private final ContainerPool containerPool = ContainerFactory.getContainerPool();
    /** Registry of the status-request batch handlers started by {@code startContainer}. */
    private final ContainerStatusReqHandlerPool statusReqBatchHandlerPool = ContainerStatusReqHandlerPool.INSTANCE;
    private static final Logger LOGGER = LogFactory.getLogger(ContainerActor.class);
    /**
     * Executor that runs batch container start-up so the actor thread is not blocked:
     * 8 threads, 30 second keep-alive, unbounded work queue.
     */
    private static final ThreadPoolExecutor containerStarter = new ThreadPoolExecutor(8, 8, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory(){
        private final AtomicInteger nextId = new AtomicInteger(1);
        private final String namePrefix = "Schedulerx-Container-Starter-Thread-";

        @Override
        public Thread newThread(Runnable r) {
            // Threads are numbered from 1 in creation order.
            return new Thread(r, this.namePrefix + this.nextId.getAndIncrement());
        }
    }, new ThreadPoolExecutor.CallerRunsPolicy());
    /** Batch size handed to each new status-request handler; read from "worker.map.page.size", default 1000. */
    private final int batchSize = ConfigUtil.getWorkerConfig().getInt("worker.map.page.size", 1000);
    /** Sink for the start/kill outcome records collected by the handlers of this actor. */
    private final LogCollector logCollector = LogCollectorFactory.get();

    /**
     * Builds the {@link Props} describing a {@code ContainerActor}.
     *
     * @return props for this actor class, created without constructor arguments
     */
    public static Props props() {
        return Props.create(ContainerActor.class);
    }

    /**
     * Dispatches an incoming message to the handler for its request type.
     *
     * <p>Four request types are handled: start one container, start a batch of containers,
     * kill a container (or a whole instance), and destroy the container pool of an instance.
     * Any other message is passed to {@code unhandled} instead of being dropped silently.
     *
     * @param obj the received message
     * @throws Throwable whatever a handler lets escape
     */
    @Override
    public void onReceive(Object obj) throws Throwable {
        if (obj instanceof Worker.MasterStartContainerRequest) {
            this.handleStartContainer((Worker.MasterStartContainerRequest)obj);
        } else if (obj instanceof Worker.MasterBatchStartContainersRequest) {
            this.handleBatchStartContainers((Worker.MasterBatchStartContainersRequest)obj);
        } else if (obj instanceof Worker.MasterKillContainerRequest) {
            this.handleKillContainer((Worker.MasterKillContainerRequest)obj);
        } else if (obj instanceof Worker.MasterDestroyContainerPoolRequest) {
            this.handleDestroyContainerPool((Worker.MasterDestroyContainerPoolRequest)obj);
        } else {
            // Let Akka record the message as unhandled so unexpected traffic is observable.
            this.unhandled(obj);
        }
    }

    /**
     * Starts a single container and replies to the sender with a
     * {@code MasterStartContainerResponse}.
     *
     * <p>The reply is unsuccessful when the worker is not initialised yet or when starting
     * throws; in the latter case the container is removed from the pool again and the
     * failure is logged and handed to the log collector.
     *
     * @param request the start request received from the master
     */
    private void handleStartContainer(Worker.MasterStartContainerRequest request) {
        Worker.MasterStartContainerResponse reply;
        try {
            if (SchedulerxWorker.INITED) {
                LOGGER.info("jobInstanceId={}, serialNum={}, batch start container.", request.getJobInstanceId(), request.getSerialNum());
                String startedId = this.startContainer(request);
                reply = Worker.MasterStartContainerResponse.newBuilder().setSuccess(true).build();
                LOGGER.debug("submit container to containerPool, uniqueId={}, cost={}ms", startedId, System.currentTimeMillis() - request.getScheduleTime());
            } else {
                LOGGER.warn("jobInstanceId={}, serialNum={}, worker not ready for running.", request.getJobInstanceId(), request.getSerialNum());
                reply = Worker.MasterStartContainerResponse.newBuilder().setSuccess(false).setMessage("WORKER_NOT_RUNNING:worker not ready for running.").build();
            }
        }
        catch (Throwable e) {
            // Undo a possible registration in the pool before reporting the failure.
            String failedId = IdUtil.getUniqueId(request.getJobId(), request.getJobInstanceId(), request.getTaskId());
            this.containerPool.remove(failedId);
            reply = Worker.MasterStartContainerResponse.newBuilder().setSuccess(false).setMessage(ExceptionUtil.getMessage(e)).build();
            LOGGER.error("handleStartContainer error.", e);
            this.logCollector.collect(request.getAppGroupId(), failedId, "container start instance fail:", e);
        }
        this.getSender().tell(reply, this.getSelf());
    }

    /**
     * Accepts a batch of container start requests and replies to the sender immediately.
     *
     * <p>The containers themselves are started asynchronously on {@code containerStarter}.
     * The reply only says the batch was accepted; it carries the current metrics as JSON,
     * or an empty string when no metrics are available. When the worker is not initialised
     * the batch is rejected and nothing is submitted.
     *
     * @param request the batch start request received from the master
     */
    private void handleBatchStartContainers(Worker.MasterBatchStartContainersRequest request) {
        Worker.MasterBatchStartContainersResponse response;
        if (!SchedulerxWorker.INITED) {
            LOGGER.warn("worker not ready for running.");
            response = Worker.MasterBatchStartContainersResponse.newBuilder().setSuccess(false).setMessage("WORKER_NOT_RUNNING:worker not ready for running.").build();
        } else {
            LOGGER.info("jobInstanceId={}, serialNum={}, batch start containers, size:{}", request.getJobInstanceId(), request.getSerialNum(), request.getStartReqsCount());
            Metrics metrics = MetricsCollector.getMetrics();
            ThreadPoolExecutor sharedThreadPool = ThreadContainerPool.getInstance().getSharedThreadPool();
            if (sharedThreadPool != null && metrics != null) {
                // Idle core threads plus the remaining queue headroom (sqrt(core) minus queued tasks),
                // reduced by the number of containers this batch is about to start.
                int coreSize = sharedThreadPool.getCorePoolSize();
                int availableSize = coreSize - sharedThreadPool.getActiveCount() + ((int)Math.sqrt(coreSize) - sharedThreadPool.getQueue().size());
                metrics.setSharePoolAvailableSize(availableSize - request.getStartReqsCount());
            }
            containerStarter.submit(new ContainerStartRunnable(request));
            Worker.MasterBatchStartContainersResponse.Builder builder = Worker.MasterBatchStartContainersResponse.newBuilder();
            builder.setSuccess(true);
            builder.setMetricsJson(metrics != null ? JSON.toJSONString(metrics) : "");
            response = builder.build();
        }
        this.getSender().tell(response, this.getSelf());
    }

    /**
     * Creates the container for one start request and submits it to the container pool.
     *
     * <p>Registration and submission happen while holding the instance lock obtained from
     * the container pool for the request's job instance and serial number. If no status
     * request handler exists yet for the handler key (0 when the shared container pool is
     * enabled, otherwise the job instance id), one is created and started first.
     *
     * @param request the start request
     * @return the unique id built from the job id, job instance id and task id of the request
     * @throws Exception if conversion, creation or submission fails
     */
    private String startContainer(Worker.MasterStartContainerRequest request) throws Exception {
        final String uniqueId = IdUtil.getUniqueId(request.getJobId(), request.getJobInstanceId(), request.getTaskId());
        LOGGER.debug("handleStartContainer, uniqueId={}, cost={}ms", uniqueId, System.currentTimeMillis() - request.getScheduleTime());
        final JobContext context = ContanerUtil.convert2JobContext(request);
        final Container container = ContainerFactory.create(context);
        if (container == null) {
            LOGGER.warn("Container is null, uniqueId={}", uniqueId);
            return uniqueId;
        }
        final AtomicLong instanceLock = this.containerPool.getInstanceLock(request.getJobInstanceId(), request.getSerialNum());
        synchronized (instanceLock) {
            this.containerPool.put(context.getUniqueId(), container);
            final long handlerKey;
            if (WorkerConfigUtil.isEnableShareContainerPool()) {
                handlerKey = 0L;
            } else {
                handlerKey = request.getJobInstanceId();
            }
            if (!this.statusReqBatchHandlerPool.contains(handlerKey)) {
                ReqQueue statusQueue = new ReqQueue(handlerKey, 100000);
                statusQueue.init();
                this.statusReqBatchHandlerPool.start(handlerKey, new ContainerStatusReqHandler<Worker.ContainerReportTaskStatusRequest>(handlerKey, 1, 1, this.batchSize, statusQueue, request.getInstanceMasterAkkaPath()));
            }
            // Fall back to 5 consumers when the request does not specify a number.
            final int consumerNum = request.hasConsumerNum() ? request.getConsumerNum() : 5;
            this.containerPool.submit(context.getJobId(), context.getJobInstanceId(), context.getTaskId(), container, consumerNum);
        }
        return uniqueId;
    }

    /**
     * Kills a single task container, or every container of a job instance when the request
     * carries no task id, and replies to the sender with a {@code MasterKillContainerResponse}.
     *
     * <p>A task whose container is not in the pool is answered with an unsuccessful reply.
     * Any exception is logged, collected (when the request carries an app group id) and
     * reported as an unsuccessful reply.
     *
     * @param request the kill request received from the master
     */
    private void handleKillContainer(Worker.MasterKillContainerRequest request) {
        Worker.MasterKillContainerResponse response;
        long jobId = request.getJobId();
        long jobInstanceId = request.getJobInstanceId();
        String uniqueIdToLogger = "";
        try {
            if (request.hasTaskId()) {
                // Record the id before acting so a failing kill is logged under the right task.
                uniqueIdToLogger = IdUtil.getUniqueId(jobId, jobInstanceId, request.getTaskId());
                if (!this.containerPool.contain(uniqueIdToLogger)) {
                    LOGGER.error("kill task container failed, uniqueId={}", uniqueIdToLogger);
                    Worker.MasterKillContainerResponse notRunning = Worker.MasterKillContainerResponse.newBuilder().setSuccess(false).setMessage("kill task container failed, Task not running, uniqueId=" + uniqueIdToLogger).build();
                    this.getSender().tell(notRunning, this.getSelf());
                    return;
                }
                this.containerPool.get(uniqueIdToLogger).kill(request.getMayInterruptIfRunning());
                LOGGER.info("kill task container success, uniqueId={}", uniqueIdToLogger);
            } else {
                uniqueIdToLogger = IdUtil.getUniqueIdWithoutTask(jobId, jobInstanceId);
                this.killInstance(jobId, jobInstanceId, request.getMayInterruptIfRunning());
                LOGGER.info("kill instance success, uniqueId:{}", uniqueIdToLogger);
            }
            response = Worker.MasterKillContainerResponse.newBuilder().setSuccess(true).build();
            if (request.hasAppGroupId()) {
                this.logCollector.collect(request.getAppGroupId(), uniqueIdToLogger, "container kill instance success:");
            }
        }
        catch (Throwable t) {
            LOGGER.error("kill container exception", t);
            if (request.hasAppGroupId()) {
                this.logCollector.collect(request.getAppGroupId(), uniqueIdToLogger, "container kill instance fail:", t);
            }
            // Exceptions such as NullPointerException often have no message; a null here would
            // make the builder throw and the sender would never get a reply.
            String failure = t.getMessage() != null ? t.getMessage() : t.toString();
            response = Worker.MasterKillContainerResponse.newBuilder().setSuccess(false).setMessage(failure).build();
        }
        this.getSender().tell(response, this.getSelf());
    }

    /**
     * Destroys everything this worker holds for a job instance: stops its status request
     * handler and its pull manager, destroys its containers, then releases the instance lock.
     *
     * <p>The work is skipped, and a successful response is sent, when the request carries a
     * serial number that differs from the value of the instance lock or from the serial
     * number of the handler's latest request. On a skip the instance lock is not released.
     *
     * <p>Failures are logged and not rethrown; the instance lock is still released.
     *
     * <p>NOTE(review): as in the decompiled original, only the skip paths send a response.
     * The decompiler reported a possible behaviour change for this method, so confirm
     * against the real source whether the destroy and failure paths should also reply.
     *
     * @param request the destroy request received from the master
     */
    private void handleDestroyContainerPool(Worker.MasterDestroyContainerPoolRequest request) {
        try {
            ContainerStatusReqHandler<Worker.ContainerReportTaskStatusRequest> handler = this.statusReqBatchHandlerPool.getHandlers().get(request.getJobInstanceId());
            AtomicLong jobInstanceLock = this.containerPool.getInstanceLock(request.getJobInstanceId(), null);
            synchronized (jobInstanceLock) {
                if (request.hasSerialNum() && jobInstanceLock.get() != request.getSerialNum()) {
                    LOGGER.info("skip handleDestroyContainerPool cycleId={}_{}, lock serialNum={}.", request.getJobInstanceId(), request.getSerialNum(), jobInstanceLock.get());
                    this.replyDestroySkipped(request);
                    return;
                }
                if (handler != null) {
                    // Read the latest request once so the null check and the use see the same value.
                    Worker.ContainerReportTaskStatusRequest latest = (Worker.ContainerReportTaskStatusRequest)handler.getLatestRequest();
                    if (latest != null && request.hasSerialNum() && latest.getSerialNum() != request.getSerialNum()) {
                        LOGGER.info("skip handleDestroyContainerPool cycleId={}_{}, handler serialNum={}.", request.getJobInstanceId(), request.getSerialNum(), latest.getSerialNum());
                        this.replyDestroySkipped(request);
                        return;
                    }
                    LOGGER.info("handleDestroyContainerPool from cycleId={}_{}, handler serialNum={}.", request.getJobInstanceId(), request.getSerialNum(), latest != null ? latest.getSerialNum() : 0L);
                    this.statusReqBatchHandlerPool.stop(request.getJobInstanceId());
                }
                PullManager.INSTANCE.stop(request.getJobInstanceId());
                this.containerPool.destroyByInstance(request.getJobInstanceId(), true);
            }
        }
        catch (Throwable th) {
            LOGGER.error("cycleId={}_{} handleDestroyContainerPool failed.", request.getJobInstanceId(), request.getSerialNum(), th);
        }
        this.containerPool.releaseInstanceLock(request.getJobInstanceId());
    }

    /** Sends the successful response, echoing the request's delivery id, used when a destroy is skipped. */
    private void replyDestroySkipped(Worker.MasterDestroyContainerPoolRequest request) {
        Worker.MasterDestroyContainerPoolResponse response = Worker.MasterDestroyContainerPoolResponse.newBuilder().setSuccess(true).setDeliveryId(request.getDeliveryId()).build();
        this.getSender().tell(response, this.getSelf());
    }

    /**
     * Kills and removes every pooled container of one job instance, then asks the container
     * pool to destroy the instance.
     *
     * <p>Containers are selected by the key prefix {@code jobId_jobInstanceId}. A key only
     * matches when the prefix is the whole key or is followed by a non-digit, so instance
     * {@code 12} no longer also matches the containers of instance {@code 123}.
     * NOTE(review): this assumes ids are decimal numbers joined by a non-digit separator;
     * confirm against {@code IdUtil}.
     *
     * @param jobId the job id
     * @param jobInstanceId the job instance whose containers are killed
     * @param mayInterruptIfRunning passed through to each container's kill and to the pool
     */
    private void killInstance(long jobId, long jobInstanceId, boolean mayInterruptIfRunning) {
        Map<String, Container> containerMap = this.containerPool.getContainerMap();
        String prefixKey = jobId + "_" + jobInstanceId;
        int prefixLength = prefixKey.length();
        Iterator<Map.Entry<String, Container>> it = containerMap.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Container> entry = it.next();
            String uniqueId = entry.getKey();
            if (!uniqueId.startsWith(prefixKey)) {
                continue;
            }
            // A digit right after the prefix means the key belongs to a longer instance id.
            if (uniqueId.length() > prefixLength && Character.isDigit(uniqueId.charAt(prefixLength))) {
                continue;
            }
            entry.getValue().kill(mayInterruptIfRunning);
            it.remove();
        }
        this.containerPool.destroyByInstance(jobInstanceId, mayInterruptIfRunning);
    }

    static {
        // Allow the starter pool's core threads to time out after the keep-alive given to
        // its constructor, so an idle pool does not keep its threads alive.
        containerStarter.allowCoreThreadTimeOut(true);
    }

    private class ContainerStartRunnable
    implements Runnable {
        private Worker.MasterBatchStartContainersRequest request;

        ContainerStartRunnable(Worker.MasterBatchStartContainersRequest request) {
            this.request = request;
        }

        @Override
        public void run() {
            for (Worker.MasterStartContainerRequest req : this.request.getStartReqsList()) {
                try {
                    String uniqueId = ContainerActor.this.startContainer(req);
                    LOGGER.debug("submit container to containerPool, uniqueId={}, cost={}ms", uniqueId, System.currentTimeMillis() - req.getScheduleTime());
                }
                catch (Throwable e) {
                    String uniqueId = IdUtil.getUniqueId(req.getJobId(), this.request.getJobInstanceId(), req.getTaskId());
                    ContainerActor.this.containerPool.remove(uniqueId);
                    ContainerActor.this.logCollector.collect(req.getAppGroupId(), uniqueId, "container start instance fail:", e);
                    Worker.ContainerReportTaskStatusRequest.Builder resultBuilder = Worker.ContainerReportTaskStatusRequest.newBuilder();
                    resultBuilder.setJobId(req.getJobId());
                    resultBuilder.setJobInstanceId(req.getJobInstanceId());
                    resultBuilder.setTaskId(req.getTaskId());
                    resultBuilder.setStatus(TaskStatus.FAILED.getValue());
                    resultBuilder.setResult(e.getMessage());
                    Address address = SchedulerxWorker.actorSystem.provider().getDefaultAddress();
                    String workerAddr = address.host().get() + ":" + address.port().get();
                    resultBuilder.setWorkerAddr(workerAddr);
                    resultBuilder.setWorkerId(WorkerIdGenerator.get());
                    if (req.getTaskName() != null) {
                        resultBuilder.setTaskName(req.getTaskName());
                    }
                    ActorSelection masterActorSelection = SchedulerxWorker.actorSystem.actorSelection(req.getInstanceMasterAkkaPath());
                    masterActorSelection.tell(resultBuilder.build(), null);
                }
            }
        }
    }
}

