/*
 * Decompiled with CFR 0.152.
 */
package com.aizuda.snailjob.server.starter.dispatch;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.cache.CacheConsumerGroup;
import com.aizuda.snailjob.server.common.cache.CacheGroupScanActor;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.dto.ScanTask;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.retry.task.support.cache.CacheGroupRateLimiter;
import com.aizuda.snailjob.server.starter.dispatch.ConsumerBucket;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.mapper.ServerNodeMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
import com.google.common.cache.Cache;
import com.google.common.util.concurrent.RateLimiter;
import java.util.List;
import java.util.Objects;
import lombok.Generated;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

@Component(value="ScanBucketActor")
@Scope(value="prototype")
public class ConsumerBucketActor
extends AbstractActor {
    private final AccessTemplate accessTemplate;
    private final ServerNodeMapper serverNodeMapper;
    private final SystemProperties systemProperties;
    private static final String DEFAULT_JOB_KEY = "DEFAULT_JOB_KEY";
    private static final String DEFAULT_WORKFLOW_KEY = "DEFAULT_JOB_KEY";

    public AbstractActor.Receive createReceive() {
        return this.receiveBuilder().match(ConsumerBucket.class, consumerBucket -> {
            try {
                this.doDispatch((ConsumerBucket)consumerBucket);
            }
            catch (Exception e) {
                SnailJobLog.LOCAL.error("Data dispatcher processing exception. [{}]", new Object[]{consumerBucket, e});
            }
        }).build();
    }

    private void doDispatch(ConsumerBucket consumerBucket) {
        if (CollUtil.isEmpty(consumerBucket.getBuckets())) {
            return;
        }
        this.doScanJobAndWorkflow(consumerBucket);
        this.doScanRetry(consumerBucket);
    }

    private void doScanRetry(ConsumerBucket consumerBucket) {
        List groupConfigs = null;
        try {
            groupConfigs = this.accessTemplate.getGroupConfigAccess().list((LambdaQueryWrapper)((LambdaQueryWrapper)new LambdaQueryWrapper().select(new SFunction[]{GroupConfig::getGroupName, GroupConfig::getGroupPartition, GroupConfig::getNamespaceId}).eq(GroupConfig::getGroupStatus, (Object)StatusEnum.YES.getStatus())).in(GroupConfig::getBucketIndex, consumerBucket.getBuckets()));
        }
        catch (Exception e) {
            SnailJobLog.LOCAL.error("\u751f\u6210\u91cd\u8bd5\u4efb\u52a1\u5f02\u5e38.", new Object[]{e});
        }
        if (CollUtil.isNotEmpty(groupConfigs)) {
            for (GroupConfig groupConfig : groupConfigs) {
                CacheConsumerGroup.addOrUpdate((String)groupConfig.getGroupName(), (String)groupConfig.getNamespaceId());
                ScanTask scanTask = new ScanTask();
                scanTask.setNamespaceId(groupConfig.getNamespaceId());
                scanTask.setGroupName(groupConfig.getGroupName());
                scanTask.setBuckets(consumerBucket.getBuckets());
                scanTask.setGroupPartition(groupConfig.getGroupPartition());
                this.produceScanActorTask(scanTask);
            }
        }
    }

    private void doScanJobAndWorkflow(ConsumerBucket consumerBucket) {
        ScanTask scanTask = new ScanTask();
        scanTask.setBuckets(consumerBucket.getBuckets());
        ActorRef scanJobActorRef = this.cacheActorRef("DEFAULT_JOB_KEY", SyetemTaskTypeEnum.JOB);
        scanJobActorRef.tell((Object)scanTask, scanJobActorRef);
        ActorRef scanWorkflowActorRef = this.cacheActorRef("DEFAULT_JOB_KEY", SyetemTaskTypeEnum.WORKFLOW);
        scanWorkflowActorRef.tell((Object)scanTask, scanWorkflowActorRef);
    }

    private void produceScanActorTask(ScanTask scanTask) {
        String groupName = scanTask.getGroupName();
        this.cacheRateLimiter(groupName);
        ActorRef scanRetryActorRef = this.cacheActorRef(groupName, SyetemTaskTypeEnum.RETRY);
        scanRetryActorRef.tell((Object)scanTask, scanRetryActorRef);
        ActorRef scanCallbackActorRef = this.cacheActorRef(groupName, SyetemTaskTypeEnum.CALLBACK);
        scanCallbackActorRef.tell((Object)scanTask, scanCallbackActorRef);
    }

    private void cacheRateLimiter(String groupName) {
        List serverNodes = this.serverNodeMapper.selectList((Wrapper)new LambdaQueryWrapper().eq(ServerNode::getGroupName, (Object)groupName));
        Cache rateLimiterCache = CacheGroupRateLimiter.getAll();
        for (ServerNode serverNode : serverNodes) {
            RateLimiter rateLimiter = (RateLimiter)rateLimiterCache.getIfPresent((Object)serverNode.getHostId());
            if (!Objects.isNull(rateLimiter)) continue;
            rateLimiterCache.put((Object)serverNode.getHostId(), (Object)RateLimiter.create((double)this.systemProperties.getLimiter()));
        }
    }

    private ActorRef cacheActorRef(String groupName, SyetemTaskTypeEnum typeEnum) {
        ActorRef scanActorRef = CacheGroupScanActor.get((String)groupName, (SyetemTaskTypeEnum)typeEnum);
        if (Objects.isNull(scanActorRef)) {
            scanActorRef = (ActorRef)typeEnum.getActorRef().get();
            CacheGroupScanActor.put((String)groupName, (SyetemTaskTypeEnum)typeEnum, (ActorRef)scanActorRef);
        }
        return scanActorRef;
    }

    @Generated
    public ConsumerBucketActor(AccessTemplate accessTemplate, ServerNodeMapper serverNodeMapper, SystemProperties systemProperties) {
        this.accessTemplate = accessTemplate;
        this.serverNodeMapper = serverNodeMapper;
        this.systemProperties = systemProperties;
    }
}

