/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.registry.redis;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.support.FailbackRegistry;
import org.apache.dubbo.remoting.redis.RedisClient;
import org.apache.dubbo.remoting.redis.jedis.ClusterRedisClient;
import org.apache.dubbo.remoting.redis.jedis.MonoRedisClient;
import org.apache.dubbo.remoting.redis.jedis.SentinelRedisClient;
import org.apache.dubbo.rpc.RpcException;
import redis.clients.jedis.JedisPubSub;

public class RedisRegistry
extends FailbackRegistry {
    private static final Logger logger = LoggerFactory.getLogger(RedisRegistry.class);
    private static final String DEFAULT_ROOT = "dubbo";
    private final ScheduledExecutorService expireExecutor = Executors.newScheduledThreadPool(1, (ThreadFactory)new NamedThreadFactory("DubboRegistryExpireTimer", true));
    private final ScheduledFuture<?> expireFuture;
    private final String root;
    private RedisClient redisClient;
    private final ConcurrentMap<String, Notifier> notifiers = new ConcurrentHashMap<String, Notifier>();
    private final int reconnectPeriod;
    private final int expirePeriod;
    private volatile boolean admin = false;

    public RedisRegistry(URL url) {
        super(url);
        String type = url.getParameter("redis-client", "mono");
        this.redisClient = "sentinel".equals(type) ? new SentinelRedisClient(url) : ("cluster".equals(type) ? new ClusterRedisClient(url) : new MonoRedisClient(url));
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        this.reconnectPeriod = url.getParameter("reconnect.period", 3000);
        String group = url.getParameter("group", DEFAULT_ROOT);
        if (!group.startsWith("/")) {
            group = "/" + group;
        }
        if (!group.endsWith("/")) {
            group = group + "/";
        }
        this.root = group;
        this.expirePeriod = url.getParameter("session", 60000);
        this.expireFuture = this.expireExecutor.scheduleWithFixedDelay(() -> {
            try {
                this.deferExpired();
            }
            catch (Throwable t) {
                logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t);
            }
        }, this.expirePeriod / 2, this.expirePeriod / 2, TimeUnit.MILLISECONDS);
    }

    private void deferExpired() {
        for (URL url : new HashSet(this.getRegistered())) {
            String key;
            if (!url.getParameter("dynamic", true) || this.redisClient.hset(key = this.toCategoryPath(url), url.toFullString(), String.valueOf(System.currentTimeMillis() + (long)this.expirePeriod)) != 1L) continue;
            this.redisClient.publish(key, "register");
        }
        if (this.admin) {
            this.clean();
        }
    }

    private void clean() {
        Set keys = this.redisClient.scan(this.root + "*");
        if (CollectionUtils.isNotEmpty((Collection)keys)) {
            for (String key : keys) {
                Map values = this.redisClient.hgetAll(key);
                if (!CollectionUtils.isNotEmptyMap((Map)values)) continue;
                boolean delete = false;
                long now = System.currentTimeMillis();
                for (Map.Entry entry : values.entrySet()) {
                    long expire;
                    URL url = URL.valueOf((String)((String)entry.getKey()));
                    if (!url.getParameter("dynamic", true) || (expire = Long.parseLong((String)entry.getValue())) >= now) continue;
                    this.redisClient.hdel(key, new String[]{(String)entry.getKey()});
                    delete = true;
                    if (!logger.isWarnEnabled()) continue;
                    logger.warn("Delete expired key: " + key + " -> value: " + (String)entry.getKey() + ", expire: " + new Date(expire) + ", now: " + new Date(now));
                }
                if (!delete) continue;
                this.redisClient.publish(key, "unregister");
            }
        }
    }

    public boolean isAvailable() {
        return this.redisClient.isConnected();
    }

    public void destroy() {
        super.destroy();
        try {
            this.expireFuture.cancel(true);
        }
        catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
        try {
            for (Notifier notifier : this.notifiers.values()) {
                notifier.shutdown();
            }
        }
        catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
        try {
            this.redisClient.destroy();
        }
        catch (Throwable t) {
            logger.warn("Failed to destroy the redis registry client. registry: " + this.getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        }
        ExecutorUtil.gracefulShutdown((Executor)this.expireExecutor, (int)this.expirePeriod);
    }

    public void doRegister(URL url) {
        String key = this.toCategoryPath(url);
        String value = url.toFullString();
        String expire = String.valueOf(System.currentTimeMillis() + (long)this.expirePeriod);
        try {
            this.redisClient.hset(key, value, expire);
            this.redisClient.publish(key, "register");
        }
        catch (Throwable t) {
            throw new RpcException("Failed to register service to redis registry. registry: " + url.getAddress() + ", service: " + url + ", cause: " + t.getMessage(), t);
        }
    }

    public void doUnregister(URL url) {
        String key = this.toCategoryPath(url);
        String value = url.toFullString();
        try {
            this.redisClient.hdel(key, new String[]{value});
            this.redisClient.publish(key, "unregister");
        }
        catch (Throwable t) {
            throw new RpcException("Failed to unregister service to redis registry. registry: " + url.getAddress() + ", service: " + url + ", cause: " + t.getMessage(), t);
        }
    }

    public void doSubscribe(URL url, NotifyListener listener) {
        String service = this.toServicePath(url);
        Notifier notifier = (Notifier)this.notifiers.get(service);
        if (notifier == null) {
            Notifier newNotifier = new Notifier(service);
            this.notifiers.putIfAbsent(service, newNotifier);
            notifier = (Notifier)this.notifiers.get(service);
            if (notifier == newNotifier) {
                notifier.start();
            }
        }
        try {
            if (service.endsWith("*")) {
                this.admin = true;
                Set keys = this.redisClient.scan(service);
                if (CollectionUtils.isNotEmpty((Collection)keys)) {
                    HashMap<String, Set> serviceKeys = new HashMap<String, Set>();
                    for (String key : keys) {
                        String serviceKey = this.toServicePath(key);
                        Set sk = serviceKeys.computeIfAbsent(serviceKey, k -> new HashSet());
                        sk.add(key);
                    }
                    for (Set sk : serviceKeys.values()) {
                        this.doNotify(sk, url, Collections.singletonList(listener));
                    }
                }
            } else {
                this.doNotify(this.redisClient.scan(service + "/" + "*"), url, Collections.singletonList(listener));
            }
        }
        catch (Throwable t) {
            throw new RpcException("Failed to subscribe service from redis registry. registry: " + url.getAddress() + ", service: " + url + ", cause: " + t.getMessage(), t);
        }
    }

    public void doUnsubscribe(URL url, NotifyListener listener) {
    }

    private void doNotify(String key) {
        for (Map.Entry entry : new HashMap(this.getSubscribed()).entrySet()) {
            this.doNotify(Collections.singletonList(key), (URL)entry.getKey(), new HashSet<NotifyListener>((Collection)entry.getValue()));
        }
    }

    private void doNotify(Collection<String> keys, URL url, Collection<NotifyListener> listeners) {
        if (keys == null || keys.isEmpty() || listeners == null || listeners.isEmpty()) {
            return;
        }
        long now = System.currentTimeMillis();
        ArrayList result = new ArrayList();
        List<String> categories = Arrays.asList(url.getParameter("category", new String[0]));
        String consumerService = url.getServiceInterface();
        for (String key : keys) {
            String providerService;
            if (!"*".equals(consumerService) && !(providerService = this.toServiceName(key)).equals(consumerService)) continue;
            String category = this.toCategoryName(key);
            if (!categories.contains("*") && !categories.contains(category)) continue;
            ArrayList<URL> urls = new ArrayList<URL>();
            Map values = this.redisClient.hgetAll(key);
            if (CollectionUtils.isNotEmptyMap((Map)values)) {
                for (Map.Entry entry : values.entrySet()) {
                    URL u = URL.valueOf((String)((String)entry.getKey()));
                    if (u.getParameter("dynamic", true) && Long.parseLong((String)entry.getValue()) < now || !UrlUtils.isMatch((URL)url, (URL)u)) continue;
                    urls.add(u);
                }
            }
            if (urls.isEmpty()) {
                urls.add(URLBuilder.from((URL)url).setProtocol("empty").setAddress("0.0.0.0").setPath(this.toServiceName(key)).addParameter("category", category).build());
            }
            result.addAll(urls);
            if (!logger.isInfoEnabled()) continue;
            logger.info("redis notify: " + key + " = " + urls);
        }
        if (CollectionUtils.isEmpty(result)) {
            return;
        }
        for (NotifyListener listener : listeners) {
            this.notify(url, listener, result);
        }
    }

    private String toServiceName(String categoryPath) {
        String servicePath = this.toServicePath(categoryPath);
        return servicePath.startsWith(this.root) ? servicePath.substring(this.root.length()) : servicePath;
    }

    private String toCategoryName(String categoryPath) {
        int i = categoryPath.lastIndexOf("/");
        return i > 0 ? categoryPath.substring(i + 1) : categoryPath;
    }

    private String toServicePath(String categoryPath) {
        int i = categoryPath.startsWith(this.root) ? categoryPath.indexOf("/", this.root.length()) : categoryPath.indexOf("/");
        return i > 0 ? categoryPath.substring(0, i) : categoryPath;
    }

    private String toServicePath(URL url) {
        return this.root + url.getServiceInterface();
    }

    private String toCategoryPath(URL url) {
        return this.toServicePath(url) + "/" + url.getParameter("category", "providers");
    }

    private class Notifier
    extends Thread {
        private final String service;
        private final AtomicInteger connectSkip = new AtomicInteger();
        private final AtomicInteger connectSkipped = new AtomicInteger();
        private volatile boolean first = true;
        private volatile boolean running = true;
        private volatile int connectRandom;

        public Notifier(String service) {
            super.setDaemon(true);
            super.setName("DubboRedisSubscribe");
            this.service = service;
        }

        private void resetSkip() {
            this.connectSkip.set(0);
            this.connectSkipped.set(0);
            this.connectRandom = 0;
        }

        private boolean isSkip() {
            int skip = this.connectSkip.get();
            if (skip >= 10) {
                if (this.connectRandom == 0) {
                    this.connectRandom = ThreadLocalRandom.current().nextInt(10);
                }
                skip = 10 + this.connectRandom;
            }
            if (this.connectSkipped.getAndIncrement() < skip) {
                return true;
            }
            this.connectSkip.incrementAndGet();
            this.connectSkipped.set(0);
            this.connectRandom = 0;
            return false;
        }

        @Override
        public void run() {
            while (this.running) {
                try {
                    if (this.isSkip()) continue;
                    try {
                        if (!RedisRegistry.this.redisClient.isConnected()) continue;
                        try {
                            if (this.service.endsWith("*")) {
                                if (this.first) {
                                    this.first = false;
                                    Set keys = RedisRegistry.this.redisClient.scan(this.service);
                                    if (CollectionUtils.isNotEmpty((Collection)keys)) {
                                        for (String s : keys) {
                                            RedisRegistry.this.doNotify(s);
                                        }
                                    }
                                    this.resetSkip();
                                }
                                RedisRegistry.this.redisClient.psubscribe((JedisPubSub)new NotifySub(), new String[]{this.service});
                                continue;
                            }
                            if (this.first) {
                                this.first = false;
                                RedisRegistry.this.doNotify(this.service);
                                this.resetSkip();
                            }
                            RedisRegistry.this.redisClient.psubscribe((JedisPubSub)new NotifySub(), new String[]{this.service + "/" + "*"});
                        }
                        catch (Throwable t) {
                            logger.warn("Failed to subscribe service from redis registry. registry: " + RedisRegistry.this.getUrl().getAddress() + ", cause: " + t.getMessage(), t);
                            Notifier.sleep(RedisRegistry.this.reconnectPeriod);
                        }
                    }
                    catch (Throwable t) {
                        logger.error(t.getMessage(), t);
                        Notifier.sleep(RedisRegistry.this.reconnectPeriod);
                    }
                }
                catch (Throwable t) {
                    logger.error(t.getMessage(), t);
                }
            }
        }

        public void shutdown() {
            try {
                this.running = false;
                RedisRegistry.this.redisClient.disconnect();
            }
            catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    }

    private class NotifySub
    extends JedisPubSub {
        public void onMessage(String key, String msg) {
            if (logger.isInfoEnabled()) {
                logger.info("redis event: " + key + " = " + msg);
            }
            if (msg.equals("register") || msg.equals("unregister")) {
                try {
                    RedisRegistry.this.doNotify(key);
                }
                catch (Throwable t) {
                    logger.error(t.getMessage(), t);
                }
            }
        }

        public void onPMessage(String pattern, String key, String msg) {
            this.onMessage(key, msg);
        }

        public void onSubscribe(String key, int num) {
        }

        public void onPSubscribe(String pattern, int num) {
        }

        public void onUnsubscribe(String key, int num) {
        }

        public void onPUnsubscribe(String pattern, int num) {
        }
    }
}

