/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.redis.connection.lettuce;

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.support.AsyncConnectionPoolSupport;
import io.lettuce.core.support.AsyncPool;
import io.lettuce.core.support.BoundedPoolConfig;
import io.lettuce.core.support.CommonsPool2ConfigConverter;
import io.lettuce.core.support.ConnectionPoolSupport;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.data.redis.connection.PoolException;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider;
import org.springframework.data.redis.connection.lettuce.LettuceFutureUtils;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.connection.lettuce.RedisClientProvider;
import org.springframework.util.Assert;

class LettucePoolingConnectionProvider
implements LettuceConnectionProvider,
RedisClientProvider,
DisposableBean {
    private static final Log log = LogFactory.getLog(LettucePoolingConnectionProvider.class);
    private final LettuceConnectionProvider connectionProvider;
    private final GenericObjectPoolConfig poolConfig;
    private final Map<StatefulConnection<?, ?>, GenericObjectPool<StatefulConnection<?, ?>>> poolRef = new ConcurrentHashMap(32);
    private final Map<StatefulConnection<?, ?>, AsyncPool<StatefulConnection<?, ?>>> asyncPoolRef = new ConcurrentHashMap(32);
    private final Map<CompletableFuture<StatefulConnection<?, ?>>, AsyncPool<StatefulConnection<?, ?>>> inProgressAsyncPoolRef = new ConcurrentHashMap(32);
    private final Map<Class<?>, GenericObjectPool<StatefulConnection<?, ?>>> pools = new ConcurrentHashMap(32);
    private final Map<Class<?>, AsyncPool<StatefulConnection<?, ?>>> asyncPools = new ConcurrentHashMap(32);
    private final BoundedPoolConfig asyncPoolConfig;

    LettucePoolingConnectionProvider(LettuceConnectionProvider connectionProvider, LettucePoolingClientConfiguration clientConfiguration) {
        Assert.notNull((Object)connectionProvider, (String)"ConnectionProvider must not be null");
        Assert.notNull((Object)clientConfiguration, (String)"ClientConfiguration must not be null");
        this.connectionProvider = connectionProvider;
        this.poolConfig = clientConfiguration.getPoolConfig();
        this.asyncPoolConfig = CommonsPool2ConfigConverter.bounded((GenericObjectPoolConfig)this.poolConfig);
    }

    @Override
    public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
        GenericObjectPool pool = this.pools.computeIfAbsent(connectionType, poolType -> ConnectionPoolSupport.createGenericObjectPool(() -> this.connectionProvider.getConnection(connectionType), (GenericObjectPoolConfig)this.poolConfig, (boolean)false));
        try {
            StatefulConnection connection = (StatefulConnection)pool.borrowObject();
            this.poolRef.put(connection, pool);
            return (T)((StatefulConnection)connectionType.cast(connection));
        }
        catch (Exception ex) {
            throw new PoolException("Could not get a resource from the pool", ex);
        }
    }

    @Override
    public <T extends StatefulConnection<?, ?>> CompletionStage<T> getConnectionAsync(Class<T> connectionType) {
        AsyncPool pool = this.asyncPools.computeIfAbsent(connectionType, poolType -> AsyncConnectionPoolSupport.createBoundedObjectPool(() -> this.connectionProvider.getConnectionAsync(connectionType).thenApply(connectionType::cast), (BoundedPoolConfig)this.asyncPoolConfig, (boolean)false));
        CompletableFuture acquire = pool.acquire();
        this.inProgressAsyncPoolRef.put(acquire, pool);
        return ((CompletableFuture)acquire.whenComplete((connection, e) -> {
            this.inProgressAsyncPoolRef.remove(acquire);
            if (connection != null) {
                this.asyncPoolRef.put((StatefulConnection<?, ?>)connection, (AsyncPool<StatefulConnection<?, ?>>)pool);
            }
        })).thenApply(connectionType::cast);
    }

    @Override
    public AbstractRedisClient getRedisClient() {
        if (this.connectionProvider instanceof RedisClientProvider) {
            return ((RedisClientProvider)((Object)this.connectionProvider)).getRedisClient();
        }
        throw new IllegalStateException(String.format("Underlying connection provider %s does not implement RedisClientProvider", this.connectionProvider.getClass().getName()));
    }

    @Override
    public void release(StatefulConnection<?, ?> connection) {
        GenericObjectPool<StatefulConnection<?, ?>> pool = this.poolRef.remove(connection);
        if (pool == null) {
            AsyncPool<StatefulConnection<?, ?>> asyncPool = this.asyncPoolRef.remove(connection);
            if (asyncPool == null) {
                throw new PoolException("Returned connection " + connection + " was either previously returned or does not belong to this connection provider");
            }
            this.discardIfNecessary(connection);
            asyncPool.release(connection).join();
            return;
        }
        this.discardIfNecessary(connection);
        pool.returnObject(connection);
    }

    private void discardIfNecessary(StatefulConnection<?, ?> connection) {
        StatefulRedisConnection redisConnection;
        if (connection instanceof StatefulRedisConnection && (redisConnection = (StatefulRedisConnection)connection).isMulti()) {
            redisConnection.async().discard();
        }
    }

    @Override
    public CompletableFuture<Void> releaseAsync(StatefulConnection<?, ?> connection) {
        GenericObjectPool<StatefulConnection<?, ?>> blockingPool = this.poolRef.remove(connection);
        if (blockingPool != null) {
            log.warn((Object)"Releasing asynchronously a connection that was obtained from a non-blocking pool");
            blockingPool.returnObject(connection);
            return CompletableFuture.completedFuture(null);
        }
        AsyncPool<StatefulConnection<?, ?>> pool = this.asyncPoolRef.remove(connection);
        if (pool == null) {
            return LettuceFutureUtils.failed((Throwable)((Object)new PoolException("Returned connection " + connection + " was either previously returned or does not belong to this connection provider")));
        }
        return pool.release(connection);
    }

    public void destroy() throws Exception {
        ArrayList futures = new ArrayList();
        if (!this.poolRef.isEmpty() || !this.asyncPoolRef.isEmpty()) {
            log.warn((Object)"LettucePoolingConnectionProvider contains unreleased connections");
        }
        if (!this.inProgressAsyncPoolRef.isEmpty()) {
            log.warn((Object)"LettucePoolingConnectionProvider has active connection retrievals");
            this.inProgressAsyncPoolRef.forEach((k, v) -> futures.add(k.thenApply(StatefulConnection::closeAsync)));
        }
        if (!this.poolRef.isEmpty()) {
            this.poolRef.forEach((connection, pool) -> pool.returnObject(connection));
            this.poolRef.clear();
        }
        if (!this.asyncPoolRef.isEmpty()) {
            this.asyncPoolRef.forEach((connection, pool) -> futures.add(pool.release(connection)));
            this.asyncPoolRef.clear();
        }
        this.pools.forEach((type, pool) -> pool.close());
        ((CompletableFuture)((CompletableFuture)CompletableFuture.allOf((CompletableFuture[])futures.stream().map(it -> it.exceptionally(LettuceFutureUtils.ignoreErrors())).toArray(CompletableFuture[]::new)).thenCompose(ignored -> {
            CompletableFuture[] poolClose = (CompletableFuture[])this.asyncPools.values().stream().map(AsyncPool::closeAsync).map(it -> it.exceptionally(LettuceFutureUtils.ignoreErrors())).toArray(CompletableFuture[]::new);
            return CompletableFuture.allOf(poolClose);
        })).thenRun(() -> {
            this.asyncPoolRef.clear();
            this.inProgressAsyncPoolRef.clear();
        })).join();
        this.pools.clear();
    }
}

