/*
 * Decompiled with CFR 0.152.
 */
package com.alicp.jetcache.redis.lettuce;

import com.alicp.jetcache.CacheConfigException;
import com.alicp.jetcache.CacheManager;
import com.alicp.jetcache.CacheResult;
import com.alicp.jetcache.CacheResultCode;
import com.alicp.jetcache.ResultData;
import com.alicp.jetcache.redis.lettuce.LettuceConnectionManager;
import com.alicp.jetcache.redis.lettuce.RedisLettuceCacheConfig;
import com.alicp.jetcache.support.BroadcastManager;
import com.alicp.jetcache.support.CacheMessage;
import com.alicp.jetcache.support.JetCacheExecutor;
import com.alicp.jetcache.support.SquashedLogger;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.async.BaseRedisAsyncCommands;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import java.nio.charset.StandardCharsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LettuceBroadcastManager
extends BroadcastManager {
    private static final Logger logger = LoggerFactory.getLogger(LettuceBroadcastManager.class);
    private final RedisLettuceCacheConfig<Object, Object> config;
    private final byte[] channel;
    private volatile boolean subscribeThreadStart;
    private volatile RedisPubSubAdapter<byte[], byte[]> pubSubAdapter;
    private final LettuceConnectionManager lettuceConnectionManager;
    private final BaseRedisAsyncCommands<byte[], byte[]> stringAsyncCommands;

    public LettuceBroadcastManager(CacheManager cacheManager, RedisLettuceCacheConfig<Object, Object> config) {
        super(cacheManager);
        this.checkConfig(config);
        if (config.getPubSubConnection() == null) {
            throw new CacheConfigException("PubSubConnection not set");
        }
        this.config = config;
        this.channel = config.getBroadcastChannel().getBytes(StandardCharsets.UTF_8);
        this.lettuceConnectionManager = config.getConnectionManager();
        this.lettuceConnectionManager.init(config.getRedisClient(), config.getConnection());
        this.stringAsyncCommands = (BaseRedisAsyncCommands)this.lettuceConnectionManager.asyncCommands(config.getRedisClient());
    }

    public CacheResult publish(CacheMessage cacheMessage) {
        try {
            byte[] value = (byte[])this.config.getValueEncoder().apply(cacheMessage);
            RedisFuture future = this.stringAsyncCommands.publish((Object)this.channel, (Object)value);
            return new CacheResult(future.handle((rt, ex) -> {
                if (ex != null) {
                    JetCacheExecutor.defaultExecutor().execute(() -> SquashedLogger.getLogger((Logger)logger).error((CharSequence)"jetcache publish error", ex));
                    return new ResultData(ex);
                }
                return new ResultData(CacheResultCode.SUCCESS, null, null);
            }));
        }
        catch (Exception ex2) {
            SquashedLogger.getLogger((Logger)logger).error((CharSequence)"jetcache publish error", (Throwable)ex2);
            return new CacheResult((Throwable)ex2);
        }
    }

    public synchronized void startSubscribe() {
        if (this.subscribeThreadStart) {
            throw new IllegalStateException("startSubscribe has invoked");
        }
        this.pubSubAdapter = new RedisPubSubAdapter<byte[], byte[]>(){

            public void message(byte[] channel, byte[] message) {
                LettuceBroadcastManager.this.processNotification(message, LettuceBroadcastManager.this.config.getValueDecoder());
            }
        };
        this.config.getPubSubConnection().addListener(this.pubSubAdapter);
        RedisPubSubAsyncCommands asyncCommands = this.config.getPubSubConnection().async();
        asyncCommands.subscribe((Object[])new byte[][]{this.channel});
        this.subscribeThreadStart = true;
    }

    public void close() {
        this.config.getPubSubConnection().removeListener(this.pubSubAdapter);
        this.config.getPubSubConnection().close();
    }
}

