/*
 * Decompiled with CFR 0.152.
 */
package tech.powerjob.remote.akka;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.DeadLetter;
import akka.actor.Props;
import akka.routing.RoundRobinPool;
import akka.routing.RouterConfig;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigMergeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.powerjob.common.enums.Protocol;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.remote.akka.AkkaConstant;
import tech.powerjob.remote.akka.AkkaMappingService;
import tech.powerjob.remote.akka.AkkaProxyActor;
import tech.powerjob.remote.akka.AkkaTransporter;
import tech.powerjob.remote.akka.AkkaTroubleshootingActor;
import tech.powerjob.remote.framework.actor.ActorInfo;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.cs.CSInitializer;
import tech.powerjob.remote.framework.cs.CSInitializerConfig;
import tech.powerjob.remote.framework.transporter.Transporter;

public class AkkaCSInitializer
implements CSInitializer {
    private static final Logger log = LoggerFactory.getLogger(AkkaCSInitializer.class);
    private ActorSystem actorSystem;
    private CSInitializerConfig config;

    public String type() {
        return Protocol.AKKA.name();
    }

    public void init(CSInitializerConfig config) {
        this.config = config;
        Address bindAddress = config.getBindAddress();
        log.info("[PowerJob-AKKA] bindAddress: {}", (Object)bindAddress);
        HashMap overrideConfig = Maps.newHashMap();
        overrideConfig.put("akka.remote.artery.canonical.hostname", bindAddress.getHost());
        overrideConfig.put("akka.remote.artery.canonical.port", bindAddress.getPort());
        Config akkaBasicConfig = ConfigFactory.load((String)"powerjob.akka.conf");
        Config akkaFinalConfig = ConfigFactory.parseMap((Map)overrideConfig).withFallback((ConfigMergeable)akkaBasicConfig);
        log.info("[PowerJob-AKKA] try to start AKKA System.");
        String actorSystemName = AkkaConstant.fetchActorSystemName(config.getServerType());
        this.actorSystem = ActorSystem.create((String)actorSystemName, (Config)akkaFinalConfig);
        ActorRef troubleshootingActor = this.actorSystem.actorOf(Props.create(AkkaTroubleshootingActor.class, (Object[])new Object[0]), "troubleshooting");
        this.actorSystem.eventStream().subscribe(troubleshootingActor, DeadLetter.class);
        log.info("[PowerJob-AKKA] initialize actorSystem[{}] successfully!", (Object)this.actorSystem.name());
    }

    public Transporter buildTransporter() {
        return new AkkaTransporter(this.actorSystem);
    }

    public void bindHandlers(List<ActorInfo> actorInfos) {
        int cores = Runtime.getRuntime().availableProcessors();
        actorInfos.forEach(actorInfo -> {
            String rootPath = actorInfo.getAnno().path();
            AkkaMappingService.ActorConfig actorConfig = AkkaMappingService.parseActorName(rootPath);
            log.info("[PowerJob-AKKA] start to process actor[path={},config={}]", (Object)rootPath, (Object)JsonUtils.toJSONString((Object)actorConfig));
            this.actorSystem.actorOf(AkkaProxyActor.props(actorInfo).withDispatcher("akka.".concat(actorConfig.getDispatcherName())).withRouter((RouterConfig)new RoundRobinPool(cores)), actorConfig.getActorName());
        });
    }

    public void close() throws IOException {
        this.actorSystem.terminate();
    }
}

