/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.redis.connection.lettuce;

import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.reactive.BaseRedisReactiveCommands;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.ReactiveGeoCommands;
import org.springframework.data.redis.connection.ReactiveHashCommands;
import org.springframework.data.redis.connection.ReactiveHyperLogLogCommands;
import org.springframework.data.redis.connection.ReactiveKeyCommands;
import org.springframework.data.redis.connection.ReactiveListCommands;
import org.springframework.data.redis.connection.ReactiveNumberCommands;
import org.springframework.data.redis.connection.ReactivePubSubCommands;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveScriptingCommands;
import org.springframework.data.redis.connection.ReactiveServerCommands;
import org.springframework.data.redis.connection.ReactiveSetCommands;
import org.springframework.data.redis.connection.ReactiveStreamCommands;
import org.springframework.data.redis.connection.ReactiveStringCommands;
import org.springframework.data.redis.connection.ReactiveZSetCommands;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider;
import org.springframework.data.redis.connection.lettuce.LettuceConverters;
import org.springframework.data.redis.connection.lettuce.LettuceFutureUtils;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveGeoCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveHashCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveHyperLogLogCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveKeyCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveListCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveNumberCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactivePubSubCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveScriptingCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveServerCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveSetCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveStreamCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveStringCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveZSetCommands;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class LettuceReactiveRedisConnection
implements ReactiveRedisConnection {
    static final RedisCodec<ByteBuffer, ByteBuffer> CODEC = ByteBufferCodec.INSTANCE;
    private final AsyncConnect<StatefulConnection<ByteBuffer, ByteBuffer>> dedicatedConnection;
    private final AsyncConnect<StatefulRedisPubSubConnection<ByteBuffer, ByteBuffer>> pubSubConnection;
    @Nullable
    private Mono<StatefulConnection<ByteBuffer, ByteBuffer>> sharedConnection;

    LettuceReactiveRedisConnection(LettuceConnectionProvider connectionProvider) {
        Assert.notNull((Object)connectionProvider, (String)"LettuceConnectionProvider must not be null!");
        this.dedicatedConnection = new AsyncConnect<StatefulConnection>(connectionProvider, StatefulConnection.class);
        this.pubSubConnection = new AsyncConnect<StatefulRedisPubSubConnection>(connectionProvider, StatefulRedisPubSubConnection.class);
    }

    LettuceReactiveRedisConnection(StatefulConnection<ByteBuffer, ByteBuffer> sharedConnection, LettuceConnectionProvider connectionProvider) {
        Assert.notNull(sharedConnection, (String)"Shared StatefulConnection must not be null!");
        Assert.notNull((Object)connectionProvider, (String)"LettuceConnectionProvider must not be null!");
        this.dedicatedConnection = new AsyncConnect<StatefulConnection>(connectionProvider, StatefulConnection.class);
        this.pubSubConnection = new AsyncConnect<StatefulRedisPubSubConnection>(connectionProvider, StatefulRedisPubSubConnection.class);
        this.sharedConnection = Mono.just(sharedConnection);
    }

    @Override
    public ReactiveKeyCommands keyCommands() {
        return new LettuceReactiveKeyCommands(this);
    }

    @Override
    public ReactiveStringCommands stringCommands() {
        return new LettuceReactiveStringCommands(this);
    }

    @Override
    public ReactiveNumberCommands numberCommands() {
        return new LettuceReactiveNumberCommands(this);
    }

    @Override
    public ReactiveListCommands listCommands() {
        return new LettuceReactiveListCommands(this);
    }

    @Override
    public ReactiveSetCommands setCommands() {
        return new LettuceReactiveSetCommands(this);
    }

    @Override
    public ReactiveZSetCommands zSetCommands() {
        return new LettuceReactiveZSetCommands(this);
    }

    @Override
    public ReactiveHashCommands hashCommands() {
        return new LettuceReactiveHashCommands(this);
    }

    @Override
    public ReactiveGeoCommands geoCommands() {
        return new LettuceReactiveGeoCommands(this);
    }

    @Override
    public ReactiveHyperLogLogCommands hyperLogLogCommands() {
        return new LettuceReactiveHyperLogLogCommands(this);
    }

    @Override
    public ReactivePubSubCommands pubSubCommands() {
        return new LettuceReactivePubSubCommands(this);
    }

    @Override
    public ReactiveScriptingCommands scriptingCommands() {
        return new LettuceReactiveScriptingCommands(this);
    }

    @Override
    public ReactiveServerCommands serverCommands() {
        return new LettuceReactiveServerCommands(this);
    }

    @Override
    public ReactiveStreamCommands streamCommands() {
        return new LettuceReactiveStreamCommands(this);
    }

    @Override
    public Mono<String> ping() {
        return this.execute(BaseRedisReactiveCommands::ping).next();
    }

    public <T> Flux<T> execute(LettuceReactiveCallback<T> callback) {
        return this.getCommands().flatMapMany(callback::doWithCommands).onErrorMap(this.translateException());
    }

    public <T> Flux<T> executeDedicated(LettuceReactiveCallback<T> callback) {
        return this.getDedicatedCommands().flatMapMany(callback::doWithCommands).onErrorMap(this.translateException());
    }

    @Override
    public Mono<Void> closeLater() {
        return Mono.fromRunnable(this.dedicatedConnection::close);
    }

    protected Mono<? extends StatefulConnection<ByteBuffer, ByteBuffer>> getConnection() {
        if (this.sharedConnection != null) {
            return this.sharedConnection;
        }
        return this.getDedicatedConnection();
    }

    protected Mono<StatefulConnection<ByteBuffer, ByteBuffer>> getDedicatedConnection() {
        return this.dedicatedConnection.getConnection().onErrorMap(this.translateException());
    }

    protected Mono<StatefulRedisPubSubConnection<ByteBuffer, ByteBuffer>> getPubSubConnection() {
        return this.pubSubConnection.getConnection().onErrorMap(this.translateException());
    }

    protected Mono<? extends RedisClusterReactiveCommands<ByteBuffer, ByteBuffer>> getCommands() {
        if (this.sharedConnection != null) {
            return this.sharedConnection.map(LettuceReactiveRedisConnection::getRedisClusterReactiveCommands);
        }
        return this.getDedicatedCommands();
    }

    protected Mono<? extends RedisClusterReactiveCommands<ByteBuffer, ByteBuffer>> getDedicatedCommands() {
        return this.dedicatedConnection.getConnection().map(LettuceReactiveRedisConnection::getRedisClusterReactiveCommands);
    }

    private static RedisClusterReactiveCommands<ByteBuffer, ByteBuffer> getRedisClusterReactiveCommands(StatefulConnection<ByteBuffer, ByteBuffer> connection) {
        if (connection instanceof StatefulRedisConnection) {
            return ((StatefulRedisConnection)connection).reactive();
        }
        if (connection instanceof StatefulRedisClusterConnection) {
            return ((StatefulRedisClusterConnection)connection).reactive();
        }
        throw new IllegalStateException("o.O unknown connection type " + connection);
    }

    <T> Function<Throwable, Throwable> translateException() {
        return throwable -> {
            if (throwable instanceof RuntimeException) {
                DataAccessException convertedException = (DataAccessException)LettuceConverters.exceptionConverter().convert((Object)((RuntimeException)throwable));
                return convertedException != null ? convertedException : throwable;
            }
            return throwable;
        };
    }

    static class AsyncConnect<T extends StatefulConnection<?, ?>> {
        private final Mono<T> connectionPublisher;
        private final LettuceConnectionProvider connectionProvider;
        private AtomicReference<State> state = new AtomicReference<State>(State.INITIAL);
        @Nullable
        private volatile StatefulConnection<ByteBuffer, ByteBuffer> connection;

        AsyncConnect(LettuceConnectionProvider connectionProvider, Class<T> connectionType) {
            Assert.notNull((Object)connectionProvider, (String)"LettuceConnectionProvider must not be null!");
            Assert.notNull(connectionType, (String)"Connection type must not be null!");
            this.connectionProvider = connectionProvider;
            Mono defer = Mono.defer(() -> Mono.fromCompletionStage(connectionProvider.getConnectionAsync(connectionType)));
            this.connectionPublisher = defer.doOnNext(it -> {
                if (AsyncConnect.isClosing(this.state.get())) {
                    it.closeAsync();
                } else {
                    this.connection = it;
                }
            }).cache().handle((connection, sink) -> {
                if (AsyncConnect.isClosing(this.state.get())) {
                    sink.error((Throwable)new IllegalStateException("Unable to connect. Connection is closed!"));
                } else {
                    sink.next(connection);
                }
            });
        }

        Mono<T> getConnection() {
            State state = this.state.get();
            if (AsyncConnect.isClosing(state)) {
                return Mono.error((Throwable)new IllegalStateException("Unable to connect. Connection is closed!"));
            }
            this.state.compareAndSet(State.INITIAL, State.CONNECTION_REQUESTED);
            return this.connectionPublisher;
        }

        void close() {
            if (this.state.compareAndSet(State.INITIAL, State.CLOSING) || this.state.compareAndSet(State.CONNECTION_REQUESTED, State.CLOSING)) {
                StatefulConnection<ByteBuffer, ByteBuffer> connection = this.connection;
                this.connection = null;
                if (connection != null) {
                    LettuceFutureUtils.join(this.connectionProvider.releaseAsync(connection));
                }
                this.state.set(State.CLOSED);
            }
        }

        private static boolean isClosing(State state) {
            return state == State.CLOSING || state == State.CLOSED;
        }

        static enum State {
            INITIAL,
            CONNECTION_REQUESTED,
            CLOSING,
            CLOSED;

        }
    }

    static enum ByteBufferCodec implements RedisCodec<ByteBuffer, ByteBuffer>
    {
        INSTANCE;


        public ByteBuffer decodeKey(ByteBuffer bytes) {
            ByteBuffer buffer = ByteBuffer.allocate(bytes.remaining());
            buffer.put(bytes);
            buffer.flip();
            return buffer;
        }

        public ByteBuffer decodeValue(ByteBuffer bytes) {
            return this.decodeKey(bytes);
        }

        public ByteBuffer encodeKey(ByteBuffer key) {
            return key.duplicate();
        }

        public ByteBuffer encodeValue(ByteBuffer value) {
            return value.duplicate();
        }
    }

    static interface LettuceReactiveCallback<T> {
        public Publisher<T> doWithCommands(RedisClusterReactiveCommands<ByteBuffer, ByteBuffer> var1);
    }
}

