/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.timer;

import akka.actor.ActorSelection;
import com.alibaba.schedulerx.common.util.JsonUtil;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.protocol.utils.FutureUtils;
import com.alibaba.schedulerx.shade.com.google.common.collect.Lists;
import com.alibaba.schedulerx.shade.com.google.common.collect.Maps;
import com.alibaba.schedulerx.shade.org.apache.commons.collections.CollectionUtils;
import com.alibaba.schedulerx.shade.org.apache.commons.lang.StringUtils;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.batch.ContainerStatusReqHandlerPool;
import com.alibaba.schedulerx.worker.container.ContainerFactory;
import com.alibaba.schedulerx.worker.container.ContainerPool;
import com.alibaba.schedulerx.worker.timer.AbstractTimerTask;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Timer task that checks the job instances tracked by the local {@link ContainerPool}
 * for zombies and cleans up the ones that are reported.
 *
 * <p>On every run the job instance ids are grouped by the actor path recorded for them in
 * {@link ContainerPool#getInstanceMasterActorPathMap()}. For each distinct path one
 * {@code ContainerCheckZombieRequest} carrying all ids of the group is sent to the
 * corresponding heartbeat routing path. Every non-zero id returned in the response has its
 * status request handler stopped and its containers destroyed.
 *
 * <p>A failure while talking to one path is logged and does not prevent the remaining
 * paths from being checked.
 */
public class ZombieContainerCheckTimer extends AbstractTimerTask {
    /** Path fragment found in the actor paths stored by the container pool. */
    private static final String TASK_ROUTING_PATH = "/user/task_routing";
    /** Path fragment of the actor that receives the zombie check request. */
    private static final String HEARTBEAT_ROUTING_PATH = "/user/heartbeat_routing";
    /** Timeout handed to {@code FutureUtils.awaitResult}; presumably seconds — TODO confirm. */
    private static final long CHECK_TIMEOUT = 10L;

    private ContainerStatusReqHandlerPool statusReqBatchHandlerPool = ContainerStatusReqHandlerPool.INSTANCE;
    // NOTE(review): not read by any method in this class; run() fetches the pool from
    // ContainerFactory on every execution instead. Kept to leave the class layout unchanged.
    private ContainerPool containerPool = ContainerFactory.getContainerPool();

    /**
     * @return the fixed name of this timer task
     */
    @Override
    public String getName() {
        return "ZombieContainerCheckTimer";
    }

    /**
     * @return the initial delay before the first run; the unit is defined by the
     *         scheduler that consumes it (presumably seconds — TODO confirm)
     */
    @Override
    public long getInitialDelay() {
        return 300L;
    }

    /**
     * @return the period between runs; the unit is defined by the scheduler that
     *         consumes it (presumably seconds — TODO confirm)
     */
    @Override
    public long getPeriod() {
        return 600L;
    }

    /**
     * Groups the tracked job instances by actor path and runs one zombie check per path.
     */
    @Override
    public void run() {
        ContainerPool pool = ContainerFactory.getContainerPool();
        Map<String, List<Long>> masterPath2JobInstanceIds =
                this.groupJobInstanceIdsByMasterPath(pool.getInstanceMasterActorPathMap());
        for (Map.Entry<String, List<Long>> entry : masterPath2JobInstanceIds.entrySet()) {
            this.checkAndCleanZombies(pool, entry.getKey(), entry.getValue());
        }
    }

    /**
     * Inverts a "job instance id -> actor path" map into "actor path -> job instance ids".
     *
     * @param instanceMasterPaths mapping from job instance id to its actor path
     * @return a new map from each distinct actor path to the ids that were mapped to it
     */
    private Map<String, List<Long>> groupJobInstanceIdsByMasterPath(Map<Long, String> instanceMasterPaths) {
        Map<String, List<Long>> grouped = new HashMap<String, List<Long>>();
        for (Map.Entry<Long, String> entry : instanceMasterPaths.entrySet()) {
            List<Long> jobInstanceIds = grouped.get(entry.getValue());
            if (jobInstanceIds == null) {
                jobInstanceIds = new ArrayList<Long>();
                grouped.put(entry.getValue(), jobInstanceIds);
            }
            jobInstanceIds.add(entry.getKey());
        }
        return grouped;
    }

    /**
     * Sends one zombie check request for the given ids and cleans up every id reported back.
     *
     * <p>Any {@link Throwable} raised while awaiting the response or while cleaning up is
     * logged and swallowed, so that a single failing path does not abort the whole run.
     *
     * @param pool           container pool whose containers are destroyed for zombie instances
     * @param masterPath     actor path recorded for the job instances
     * @param jobInstanceIds ids of the job instances to check
     */
    private void checkAndCleanZombies(ContainerPool pool, String masterPath, List<Long> jobInstanceIds) {
        // The request goes to the heartbeat routing actor rather than the task routing one.
        String masterCheckPath = masterPath.replace(TASK_ROUTING_PATH, HEARTBEAT_ROUTING_PATH);
        Worker.ContainerCheckZombieRequest request = Worker.ContainerCheckZombieRequest.newBuilder()
                .addAllJobInstanceId(jobInstanceIds)
                .build();
        ActorSelection selection = SchedulerxWorker.actorSystem.actorSelection(masterCheckPath);
        try {
            Worker.ContainerCheckZombieResponse response =
                    (Worker.ContainerCheckZombieResponse)FutureUtils.awaitResult(selection, (Object)request, CHECK_TIMEOUT);
            List<Long> zombieJobInstanceIds = response.getZombieJobInstanceIdList();
            if (CollectionUtils.isEmpty(zombieJobInstanceIds)) {
                return;
            }
            LOGGER.warn("detect zombieJobInstanceIds:{}, clean...", StringUtils.join(zombieJobInstanceIds, ","));
            for (Long zombieJobInstanceId : zombieJobInstanceIds) {
                // An id of 0 is skipped and never cleaned up.
                if (zombieJobInstanceId == 0L) {
                    continue;
                }
                this.statusReqBatchHandlerPool.stop(zombieJobInstanceId);
                pool.destroyByInstance(zombieJobInstanceId, true);
            }
        }
        catch (Throwable e) {
            LOGGER.warn("JobInstanceId={}, masterCheckPath={}, ZombieContainerCheckTimer check error ", JsonUtil.toJson(jobInstanceIds), masterCheckPath, e);
        }
    }
}

