/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.driver.core;

import com.datastax.driver.core.AuthProvider;
import com.datastax.driver.core.Authenticator;
import com.datastax.driver.core.CloseFuture;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ClusterNameMismatchException;
import com.datastax.driver.core.CodecRegistry;
import com.datastax.driver.core.Configuration;
import com.datastax.driver.core.DefaultResultSetFuture;
import com.datastax.driver.core.EndPoint;
import com.datastax.driver.core.ExceptionCode;
import com.datastax.driver.core.ExecutionInfo;
import com.datastax.driver.core.ExtendedAuthProvider;
import com.datastax.driver.core.ExtendedRemoteEndpointAwareSslOptions;
import com.datastax.driver.core.Frame;
import com.datastax.driver.core.FrameCompressor;
import com.datastax.driver.core.FramingFormatHandler;
import com.datastax.driver.core.GuavaCompatibility;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.HostConnectionPool;
import com.datastax.driver.core.InboundTrafficMeter;
import com.datastax.driver.core.LwtInfo;
import com.datastax.driver.core.Message;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.Metrics;
import com.datastax.driver.core.NettyOptions;
import com.datastax.driver.core.OutboundTrafficMeter;
import com.datastax.driver.core.ProtocolOptions;
import com.datastax.driver.core.ProtocolV1Authenticator;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.RemoteEndpointAwareSSLOptions;
import com.datastax.driver.core.RequestHandler;
import com.datastax.driver.core.Requests;
import com.datastax.driver.core.Responses;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.SSLOptions;
import com.datastax.driver.core.ShardingInfo;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.StreamIdGenerator;
import com.datastax.driver.core.SystemProperties;
import com.datastax.driver.core.exceptions.AuthenticationException;
import com.datastax.driver.core.exceptions.BusyConnectionException;
import com.datastax.driver.core.exceptions.ConnectionException;
import com.datastax.driver.core.exceptions.CrcMismatchException;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.exceptions.DriverInternalError;
import com.datastax.driver.core.exceptions.FrameTooLongException;
import com.datastax.driver.core.exceptions.OperationTimedOutException;
import com.datastax.driver.core.exceptions.TransportException;
import com.datastax.driver.core.exceptions.UnsupportedProtocolVersionException;
import com.datastax.driver.core.utils.MoreFutures;
import com.datastax.driver.core.utils.MoreObjects;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.security.InvalidParameterException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class Connection {
    /** Logger shared by all connections. */
    private static final Logger logger = LoggerFactory.getLogger(Connection.class);
    /** Sent as the initial SASL response when the authenticator returns {@code null}. */
    private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
    /** Value of system property {@code com.datastax.driver.DISABLE_COALESCING}; defaults to {@code false}. */
    private static final boolean DISABLE_COALESCING = SystemProperties.getBoolean("com.datastax.driver.DISABLE_COALESCING", false);
    /** Value of system property {@code com.datastax.driver.FLUSHER_SCHEDULE_PERIOD_NS}; defaults to 10000. */
    private static final int FLUSHER_SCHEDULE_PERIOD_NS = SystemProperties.getInt("com.datastax.driver.FLUSHER_SCHEDULE_PERIOD_NS", 10000);
    // Same value as the literal handed to HostConnectionPool.tempBlockAdvShardAwareness when the
    // server reports a different shard than the one requested.
    // NOTE(review): this is ~200000x larger than BLOCK_ON_ERROR below, which suggests the two
    // constants are not in the same time unit -- confirm against tempBlockAdvShardAwareness.
    private static final long ADV_SHARD_AWARENESS_BLOCK_ON_NAT = 60000000000L;
    // Same value as the literal handed to tempBlockAdvShardAwareness when a shard-targeted connect fails.
    private static final long ADV_SHARD_AWARENESS_BLOCK_ON_ERROR = 300000L;
    /** Lifecycle state of this connection; starts as {@code OPEN}. */
    final AtomicReference<State> state = new AtomicReference<State>(State.OPEN);
    volatile long maxIdleTime;
    /** Remote endpoint this connection targets. */
    EndPoint endPoint;
    private final String name;
    /** Shard reported by the server in the SUPPORTED response; {@code null} until that response is processed. */
    private volatile Integer shardId = null;
    /** Shard passed to {@code initAsync(int, int)}; {@code -1} means no specific shard was requested. */
    private int requestedShardId = -1;
    /** Netty channel; assigned once the connect attempt completes. */
    @VisibleForTesting
    volatile Channel channel;
    private final Factory factory;
    @VisibleForTesting
    final Dispatcher dispatcher;
    final AtomicInteger inFlight = new AtomicInteger(0);
    // Incremented before the connect listener is registered in initAsync and decremented when that
    // listener handles a final outcome; other uses may exist outside this view.
    private final AtomicInteger writer = new AtomicInteger(0);
    /** Initialised to {@link #defaultKeyspaceAttempt}. */
    private final AtomicReference<SetKeyspaceAttempt> targetKeyspace;
    /** Attempt with a {@code null} keyspace whose future is already completed with this connection. */
    private final SetKeyspaceAttempt defaultKeyspaceAttempt;
    /** Set to {@code true} by {@code markInitialized()}. */
    private volatile boolean isInitialized;
    private final AtomicBoolean isDefunct = new AtomicBoolean();
    private final AtomicBoolean signaled = new AtomicBoolean();
    private final AtomicReference<ConnectionCloseFuture> closeFuture = new AtomicReference();
    /** Owner supplied at construction time; may hold {@code null}. */
    private final AtomicReference<Owner> ownerRef = new AtomicReference();
    /** Flushers keyed by event loop; the map uses weak keys and a concurrency level of 16. */
    private static final ConcurrentMap<EventLoop, Flusher> flusherLookup = new MapMaker().concurrencyLevel(16).weakKeys().makeMap();
    /**
     * Callback used for heartbeat requests. The heartbeat is an OPTIONS request; any answer other
     * than SUPPORTED, or a timeout, marks the connection as defunct.
     */
    private static final ResponseCallback HEARTBEAT_CALLBACK = new ResponseCallback(){

        /** Builds the OPTIONS request used as heartbeat probe. */
        @Override
        public Message.Request request() {
            return new Requests.Options();
        }

        /** Heartbeats are never retried, so the retry count is always zero. */
        @Override
        public int retryCount() {
            return 0;
        }

        /**
         * Handles the heartbeat response: SUPPORTED is a success, everything else defuncts the
         * connection with an exception describing what was received.
         */
        @Override
        public void onSet(Connection connection, Message.Response response, long latency, int retryCount) {
            switch (response.type) {
                case SUPPORTED: {
                    logger.debug("{} heartbeat query succeeded", (Object)connection);
                    break;
                }
                case ERROR: {
                    Responses.Error error = (Responses.Error)response;
                    this.fail(connection, new ConnectionException(connection.endPoint, String.format("Got ERROR response message from server to a heartbeat query: %s", error.message)));
                    // Without this break the ERROR case fell through to the default branch and
                    // defuncted the connection a second time with a misleading message.
                    break;
                }
                default: {
                    this.fail(connection, new ConnectionException(connection.endPoint, "Unexpected heartbeat response: " + response));
                    break;
                }
            }
        }

        /** Intentionally a no-op: this callback takes no action when the request fails with an exception. */
        @Override
        public void onException(Connection connection, Exception exception, long latency, int retryCount) {
        }

        /** Defuncts the connection when the heartbeat times out; always returns {@code true}. */
        @Override
        public boolean onTimeout(Connection connection, long latency, int retryCount) {
            this.fail(connection, new ConnectionException(connection.endPoint, "Heartbeat query timed out"));
            return true;
        }

        /** Marks the connection as defunct with the given cause. */
        private void fail(Connection connection, Exception e) {
            connection.defunct(e);
        }
    };

    /**
     * Creates a connection that is not yet connected; call {@code initAsync} to open the channel.
     *
     * @param name     name given to this connection
     * @param endPoint remote endpoint to connect to
     * @param factory  factory that creates this connection and supplies its configuration
     * @param owner    owner of the connection, may be {@code null}
     */
    protected Connection(String name, EndPoint endPoint, Factory factory, Owner owner) {
        this.endPoint = endPoint;
        this.factory = factory;
        this.dispatcher = new Dispatcher();
        this.name = name;
        this.ownerRef.set(owner);
        // The default keyspace attempt carries no keyspace and is already resolved to this connection.
        ListenableFuture<Connection> alreadyDone = Futures.immediateFuture(this);
        SetKeyspaceAttempt noKeyspace = new SetKeyspaceAttempt(null, alreadyDone);
        this.defaultKeyspaceAttempt = noKeyspace;
        this.targetKeyspace = new AtomicReference<SetKeyspaceAttempt>(noKeyspace);
    }

    /**
     * Creates a connection without an owner.
     *
     * @param name     name given to this connection
     * @param endPoint remote endpoint to connect to
     * @param factory  factory that creates this connection
     */
    Connection(String name, EndPoint endPoint, Factory factory) {
        // Delegates with a null owner.
        this(name, endPoint, factory, null);
    }

    /**
     * Initialises the connection without targeting a specific shard and using the endpoint's own port.
     *
     * @return the future produced by {@code initAsync(-1, 0)}
     */
    ListenableFuture<Void> initAsync() {
        // -1 = no shard requested, 0 = keep the port of the resolved endpoint.
        return this.initAsync(-1, 0);
    }

    /**
     * Opens the Netty channel and runs the asynchronous initialisation chain: OPTIONS request,
     * then the STARTUP step built by {@code onOptionsReady}.
     *
     * <p>When {@code shardId} is not {@code -1}, the channel is bound to a local port obtained from
     * {@code PortAllocator.getNextAvailablePort}; on a bind failure another port is tried.
     *
     * @param shardId    shard to target, or {@code -1} for no specific shard
     * @param serverPort server port to connect to, or {@code 0} to use the resolved endpoint as is
     * @return a future that completes when initialisation finished, or fails with the cause
     * @throws InvalidParameterException if a shard is requested but no sharding info is available
     * @throws RuntimeException if no local port can be allocated, or on any other runtime failure
     *     while setting up the connect attempt (the connection is force-closed before rethrowing)
     */
    ListenableFuture<Void> initAsync(final int shardId, int serverPort) {
        // Refuse to start anything once the factory is shut down.
        if (this.factory.isShutdown) {
            return Futures.immediateFailedFuture((Throwable)new ConnectionException(this.endPoint, "Connection factory is shut down"));
        }
        this.requestedShardId = shardId;
        // Fall back to the default protocol version when the factory has none set.
        final ProtocolVersion protocolVersion = this.factory.protocolVersion == null ? ProtocolVersion.DEFAULT : this.factory.protocolVersion;
        // Completed by the connect listener below; gates the rest of the initialisation chain.
        final SettableFuture channelReadyFuture = SettableFuture.create();
        try {
            ChannelFuture future;
            int highPort;
            int lowPort;
            ShardingInfo shardingInfo;
            final ProtocolOptions protocolOptions = this.factory.configuration.getProtocolOptions();
            final Bootstrap bootstrap = this.factory.newBootstrap();
            this.prepareBootstrap(bootstrap, protocolVersion, protocolOptions);
            // serverPort == 0 keeps the resolved address untouched; otherwise only the port is replaced.
            final InetSocketAddress serverAddress = serverPort == 0 ? this.endPoint.resolve() : new InetSocketAddress(this.endPoint.resolve().getAddress(), serverPort);
            Owner owner = this.ownerRef.get();
            // Sharding info is only reachable through a HostConnectionPool owner.
            final HostConnectionPool pool = owner instanceof HostConnectionPool ? (HostConnectionPool)owner : null;
            ShardingInfo shardingInfo2 = shardingInfo = pool == null ? null : pool.host.getShardingInfo();
            if (shardingInfo == null && shardId != -1) {
                throw new InvalidParameterException(MessageFormat.format("Requested connection to shard {0} of host {1}:{2}, but sharding info or pool is absent", shardId, serverAddress.getAddress().getHostAddress(), serverPort));
            }
            // Local port range comes from the pool's protocol options; -1/-1 when there is no pool.
            if (pool != null) {
                lowPort = pool.manager.configuration().getProtocolOptions().getLowLocalPort();
                highPort = pool.manager.configuration().getProtocolOptions().getHighLocalPort();
            } else {
                highPort = -1;
                lowPort = -1;
            }
            if (shardId == -1) {
                // No shard requested: plain connect without an explicit local address.
                future = bootstrap.connect((SocketAddress)serverAddress);
            } else {
                // Shard requested: bind to the local port handed out by the allocator for this shard.
                int localPort = PortAllocator.getNextAvailablePort(shardingInfo.getShardsCount(), shardId, lowPort, highPort);
                if (localPort == -1) {
                    throw new RuntimeException("Can't find free local port to use");
                }
                future = bootstrap.connect((SocketAddress)serverAddress, (SocketAddress)new InetSocketAddress(localPort));
                logger.debug("Connecting to shard {} using local port {} (shardCount: {})\n", new Object[]{shardId, localPort, shardingInfo.getShardsCount()});
            }
            ChannelFutureListener channelListener = new ChannelFutureListener(){

                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.cause() != null) {
                        int localPort;
                        // Bind failure on a shard-targeted connect: retry with the next port, if any is left.
                        if (shardId != -1 && future.cause().getCause() instanceof BindException && (localPort = PortAllocator.getNextAvailablePort(shardingInfo.getShardsCount(), shardId, lowPort, highPort)) != -1) {
                            // Close the channel of the failed attempt; a failure to close is only logged.
                            if (future.channel() != null) {
                                future.channel().close().addListener((GenericFutureListener)new ChannelFutureListener(){

                                    public void operationComplete(ChannelFuture future) throws Exception {
                                        if (future.cause() != null) {
                                            logger.warn("Error while closing old channel", future.cause());
                                        }
                                    }
                                });
                            }
                            Connection.this.prepareBootstrap(bootstrap, protocolVersion, protocolOptions);
                            ChannelFuture newFuture = bootstrap.connect((SocketAddress)serverAddress, (SocketAddress)new InetSocketAddress(localPort));
                            // This same listener handles the retry; the writer counter is left untouched here.
                            newFuture.addListener((GenericFutureListener)this);
                            logger.debug("Retrying connecting to shard {} using local port {} (shardCount: {})\n", new Object[]{shardId, localPort, shardingInfo.getShardsCount()});
                            return;
                        }
                        logger.warn("Error creating netty channel to " + Connection.this.endPoint, future.cause());
                    }
                    // Final outcome reached (success or non-retried failure): balance the increment done below.
                    Connection.this.writer.decrementAndGet();
                    Connection.this.channel = future.channel();
                    if (Connection.this.isClosed() && Connection.this.channel != null) {
                        // The connection was closed while connecting: close the channel and fail initialisation.
                        Connection.this.channel.close().addListener((GenericFutureListener)new ChannelFutureListener(){

                            public void operationComplete(ChannelFuture future) throws Exception {
                                channelReadyFuture.setException((Throwable)new TransportException(Connection.this.endPoint, "Connection closed during initialization."));
                            }
                        });
                    } else {
                        // Register the channel with the factory's channel group, even if the connect failed.
                        if (Connection.this.channel != null) {
                            Connection.this.factory.allChannels.add((Object)Connection.this.channel);
                        }
                        if (!future.isSuccess()) {
                            if (logger.isDebugEnabled()) {
                                logger.debug(String.format("%s Error connecting to %s%s", Connection.this, Connection.this.endPoint, Connection.extractMessage(future.cause())));
                            }
                            channelReadyFuture.setException((Throwable)new TransportException(Connection.this.endPoint, "Cannot connect", future.cause()));
                            // A failed shard-targeted connect temporarily blocks advanced shard awareness on the pool.
                            // pool is non-null here: a shard request without sharding info was rejected earlier.
                            if (shardId != -1) {
                                pool.tempBlockAdvShardAwareness(300000L);
                            }
                        } else {
                            assert (Connection.this.channel != null);
                            logger.debug("{} Connection established, initializing transport", (Object)Connection.this);
                            Connection.this.channel.closeFuture().addListener((GenericFutureListener)new ChannelCloseListener());
                            channelReadyFuture.set(null);
                        }
                    }
                }
            };
            // Incremented before the listener is attached; the listener decrements it on a final outcome.
            this.writer.incrementAndGet();
            future.addListener((GenericFutureListener)channelListener);
        }
        catch (RuntimeException e) {
            // Setup failed synchronously: force-close and propagate to the caller.
            this.closeAsync().force();
            throw e;
        }
        Executor initExecutor = this.factory.manager.configuration.getPoolingOptions().getInitializationExecutor();
        // Chain: channel ready -> OPTIONS exchange -> STARTUP exchange (and whatever follows it).
        ListenableFuture<Void> queryOptionsFuture = GuavaCompatibility.INSTANCE.transformAsync(channelReadyFuture, this.onChannelReady(protocolVersion, initExecutor), initExecutor);
        ListenableFuture<Void> initializeTransportFuture = GuavaCompatibility.INSTANCE.transformAsync(queryOptionsFuture, this.onOptionsReady(protocolVersion, initExecutor), initExecutor);
        ListenableFuture<Void> initFuture = GuavaCompatibility.INSTANCE.withFallback(initializeTransportFuture, new AsyncFunction<Throwable, Void>(){

            public ListenableFuture<Void> apply(Throwable t) throws Exception {
                SettableFuture future = SettableFuture.create();
                if (t instanceof ClusterNameMismatchException || t instanceof UnsupportedProtocolVersionException) {
                    // These two failures close the connection without marking it defunct.
                    Connection.this.closeAsync().force();
                    future.setException(t);
                } else {
                    // Known failure types are passed through as is; anything else is wrapped in a ConnectionException.
                    Throwable e = t instanceof ConnectionException || t instanceof DriverException || t instanceof InterruptedException || t instanceof Error ? t : new ConnectionException(Connection.this.endPoint, String.format("Unexpected error during transport initialization (%s)", t), t);
                    future.setException(Connection.this.defunct(e));
                }
                return future;
            }
        }, initExecutor);
        // Safety net: make sure the connection ends up closed whenever initialisation failed.
        GuavaCompatibility.INSTANCE.addCallback(initFuture, new MoreFutures.FailureCallback<Void>(){

            public void onFailure(Throwable t) {
                if (!Connection.this.isClosed()) {
                    Connection.this.closeAsync().force();
                }
            }
        }, initExecutor);
        return initFuture;
    }

    /**
     * Installs a fresh channel {@code Initializer} on the given bootstrap.
     *
     * @param bootstrap       bootstrap to configure
     * @param protocolVersion protocol version handed to the initializer
     * @param protocolOptions source of the compressor and SSL options
     * @return the same bootstrap instance, for chaining
     */
    private Bootstrap prepareBootstrap(Bootstrap bootstrap, ProtocolVersion protocolVersion, ProtocolOptions protocolOptions) {
        FrameCompressor compressor = protocolOptions.getCompression().compressor();
        SSLOptions sslOptions = protocolOptions.getSSLOptions();
        int heartbeatSeconds = this.factory.configuration.getPoolingOptions().getHeartbeatIntervalSeconds();
        NettyOptions nettyOptions = this.factory.configuration.getNettyOptions();
        CodecRegistry codecRegistry = this.factory.configuration.getCodecRegistry();
        // Metrics are only passed on when they are enabled in the configuration.
        Metrics metrics = null;
        if (this.factory.configuration.getMetricsOptions().isEnabled()) {
            metrics = this.factory.manager.metrics;
        }
        Initializer initializer = new Initializer(this, protocolVersion, compressor, sslOptions, heartbeatSeconds, nettyOptions, codecRegistry, metrics);
        bootstrap.handler((ChannelHandler)initializer);
        return bootstrap;
    }

    /**
     * Renders a throwable as a parenthesised suffix for log messages.
     *
     * @param t throwable to describe, may be {@code null}
     * @return an empty string for {@code null}; otherwise {@code " (<text>)"} where the text is the
     *     throwable's message, or its {@code toString()} when the message is null or empty
     */
    private static String extractMessage(Throwable t) {
        if (t == null) {
            return "";
        }
        String text;
        if (t.getMessage() != null && !t.getMessage().isEmpty()) {
            text = t.getMessage();
        } else {
            text = t.toString();
        }
        return " (" + text + ')';
    }

    /**
     * Sends an OPTIONS request on this connection and maps the answer through
     * {@code onSupportedResponse()}.
     *
     * @return a future holding the string produced from the SUPPORTED response
     */
    public ListenableFuture<String> optionsQuery() {
        return GuavaCompatibility.INSTANCE.transformAsync(this.write(new Requests.Options()), this.onSupportedResponse());
    }

    /**
     * Builds the step that runs once the channel is connected: it writes an OPTIONS request and
     * hands the answer to {@code onOptionsResponse}.
     *
     * @param protocolVersion protocol version in use for this connection
     * @param initExecutor    executor on which the response handling runs
     * @return the async function to chain after the channel-ready future
     */
    private AsyncFunction<Void, Void> onChannelReady(final ProtocolVersion protocolVersion, final Executor initExecutor) {
        return new AsyncFunction<Void, Void>(){

            public ListenableFuture<Void> apply(Void ignored) throws Exception {
                Future optionsResponse = Connection.this.write(new Requests.Options());
                AsyncFunction<Message.Response, Void> responseHandler = Connection.this.onOptionsResponse(protocolVersion, initExecutor);
                return GuavaCompatibility.INSTANCE.transformAsync(optionsResponse, responseHandler, initExecutor);
            }
        };
    }

    /**
     * Builds the handler for the server's answer to the OPTIONS request sent during initialisation.
     * A SUPPORTED answer updates the host's sharding and LWT info and this connection's shard id;
     * any other answer fails the initialisation.
     *
     * @param protocolVersion protocol version in use, reported when the server rejects it
     * @param initExecutor    not used by this handler
     * @return the async function processing the OPTIONS response
     */
    private AsyncFunction<Message.Response, Void> onOptionsResponse(final ProtocolVersion protocolVersion, Executor initExecutor) {
        return new AsyncFunction<Message.Response, Void>(){

            public ListenableFuture<Void> apply(Message.Response response) throws Exception {
                switch (response.type) {
                    case ERROR: {
                        Responses.Error failure = (Responses.Error)response;
                        if (!Connection.this.isUnsupportedProtocolVersion(failure)) {
                            throw new TransportException(Connection.this.endPoint, String.format("Got ERROR response message from server to an OPTIONS message: %s", failure.message));
                        }
                        throw Connection.this.unsupportedProtocolVersionException(protocolVersion, failure.serverProtocolVersion);
                    }
                    case SUPPORTED: {
                        Responses.Supported supportedMsg = (Responses.Supported)response;
                        ShardingInfo.ConnectionShardingInfo parsed = ShardingInfo.parseShardingInfo(supportedMsg.supported);
                        if (parsed == null) {
                            // No sharding info advertised: clear it on the host and use shard 0.
                            Connection.this.getHost().setShardingInfo(null);
                            Connection.this.shardId = 0;
                        } else {
                            Connection.this.getHost().setShardingInfo(parsed.shardingInfo);
                            Connection.this.shardId = parsed.shardId;
                            boolean shardWasRequested = Connection.this.requestedShardId != -1;
                            if (shardWasRequested && Connection.this.requestedShardId != parsed.shardId) {
                                // Landed on a different shard than requested: temporarily block advanced shard awareness.
                                logger.warn("Advanced shard awareness: requested connection to shard {}, but connected to {}. Is there a NAT between client and server?", (Object)Connection.this.requestedShardId, (Object)parsed.shardId);
                                HostConnectionPool owningPool = (HostConnectionPool)Connection.this.ownerRef.get();
                                owningPool.tempBlockAdvShardAwareness(60000000000L);
                            }
                        }
                        LwtInfo lwtInfo = LwtInfo.parseLwtInfo(supportedMsg.supported);
                        if (lwtInfo != null) {
                            Connection.this.getHost().setLwtInfo(lwtInfo);
                        }
                        return MoreFutures.VOID_SUCCESS;
                    }
                    default: {
                        throw new TransportException(Connection.this.endPoint, String.format("Unexpected %s response message from server to an OPTIONS message", new Object[]{response.type}));
                    }
                }
            }
        };
    }

    /**
     * Builds the step that follows the OPTIONS exchange: it sends a STARTUP request, adding the
     * host's LWT option when one is known, and hands the answer to {@code onStartupResponse}.
     *
     * @param protocolVersion protocol version in use for this connection
     * @param initExecutor    executor on which the response handling runs
     * @return the async function to chain after the OPTIONS step
     */
    private AsyncFunction<Void, Void> onOptionsReady(final ProtocolVersion protocolVersion, final Executor initExecutor) {
        return new AsyncFunction<Void, Void>(){

            public ListenableFuture<Void> apply(Void ignored) throws Exception {
                ProtocolOptions options = Connection.this.factory.configuration.getProtocolOptions();
                HashMap<String, String> startupExtras = new HashMap<String, String>();
                LwtInfo hostLwtInfo = Connection.this.getHost().getLwtInfo();
                if (hostLwtInfo != null) {
                    hostLwtInfo.addOption(startupExtras);
                }
                Requests.Startup startup = new Requests.Startup(options.getCompression(), options.isNoCompact(), startupExtras);
                Future startupResponse = Connection.this.write(startup);
                return GuavaCompatibility.INSTANCE.transformAsync(startupResponse, Connection.this.onStartupResponse(protocolVersion, initExecutor), initExecutor);
            }
        };
    }

    /**
     * Builds the handler for the answer to the OPTIONS request sent by {@code optionsQuery()}.
     *
     * @return an async function that yields the product type for a SUPPORTED response and throws a
     *     {@code TransportException} for an ERROR or any other response type
     */
    private AsyncFunction<Message.Response, String> onSupportedResponse() {
        return new AsyncFunction<Message.Response, String>(){

            public ListenableFuture<String> apply(Message.Response response) throws Exception {
                switch (response.type) {
                    case SUPPORTED: {
                        return Connection.this.getProductType((Responses.Supported)response);
                    }
                    case ERROR: {
                        Responses.Error error = (Responses.Error)response;
                        throw new TransportException(Connection.this.endPoint, String.format("Error initializing connection: %s", error.message));
                    }
                }
                // This handler is only attached to an OPTIONS request, so the message names OPTIONS
                // (it previously claimed the response was to a STARTUP message).
                throw new TransportException(Connection.this.endPoint, String.format("Unexpected %s response message from server to an OPTIONS message", new Object[]{response.type}));
            }
        };
    }

    /**
     * Builds the handler for the server's answer to the STARTUP request. READY continues with the
     * cluster name check, AUTHENTICATE starts the authentication exchange, and anything else
     * fails the initialisation.
     *
     * @param protocolVersion protocol version in use; selects the authentication flavour
     * @param initExecutor    executor passed on to the follow-up steps
     * @return the async function processing the STARTUP response
     */
    private AsyncFunction<Message.Response, Void> onStartupResponse(final ProtocolVersion protocolVersion, final Executor initExecutor) {
        return new AsyncFunction<Message.Response, Void>(){

            public ListenableFuture<Void> apply(Message.Response response) throws Exception {
                switch (response.type) {
                    case ERROR: {
                        Responses.Error failure = (Responses.Error)response;
                        if (!Connection.this.isUnsupportedProtocolVersion(failure)) {
                            throw new TransportException(Connection.this.endPoint, String.format("Error initializing connection: %s", failure.message));
                        }
                        throw Connection.this.unsupportedProtocolVersionException(protocolVersion, failure.serverProtocolVersion);
                    }
                    case READY: {
                        return Connection.this.checkClusterName(protocolVersion, initExecutor);
                    }
                    case AUTHENTICATE: {
                        return this.startAuthentication((Responses.Authenticate)response);
                    }
                    default: {
                        throw new TransportException(Connection.this.endPoint, String.format("Unexpected %s response message from server to a STARTUP message", new Object[]{response.type}));
                    }
                }
            }

            /** Creates the authenticator for the server's challenge and starts the matching exchange. */
            private ListenableFuture<Void> startAuthentication(Responses.Authenticate authenticate) throws Exception {
                AuthProvider provider = Connection.this.factory.authProvider;
                Authenticator authenticator;
                try {
                    if (provider instanceof ExtendedAuthProvider) {
                        // Extended providers receive the EndPoint itself rather than its resolved address.
                        authenticator = ((ExtendedAuthProvider)provider).newAuthenticator(Connection.this.endPoint, authenticate.authenticator);
                    } else {
                        authenticator = provider.newAuthenticator(Connection.this.endPoint.resolve(), authenticate.authenticator);
                    }
                }
                catch (AuthenticationException e) {
                    Connection.this.incrementAuthErrorMetric();
                    throw e;
                }
                switch (protocolVersion) {
                    case V1: {
                        // Under V1 the credentials flow is used only when the authenticator supports it.
                        return authenticator instanceof ProtocolV1Authenticator
                                ? Connection.this.authenticateV1(authenticator, protocolVersion, initExecutor)
                                : Connection.this.authenticateV2(authenticator, protocolVersion, initExecutor);
                    }
                    case V2: 
                    case V3: 
                    case V4: 
                    case V5: 
                    case V6: {
                        return Connection.this.authenticateV2(authenticator, protocolVersion, initExecutor);
                    }
                    default: {
                        throw Connection.this.defunct(protocolVersion.unsupported());
                    }
                }
            }
        };
    }

    /**
     * Verifies that the node belongs to the expected cluster and marks the connection initialised.
     * When the metadata holds no cluster name yet, the check is skipped.
     *
     * @param protocolVersion protocol version used for the cluster name query
     * @param executor        executor on which the query result is processed
     * @return a future that succeeds once the connection is marked initialised, or fails with a
     *     {@code ClusterNameMismatchException} or the error raised while sending the query
     */
    private ListenableFuture<Void> checkClusterName(ProtocolVersion protocolVersion, Executor executor) {
        final String expected = this.factory.manager.metadata.clusterName;
        if (expected != null) {
            Requests.Query nameQuery = new Requests.Query("select cluster_name from system.local where key = 'local'");
            DefaultResultSetFuture pendingResult = new DefaultResultSetFuture(null, protocolVersion, nameQuery);
            try {
                this.write(pendingResult);
                return GuavaCompatibility.INSTANCE.transformAsync(pendingResult, new AsyncFunction<ResultSet, Void>(){

                    public ListenableFuture<Void> apply(ResultSet rs) throws Exception {
                        String actual = rs.one().getString("cluster_name");
                        if (expected.equals(actual)) {
                            Connection.this.markInitialized();
                            return MoreFutures.VOID_SUCCESS;
                        }
                        throw new ClusterNameMismatchException(Connection.this.endPoint, actual, expected);
                    }
                }, executor);
            }
            catch (Exception sendFailure) {
                // Report a synchronous failure through the returned future instead of throwing.
                return Futures.immediateFailedFuture((Throwable)sendFailure);
            }
        }
        // Nothing to compare against: the connection is ready as is.
        this.markInitialized();
        return MoreFutures.VOID_SUCCESS;
    }

    /**
     * Extracts the first {@code PRODUCT_TYPE} value from a SUPPORTED response.
     *
     * @param response SUPPORTED response to inspect
     * @return an already completed future with the first {@code PRODUCT_TYPE} value, or with an
     *     empty string when the key is missing or has no values
     */
    private ListenableFuture<String> getProductType(Responses.Supported response) {
        if (!response.supported.containsKey("PRODUCT_TYPE")) {
            return Futures.immediateFuture("");
        }
        List<String> productTypes = response.supported.get("PRODUCT_TYPE");
        if (productTypes.size() > 0) {
            return Futures.immediateFuture(productTypes.get(0));
        }
        return Futures.immediateFuture("");
    }

    /** Flags the connection as initialised and logs the transition on the host states logger. */
    private void markInitialized() {
        // Set the flag before logging.
        this.isInitialized = true;
        Host.statesLogger.debug("[{}] {} Transport initialized, connection ready", (Object)this.endPoint, (Object)this);
    }

    /**
     * Authenticates with a CREDENTIALS message built from the authenticator's credentials.
     *
     * @param authenticator   authenticator; must implement {@code ProtocolV1Authenticator}
     * @param protocolVersion protocol version passed on to the cluster name check
     * @param executor        executor on which the server's answer is processed
     * @return a future that continues with the cluster name check on READY, and fails with an
     *     {@code AuthenticationException} on ERROR or a {@code TransportException} otherwise
     */
    private ListenableFuture<Void> authenticateV1(Authenticator authenticator, final ProtocolVersion protocolVersion, final Executor executor) {
        ProtocolV1Authenticator v1Authenticator = (ProtocolV1Authenticator)((Object)authenticator);
        Requests.Credentials credentialsMessage = new Requests.Credentials(v1Authenticator.getCredentials());
        AsyncFunction<Message.Response, Void> onCredentialsAnswer = new AsyncFunction<Message.Response, Void>(){

            public ListenableFuture<Void> apply(Message.Response authResponse) throws Exception {
                switch (authResponse.type) {
                    case ERROR: {
                        Connection.this.incrementAuthErrorMetric();
                        throw new AuthenticationException(Connection.this.endPoint, ((Responses.Error)authResponse).message);
                    }
                    case READY: {
                        return Connection.this.checkClusterName(protocolVersion, executor);
                    }
                    default: {
                        throw new TransportException(Connection.this.endPoint, String.format("Unexpected %s response message from server to a CREDENTIALS message", new Object[]{authResponse.type}));
                    }
                }
            }
        };
        try {
            Future credentialsAnswer = this.write(credentialsMessage);
            return GuavaCompatibility.INSTANCE.transformAsync(credentialsAnswer, onCredentialsAnswer, executor);
        }
        catch (Exception sendFailure) {
            // Report a synchronous failure through the returned future instead of throwing.
            return Futures.immediateFailedFuture((Throwable)sendFailure);
        }
    }

    /**
     * Starts the challenge/response authentication exchange by sending the authenticator's
     * initial response.
     *
     * @param authenticator   authenticator driving the exchange
     * @param protocolVersion protocol version passed on to the follow-up steps
     * @param executor        executor on which the server's answers are processed
     * @return a future completing when the exchange handled by {@code onV2AuthResponse} finishes
     */
    private ListenableFuture<Void> authenticateV2(Authenticator authenticator, ProtocolVersion protocolVersion, Executor executor) {
        byte[] fromAuthenticator = authenticator.initialResponse();
        // A null initial response is sent as an empty token.
        byte[] firstToken = fromAuthenticator == null ? EMPTY_BYTE_ARRAY : fromAuthenticator;
        try {
            Future serverAnswer = this.write(new Requests.AuthResponse(firstToken));
            AsyncFunction<Message.Response, Void> answerHandler = this.onV2AuthResponse(authenticator, protocolVersion, executor);
            return GuavaCompatibility.INSTANCE.transformAsync(serverAnswer, answerHandler, executor);
        }
        catch (Exception sendFailure) {
            // Report a synchronous failure through the returned future instead of throwing.
            return Futures.immediateFailedFuture((Throwable)sendFailure);
        }
    }

    /**
     * Builds the handler for one server answer in the challenge/response authentication exchange.
     * The handler re-registers itself for every further challenge until the server reports
     * success, the authenticator has nothing more to send, or an error occurs.
     *
     * @param authenticator   authenticator evaluating the server's challenges
     * @param protocolVersion protocol version passed on to the cluster name check
     * @param executor        executor on which follow-up answers are processed
     * @return the async function processing an authentication response
     */
    private AsyncFunction<Message.Response, Void> onV2AuthResponse(final Authenticator authenticator, final ProtocolVersion protocolVersion, final Executor executor) {
        return new AsyncFunction<Message.Response, Void>(){

            public ListenableFuture<Void> apply(Message.Response authResponse) throws Exception {
                switch (authResponse.type) {
                    case AUTH_SUCCESS: {
                        // Log the connection (Connection.this); a bare "this" here is the anonymous function.
                        logger.trace("{} Authentication complete", (Object)Connection.this);
                        authenticator.onAuthenticationSuccess(((Responses.AuthSuccess)authResponse).token);
                        return Connection.this.checkClusterName(protocolVersion, executor);
                    }
                    case AUTH_CHALLENGE: {
                        byte[] responseToServer = authenticator.evaluateChallenge(((Responses.AuthChallenge)authResponse).token);
                        if (responseToServer == null) {
                            // A null answer from the authenticator ends the exchange without replying to the server.
                            logger.trace("{} Authentication complete (No response to server)", (Object)Connection.this);
                            return Connection.this.checkClusterName(protocolVersion, executor);
                        }
                        logger.trace("{} Sending Auth response to challenge", (Object)Connection.this);
                        Future nextResponseFuture = Connection.this.write(new Requests.AuthResponse(responseToServer));
                        return GuavaCompatibility.INSTANCE.transformAsync(nextResponseFuture, Connection.this.onV2AuthResponse(authenticator, protocolVersion, executor), executor);
                    }
                    case ERROR: {
                        String message = ((Responses.Error)authResponse).message;
                        // This specific server-side error text is replaced by a clearer explanation.
                        if (message.startsWith("java.lang.ArrayIndexOutOfBoundsException: 15")) {
                            message = String.format("Cannot use authenticator %s with protocol version 1, only plain text authentication is supported with this protocol version", authenticator);
                        }
                        Connection.this.incrementAuthErrorMetric();
                        throw new AuthenticationException(Connection.this.endPoint, message);
                    }
                }
                throw new TransportException(Connection.this.endPoint, String.format("Unexpected %s response message from server to authentication message", new Object[]{authResponse.type}));
            }
        };
    }

    /**
     * Bumps the authentication-errors counter, but only when metrics are enabled in the
     * manager's configuration.
     */
    private void incrementAuthErrorMetric() {
        boolean metricsEnabled = this.factory.manager.configuration.getMetricsOptions().isEnabled();
        if (!metricsEnabled) {
            return;
        }
        this.factory.manager.metrics.getErrorMetrics().getAuthenticationErrors().inc();
    }

    /**
     * Tells whether a server error denotes an unsupported protocol version.
     *
     * @param error the error response to inspect
     * @return {@code true} when the code is {@code PROTOCOL_ERROR} or {@code SERVER_ERROR} and
     *     the message contains one of the two known "unsupported version" phrases
     */
    private boolean isUnsupportedProtocolVersion(Responses.Error error) {
        boolean candidateCode = error.code == ExceptionCode.PROTOCOL_ERROR || error.code == ExceptionCode.SERVER_ERROR;
        if (!candidateCode) {
            // The message is not inspected for other error codes.
            return false;
        }
        return error.message.contains("Invalid or unsupported protocol version") || error.message.contains("Beta version of the protocol used");
    }

    /**
     * Creates an {@link UnsupportedProtocolVersionException} for this connection's endpoint and
     * logs its message at debug level.
     *
     * @param triedVersion the version that was attempted
     * @param serverProtocolVersion the version associated with the server's answer
     * @return the newly created exception (not thrown here)
     */
    private UnsupportedProtocolVersionException unsupportedProtocolVersionException(ProtocolVersion triedVersion, ProtocolVersion serverProtocolVersion) {
        UnsupportedProtocolVersionException failure = new UnsupportedProtocolVersionException(this.endPoint, triedVersion, serverProtocolVersion);
        logger.debug(failure.getMessage());
        return failure;
    }

    /**
     * @return the current value of the {@code isDefunct} flag, which {@code defunct(...)} flips
     *     to {@code true}
     */
    boolean isDefunct() {
        boolean defunctNow = this.isDefunct.get();
        return defunctNow;
    }

    /**
     * @return the value reported by the dispatcher's stream id generator for
     *     {@code maxAvailableStreams()}
     */
    int maxAvailableStreams() {
        StreamIdGenerator streamIds = this.dispatcher.streamIdHandler;
        return streamIds.maxAvailableStreams();
    }

    /**
     * Marks this connection as defunct because of {@code e} and force-closes it.
     *
     * <p>Only the first call does any work; later calls simply return their argument. On the
     * first call the failure is logged, the host's conviction policy is told about the failure,
     * and either the host is signalled down or the owner is notified. Finally the connection is
     * closed with {@code force()}.
     *
     * @param e the error that triggered the defunct
     * @param <E> the concrete error type, so callers can write {@code throw defunct(e)}
     * @return {@code e}, unchanged
     */
    <E extends Throwable> E defunct(E e) {
        if (!this.isDefunct.compareAndSet(false, true)) {
            // Somebody else already defuncted this connection.
            return e;
        }
        if (Host.statesLogger.isTraceEnabled()) {
            Host.statesLogger.trace("Defuncting " + this, e);
        } else if (Host.statesLogger.isDebugEnabled()) {
            Host.statesLogger.debug("Defuncting {} because: {}", (Object)this, (Object)e.getMessage());
        }
        Host affectedHost = this.getHost();
        if (affectedHost != null) {
            // 'signaled' is shared with closeAsync(): only whoever flips it first reports with 'true'.
            boolean firstSignal = this.signaled.compareAndSet(false, true);
            if (affectedHost.convictionPolicy.signalConnectionFailure(this, firstSignal)) {
                this.factory.manager.signalHostDown(affectedHost, affectedHost.wasJustAdded());
            } else {
                this.notifyOwnerWhenDefunct();
            }
        }
        this.closeAsync().force();
        return e;
    }

    /**
     * Tells the current owner, if any, that this connection became defunct. Does nothing while
     * the connection is not yet initialized.
     */
    private void notifyOwnerWhenDefunct() {
        if (this.isInitialized) {
            Owner currentOwner = this.ownerRef.get();
            if (currentOwner != null) {
                currentOwner.onConnectionDefunct(this);
            }
        }
    }

    /**
     * @return the keyspace name recorded in the current {@code targetKeyspace} attempt
     */
    String keyspace() {
        SetKeyspaceAttempt current = this.targetKeyspace.get();
        return current.keyspace;
    }

    /**
     * Blocking variant of {@code setKeyspaceAsync}: waits (uninterruptibly) for the keyspace
     * switch to complete.
     *
     * <p>Returns immediately when {@code keyspace} is {@code null} or already equal to the
     * current one. A busy connection or a timeout is logged as a warning and reported as a
     * {@link ConnectionException} without defuncting the connection; any other failure defuncts
     * it.
     *
     * @param keyspace the keyspace to switch to, or {@code null} for a no-op
     * @throws ConnectionException if the keyspace could not be set
     */
    void setKeyspace(String keyspace) throws ConnectionException {
        if (keyspace == null || MoreObjects.equal(this.keyspace(), keyspace)) {
            return;
        }
        try {
            Uninterruptibles.getUninterruptibly(this.setKeyspaceAsync(keyspace));
        }
        catch (ConnectionException e) {
            throw this.defunct(e);
        }
        catch (BusyConnectionException e) {
            logger.warn("Tried to set the keyspace on busy {}. This should not happen but is not critical (it will be retried)", (Object)this);
            throw new ConnectionException(this.endPoint, "Tried to set the keyspace on busy connection");
        }
        catch (ExecutionException e) {
            Throwable rootCause = e.getCause();
            if (!(rootCause instanceof OperationTimedOutException)) {
                throw this.defunct(new ConnectionException(this.endPoint, "Error while setting keyspace", rootCause));
            }
            // Timeouts are reported but do not defunct the connection.
            logger.warn("Timeout while setting keyspace on {}. This should not happen but is not critical (it will be retried)", (Object)this);
            throw new ConnectionException(this.endPoint, "Timeout while setting keyspace on connection");
        }
    }

    /**
     * Asynchronously switches this connection to {@code keyspace} by sending a {@code USE} query.
     *
     * <p>The current attempt is tracked in {@code targetKeyspace}. If an attempt for the same
     * keyspace already exists its future is returned. If a different attempt is still in flight
     * the returned future is failed with a {@link DriverException}. Otherwise a new attempt is
     * installed with compare-and-set and the query is written.
     *
     * @param keyspace the keyspace to switch to
     * @return a future completed with this connection once the keyspace is set, or failed if the
     *     switch was aborted or went wrong
     * @throws ConnectionException propagated from {@code write}
     * @throws BusyConnectionException propagated from {@code write}
     */
    ListenableFuture<Connection> setKeyspaceAsync(final String keyspace) throws ConnectionException, BusyConnectionException {
        SetKeyspaceAttempt existingAttempt = this.targetKeyspace.get();
        // Fast path: the current attempt already targets this keyspace.
        if (MoreObjects.equal(existingAttempt.keyspace, keyspace)) {
            return existingAttempt.future;
        }
        final SettableFuture ksFuture = SettableFuture.create();
        final SetKeyspaceAttempt attempt = new SetKeyspaceAttempt(keyspace, (ListenableFuture<Connection>)ksFuture);
        do {
            // SetKeyspaceAttempt equality is by keyspace name only, so this detects a concurrent
            // attempt for the same keyspace and shares its future.
            if (attempt.equals(existingAttempt = this.targetKeyspace.get())) {
                return existingAttempt.future;
            }
            // A finished previous attempt may be replaced: 'continue' jumps to the
            // compare-and-set in the loop condition.
            if (existingAttempt.future.isDone()) continue;
            // A different keyspace switch is still in flight: abort this one.
            ksFuture.setException((Throwable)new DriverException("Aborting attempt to set keyspace to '" + keyspace + "' since there is already an in flight attempt to set keyspace to '" + existingAttempt.keyspace + "'.  This can happen if you try to USE different keyspaces from the same session simultaneously."));
            return ksFuture;
        } while (!this.targetKeyspace.compareAndSet(existingAttempt, attempt));
        logger.debug("{} Setting keyspace {}", (Object)this, (Object)keyspace);
        // NOTE(review): the keyspace name is concatenated into the query without escaping
        // embedded double quotes - confirm callers only pass validated identifiers.
        Future future = this.write(new Requests.Query("USE \"" + keyspace + '\"'));
        GuavaCompatibility.INSTANCE.addCallback(future, new FutureCallback<Message.Response>(){

            public void onSuccess(Message.Response response) {
                if (response instanceof Responses.Result.SetKeyspace) {
                    logger.debug("{} Keyspace set to {}", (Object)Connection.this, (Object)keyspace);
                    ksFuture.set((Object)Connection.this);
                } else {
                    // Roll back to the default attempt (only if ours is still the current one),
                    // then defunct the connection and fail the future.
                    Connection.this.targetKeyspace.compareAndSet(attempt, Connection.this.defaultKeyspaceAttempt);
                    if (response.type == Message.Response.Type.ERROR) {
                        Responses.Error error = (Responses.Error)response;
                        ksFuture.setException((Throwable)Connection.this.defunct(error.asException(Connection.this.endPoint)));
                    } else {
                        ksFuture.setException((Throwable)Connection.this.defunct(new DriverInternalError("Unexpected response while setting keyspace: " + response)));
                    }
                }
            }

            public void onFailure(Throwable t) {
                // Same rollback as above, but the connection is not defuncted on this path.
                Connection.this.targetKeyspace.compareAndSet(attempt, Connection.this.defaultKeyspaceAttempt);
                ksFuture.setException(t);
            }
        }, this.factory.manager.configuration.getPoolingOptions().getInitializationExecutor());
        return ksFuture;
    }

    /**
     * Sends {@code request} and returns a {@link Future} that will receive the response.
     *
     * @param request the request to send
     * @return the future registered as the callback for this request
     * @throws ConnectionException propagated from the underlying write
     * @throws BusyConnectionException propagated from the underlying write
     */
    Future write(Message.Request request) throws ConnectionException, BusyConnectionException {
        Future pendingResponse = new Future(request);
        this.write(pendingResponse);
        return pendingResponse;
    }

    /**
     * Writes the callback's request with no per-statement read timeout and with the timeout
     * started immediately.
     *
     * @param callback supplies the request and receives the outcome
     * @return the handler tracking the in-flight request
     * @throws ConnectionException propagated from the three-argument overload
     * @throws BusyConnectionException propagated from the three-argument overload
     */
    ResponseHandler write(ResponseCallback callback) throws ConnectionException, BusyConnectionException {
        // A negative value makes ResponseHandler fall back to the factory's read timeout.
        final long noStatementTimeout = -1L;
        return this.write(callback, noStatementTimeout, true);
    }

    /**
     * Registers a response handler for the callback's request and writes the request to the
     * channel.
     *
     * <p>The handler is registered with the dispatcher before the defunct/closed checks; if
     * either check fails the handler is removed again (releasing its stream id) and a
     * {@link ConnectionException} is thrown. The request is then either written and flushed
     * directly or queued for the flusher, depending on {@code DISABLE_COALESCING}.
     *
     * @param callback supplies the request and receives the outcome
     * @param statementReadTimeoutMillis per-statement read timeout passed to the handler
     * @param startTimeout whether to start the handler's timeout right after writing
     * @return the handler tracking the in-flight request
     * @throws ConnectionException if the connection is defunct or closed
     * @throws BusyConnectionException if the handler could not be created
     */
    ResponseHandler write(ResponseCallback callback, long statementReadTimeoutMillis, boolean startTimeout) throws ConnectionException, BusyConnectionException {
        ResponseHandler responseHandler = new ResponseHandler(this, statementReadTimeoutMillis, callback);
        this.dispatcher.add(responseHandler);
        Message.Request outgoing = callback.request().setStreamId(responseHandler.streamId);
        String refusal = null;
        if (this.isDefunct.get()) {
            refusal = "Write attempt on defunct connection";
        } else if (this.isClosed()) {
            refusal = "Connection has been closed";
        }
        if (refusal != null) {
            this.dispatcher.removeHandler(responseHandler, true);
            throw new ConnectionException(this.endPoint, refusal);
        }
        logger.trace("{}, stream {}, writing request {}", new Object[]{this, outgoing.getStreamId(), outgoing});
        // Counted down again by the write listener once the write completes.
        this.writer.incrementAndGet();
        ChannelFutureListener onWritten = this.writeHandler(outgoing, responseHandler);
        if (DISABLE_COALESCING) {
            this.channel.writeAndFlush((Object)outgoing).addListener((GenericFutureListener)onWritten);
        } else {
            this.flush(new FlushItem(this.channel, outgoing, onWritten));
        }
        if (startTimeout) {
            responseHandler.startTimeout();
        }
        return responseHandler;
    }

    /**
     * Creates the listener attached to a channel write.
     *
     * <p>The listener always decrements the {@code writer} counter. On a failed write it removes
     * the handler (releasing the stream id) and, unless the manager's executor is shut down,
     * schedules a task that defuncts the connection and reports a {@link TransportException} to
     * the callback.
     *
     * @param request the request being written (used for logging)
     * @param handler the handler registered for that request
     * @return the write-completion listener
     */
    private ChannelFutureListener writeHandler(final Message.Request request, final ResponseHandler handler) {
        return new ChannelFutureListener(){

            public void operationComplete(ChannelFuture writeFuture) {
                Connection.this.writer.decrementAndGet();
                if (writeFuture.isSuccess()) {
                    logger.trace("{}, stream {}, request sent successfully", (Object)Connection.this, (Object)request.getStreamId());
                    return;
                }
                logger.debug("{}, stream {}, Error writing request {}", new Object[]{Connection.this, request.getStreamId(), request});
                Connection.this.dispatcher.removeHandler(handler, true);
                final TransportException failure;
                if (writeFuture.cause() instanceof ClosedChannelException) {
                    failure = new TransportException(Connection.this.endPoint, "Error writing: Closed channel");
                } else {
                    failure = new TransportException(Connection.this.endPoint, "Error writing", writeFuture.cause());
                }
                final long elapsedNanos = System.nanoTime() - handler.startTime;
                ListeningExecutorService callbackExecutor = Connection.this.factory.manager.executor;
                if (callbackExecutor.isShutdown()) {
                    return;
                }
                callbackExecutor.execute(new Runnable(){

                    @Override
                    public void run() {
                        handler.callback.onException(Connection.this, Connection.this.defunct(failure), elapsedNanos, handler.retryCount);
                    }
                });
            }
        };
    }

    /**
     * @return {@code true} if an owner is currently set on this connection
     */
    boolean hasOwner() {
        Owner currentOwner = this.ownerRef.get();
        return currentOwner != null;
    }

    /**
     * Atomically installs {@code owner}, but only if no owner is set yet.
     *
     * @param owner the owner to install
     * @return {@code true} if the owner was installed, {@code false} if one was already present
     */
    boolean setOwner(Owner owner) {
        boolean installed = this.ownerRef.compareAndSet(null, owner);
        return installed;
    }

    /**
     * Returns the shard this connection is bound to.
     *
     * <p>Falls back to {@code 0} when no shard id is recorded, when the host for this
     * connection's endpoint cannot be resolved ({@code getHost()} may return {@code null}), or
     * when the host has no sharding info.
     *
     * @return the shard id, or {@code 0} when it is unknown or not applicable
     */
    public int shardId() {
        if (this.shardId == null) {
            return 0;
        }
        Host host = this.getHost();
        // Guard against an unresolved host instead of failing with a NullPointerException.
        if (host == null || host.getShardingInfo() == null) {
            return 0;
        }
        return this.shardId;
    }

    /**
     * Hands this connection back to its owner when that owner is a {@link HostConnectionPool};
     * does nothing for any other owner (or no owner).
     *
     * @param busy forwarded to {@code HostConnectionPool.returnConnection}
     */
    void release(boolean busy) {
        Owner currentOwner = this.ownerRef.get();
        if (!(currentOwner instanceof HostConnectionPool)) {
            return;
        }
        HostConnectionPool pool = (HostConnectionPool)currentOwner;
        pool.returnConnection(this, busy);
    }

    /**
     * Shorthand for {@code release(false)}.
     */
    void release() {
        final boolean busy = false;
        this.release(busy);
    }

    /**
     * @return {@code true} once a close future has been installed, i.e. once closing has been
     *     requested
     */
    boolean isClosed() {
        ConnectionCloseFuture pendingClose = this.closeFuture.get();
        return pendingClose != null;
    }

    /**
     * Requests that this connection be closed.
     *
     * <p>Only the first caller installs a close future; later callers get the one already
     * installed. The first caller also signals the close to the host's conviction policy (unless
     * a signal was already sent) and tries to terminate right away. If termination is not yet
     * possible the connection is registered with the reaper, with a deadline of twice the read
     * timeout from now.
     *
     * @return the close future for this connection
     */
    CloseFuture closeAsync() {
        ConnectionCloseFuture candidate = new ConnectionCloseFuture();
        boolean installed = this.closeFuture.compareAndSet(null, candidate);
        if (!installed) {
            return this.closeFuture.get();
        }
        logger.debug("{} closing connection", (Object)this);
        if (this.signaled.compareAndSet(false, true)) {
            Host host = this.getHost();
            if (host != null) {
                host.convictionPolicy.signalConnectionClosed(this);
            }
        }
        if (!this.tryTerminate(false)) {
            long deadlineMillis = System.currentTimeMillis() + 2L * this.factory.getReadTimeoutMillis();
            this.factory.reaper.register(this, deadlineMillis);
        }
        return candidate;
    }

    /**
     * Looks up the host for this connection's endpoint in the cluster metadata, falling back to
     * the contact points.
     *
     * @return the matching host, or whatever {@code getContactPoint} returns when the endpoint is
     *     not a known host (callers in this class treat the result as possibly {@code null})
     */
    private Host getHost() {
        Metadata clusterMetadata = this.factory.manager.metadata;
        Host known = clusterMetadata.getHost(this.endPoint);
        if (known != null) {
            return known;
        }
        return clusterMetadata.getContactPoint(this.endPoint);
    }

    /**
     * Attempts to complete the close of an already-closing connection.
     *
     * <p>Termination happens when {@code force} is set or when no requests are pending in the
     * dispatcher; it is carried out by forcing the close future.
     *
     * @param force terminate even if requests are still pending (logged as a warning)
     * @return {@code true} if the connection has terminated or termination was just triggered,
     *     {@code false} if requests are still pending
     */
    boolean tryTerminate(boolean force) {
        assert (this.isClosed());
        ConnectionCloseFuture closing = this.closeFuture.get();
        if (closing.isDone()) {
            logger.debug("{} has already terminated", (Object)this);
            return true;
        }
        if (!force && !this.dispatcher.pending.isEmpty()) {
            logger.debug("Not terminating {}: there are still pending requests", (Object)this);
            return false;
        }
        if (force) {
            logger.warn("Forcing termination of {}. This should not happen and is likely a bug, please report.", (Object)this);
        }
        closing.force();
        return true;
    }

    /**
     * @return a short description containing the connection name, the in-flight count and
     *     whether the connection is closed
     */
    @Override
    public String toString() {
        Object[] parts = new Object[]{this.name, this.inFlight.get(), this.isClosed()};
        return String.format("Connection[%s, inFlight=%d, closed=%b]", parts);
    }

    /**
     * Queues {@code item} on the flusher associated with the item's channel event loop, creating
     * that flusher on first use, and makes sure the flusher is started.
     *
     * @param item the pending write to queue
     */
    private void flush(FlushItem item) {
        EventLoop loop = item.channel.eventLoop();
        Flusher flusher = (Flusher)flusherLookup.get(loop);
        if (flusher == null) {
            Flusher created = new Flusher(loop);
            Flusher raced = flusherLookup.putIfAbsent(loop, created);
            // If another thread registered a flusher first, use theirs.
            flusher = raced != null ? raced : created;
        }
        flusher.queued.add(item);
        flusher.start();
    }

    static interface Owner {
        public void onConnectionDefunct(Connection var1);
    }

    /**
     * Channel initializer that assembles the Netty pipeline for a single {@link Connection}.
     *
     * <p>Handlers are appended in this order: optional "ssl", optional traffic meters, frame
     * decoder and encoder, framing format handler, optional frame decompressor/compressor,
     * message decoder and encoder, idle state handler, and finally the connection's dispatcher.
     */
    private static class Initializer
    extends ChannelInitializer<SocketChannel> {
        // Shared, stateless-looking codecs reused across every channel.
        private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder();
        private static final Message.ProtocolEncoder messageEncoderV1 = new Message.ProtocolEncoder(ProtocolVersion.V1);
        private static final Message.ProtocolEncoder messageEncoderV2 = new Message.ProtocolEncoder(ProtocolVersion.V2);
        private static final Message.ProtocolEncoder messageEncoderV3 = new Message.ProtocolEncoder(ProtocolVersion.V3);
        private static final Message.ProtocolEncoder messageEncoderV4 = new Message.ProtocolEncoder(ProtocolVersion.V4);
        private static final Message.ProtocolEncoder messageEncoderV5 = new Message.ProtocolEncoder(ProtocolVersion.V5);
        private static final Message.ProtocolEncoder messageEncoderV6 = new Message.ProtocolEncoder(ProtocolVersion.V6);
        private static final Frame.Encoder frameEncoder = new Frame.Encoder();
        private final ProtocolVersion protocolVersion;
        private final Connection connection;
        private final FrameCompressor compressor;
        private final SSLOptions sslOptions;
        private final NettyOptions nettyOptions;
        private final ChannelHandler idleStateHandler;
        private final CodecRegistry codecRegistry;
        private final Metrics metrics;

        /**
         * @param connection the connection whose dispatcher and endpoint are wired into the pipeline
         * @param protocolVersion selects the message encoder and gates frame compression
         * @param compressor frame compressor, or {@code null} for none
         * @param sslOptions SSL handler factory, or {@code null} for no SSL
         * @param heartBeatIntervalSeconds first argument given to the {@code IdleStateHandler}
         * @param nettyOptions hook invoked once the pipeline is complete
         * @param codecRegistry stored as a channel attribute
         * @param metrics source of the traffic counters, or {@code null} to skip the meters
         */
        Initializer(Connection connection, ProtocolVersion protocolVersion, FrameCompressor compressor, SSLOptions sslOptions, int heartBeatIntervalSeconds, NettyOptions nettyOptions, CodecRegistry codecRegistry, Metrics metrics) {
            this.protocolVersion = protocolVersion;
            this.connection = connection;
            this.compressor = compressor;
            this.sslOptions = sslOptions;
            this.nettyOptions = nettyOptions;
            this.idleStateHandler = new IdleStateHandler(heartBeatIntervalSeconds, 0, 0);
            this.codecRegistry = codecRegistry;
            this.metrics = metrics;
        }

        /**
         * Populates the channel pipeline; see the class comment for the handler order.
         */
        protected void initChannel(SocketChannel channel) throws Exception {
            channel.attr(Message.CODEC_REGISTRY_ATTRIBUTE_KEY).set(this.codecRegistry);
            ChannelPipeline pipeline = channel.pipeline();

            if (this.sslOptions != null) {
                pipeline.addLast("ssl", this.newSslHandler(channel));
            }
            if (this.metrics != null) {
                pipeline.addLast("inboundTrafficMeter", new InboundTrafficMeter(this.metrics.getBytesReceived()));
                pipeline.addLast("outboundTrafficMeter", new OutboundTrafficMeter(this.metrics.getBytesSent()));
            }

            pipeline.addLast("frameDecoder", new Frame.Decoder());
            pipeline.addLast("frameEncoder", frameEncoder);
            pipeline.addLast("framingFormatHandler", new FramingFormatHandler(this.connection.factory));

            // Frame-level compression handlers are only installed below protocol V5.
            boolean frameCompression = this.compressor != null && this.protocolVersion.compareTo(ProtocolVersion.V5) < 0;
            if (frameCompression) {
                pipeline.addLast("frameDecompressor", new Frame.Decompressor(this.compressor));
                pipeline.addLast("frameCompressor", new Frame.Compressor(this.compressor));
            }

            pipeline.addLast("messageDecoder", messageDecoder);
            pipeline.addLast("messageEncoder", this.messageEncoderFor(this.protocolVersion));
            pipeline.addLast("idleStateHandler", this.idleStateHandler);
            pipeline.addLast("dispatcher", this.connection.dispatcher);
            this.nettyOptions.afterChannelInitialized(channel);
        }

        /**
         * Creates the SSL handler using the most specific {@code SSLOptions} variant that the
         * configured options implement.
         */
        private SslHandler newSslHandler(SocketChannel channel) throws Exception {
            if (this.sslOptions instanceof ExtendedRemoteEndpointAwareSslOptions) {
                return ((ExtendedRemoteEndpointAwareSslOptions)this.sslOptions).newSSLHandler(channel, this.connection.endPoint);
            }
            if (this.sslOptions instanceof RemoteEndpointAwareSSLOptions) {
                return ((RemoteEndpointAwareSSLOptions)this.sslOptions).newSSLHandler(channel, this.connection.endPoint.resolve());
            }
            return this.sslOptions.newSSLHandler(channel);
        }

        /**
         * @return the shared message encoder for {@code version}
         * @throws DriverInternalError if no encoder exists for the version
         */
        private Message.ProtocolEncoder messageEncoderFor(ProtocolVersion version) {
            switch (version) {
                case V1:
                    return messageEncoderV1;
                case V2:
                    return messageEncoderV2;
                case V3:
                    return messageEncoderV3;
                case V4:
                    return messageEncoderV4;
                case V5:
                    return messageEncoderV5;
                case V6:
                    return messageEncoderV6;
            }
            throw new DriverInternalError("Unsupported protocol version " + this.protocolVersion);
        }
    }

    static interface DefaultResponseHandler {
        public void handle(Message.Response var1);
    }

    /**
     * Bookkeeping for one in-flight request: the stream id it occupies, the callback to notify,
     * its start time and its optional read timeout.
     */
    static class ResponseHandler {
        final Connection connection;
        final int streamId;
        final ResponseCallback callback;
        final int retryCount;
        private final long readTimeoutMillis;
        private final long startTime;
        private volatile Timeout timeout;
        private final AtomicBoolean isCancelled = new AtomicBoolean();

        /**
         * Reserves a stream id on the connection and records the request's start time.
         *
         * @param connection the connection the request is sent on
         * @param statementReadTimeoutMillis read timeout for this request; a negative value
         *     selects the factory's read timeout instead
         * @param callback the callback to notify
         * @throws BusyConnectionException if the stream id generator returned {@code -1}
         */
        ResponseHandler(Connection connection, long statementReadTimeoutMillis, ResponseCallback callback) throws BusyConnectionException {
            this.connection = connection;
            if (statementReadTimeoutMillis >= 0L) {
                this.readTimeoutMillis = statementReadTimeoutMillis;
            } else {
                this.readTimeoutMillis = connection.factory.getReadTimeoutMillis();
            }
            int reservedId = connection.dispatcher.streamIdHandler.next();
            this.streamId = reservedId;
            if (reservedId == -1) {
                throw new BusyConnectionException(connection.endPoint);
            }
            this.callback = callback;
            this.retryCount = callback.retryCount();
            this.startTime = System.nanoTime();
        }

        /**
         * Schedules the timeout task on the factory's timer; a non-positive read timeout means no
         * timeout is scheduled.
         */
        void startTimeout() {
            if (this.readTimeoutMillis <= 0L) {
                this.timeout = null;
                return;
            }
            this.timeout = this.connection.factory.timer.newTimeout(this.onTimeoutTask(), this.readTimeoutMillis, TimeUnit.MILLISECONDS);
        }

        /** Cancels the scheduled timeout, if one was started. */
        void cancelTimeout() {
            if (this.timeout != null) {
                this.timeout.cancel();
            }
        }

        /**
         * Cancels this handler at most once, removing it from the dispatcher without releasing
         * its stream id.
         *
         * @return {@code true} if this call performed the cancellation, {@code false} if the
         *     handler had already been cancelled
         */
        boolean cancelHandler() {
            boolean firstCancel = this.isCancelled.compareAndSet(false, true);
            if (firstCancel) {
                this.connection.dispatcher.removeHandler(this, false);
            }
            return firstCancel;
        }

        /**
         * @return the task run when the read timeout fires: it notifies the callback and cancels
         *     this handler if the callback returns {@code true}
         */
        private TimerTask onTimeoutTask() {
            return new TimerTask(){

                public void run(Timeout timeout) {
                    long elapsedNanos = System.nanoTime() - ResponseHandler.this.startTime;
                    boolean handled = ResponseHandler.this.callback.onTimeout(ResponseHandler.this.connection, elapsedNanos, ResponseHandler.this.retryCount);
                    if (handled) {
                        ResponseHandler.this.cancelHandler();
                    }
                }
            };
        }
    }

    /**
     * Callback attached to a request written on a {@link Connection}.
     */
    interface ResponseCallback {
        /** @return the request to send */
        Message.Request request();

        /** @return the retry count recorded for the request */
        int retryCount();

        /**
         * Invoked when a response arrives for the request.
         *
         * @param latency elapsed {@code System.nanoTime()} difference since the handler was created
         */
        void onSet(Connection connection, Message.Response response, long latency, int retryCount);

        /**
         * Invoked when the request fails with an exception.
         *
         * @param latency elapsed {@code System.nanoTime()} difference since the handler was created
         */
        void onException(Connection connection, Exception exception, long latency, int retryCount);

        /**
         * Invoked when the read timeout fires.
         *
         * @param latency elapsed {@code System.nanoTime()} difference since the handler was created
         * @return {@code true} to have the response handler cancelled
         */
        boolean onTimeout(Connection connection, long latency, int retryCount);
    }

    /**
     * A future that doubles as the callback for a single request: it completes with the response,
     * or fails with the reported exception or an {@link OperationTimedOutException}. It also
     * remembers the endpoint and host of the connection that produced the outcome.
     */
    static class Future
    extends AbstractFuture<Message.Response>
    implements RequestHandler.Callback {
        private final Message.Request request;
        private volatile EndPoint endPoint;
        private volatile Host host;

        Future(Message.Request request) {
            this.request = request;
        }

        /** Records where an outcome came from, before the future is completed. */
        private void rememberOrigin(Connection connection) {
            this.endPoint = connection.endPoint;
            this.host = connection.getHost();
        }

        /** @return the host recorded from the last outcome, or {@code null} if none yet */
        public Host getHost() {
            return this.host;
        }

        /** @return the endpoint recorded from the last outcome, or {@code null} if none yet */
        EndPoint getEndPoint() {
            return this.endPoint;
        }

        /** Intentionally a no-op. */
        @Override
        public void register(RequestHandler handler) {
        }

        @Override
        public Message.Request request() {
            return this.request;
        }

        /** @return always {@code 0} */
        @Override
        public int retryCount() {
            return 0;
        }

        /** Delegates to the four-argument overload with a retry count of {@code 0}. */
        @Override
        public void onSet(Connection connection, Message.Response response, ExecutionInfo info, Statement statement, long latency) {
            this.onSet(connection, response, latency, 0);
        }

        @Override
        public void onSet(Connection connection, Message.Response response, long latency, int retryCount) {
            this.rememberOrigin(connection);
            super.set(response);
        }

        @Override
        public void onException(Connection connection, Exception exception, long latency, int retryCount) {
            // The connection may be absent here; only record the origin when it is known.
            if (connection != null) {
                this.rememberOrigin(connection);
            }
            super.setException(exception);
        }

        /**
         * @return the result of {@code setException}, i.e. whether this call failed the future
         */
        @Override
        public boolean onTimeout(Connection connection, long latency, int retryCount) {
            assert (connection != null);
            this.rememberOrigin(connection);
            return super.setException(new OperationTimedOutException(connection.endPoint));
        }
    }

    /**
     * Pairs a target keyspace name with the future tracking the attempt to switch to it.
     * Equality and hashing consider the keyspace name only, never the future.
     */
    private class SetKeyspaceAttempt {
        private final String keyspace;
        private final ListenableFuture<Connection> future;

        SetKeyspaceAttempt(String keyspace, ListenableFuture<Connection> future) {
            this.keyspace = keyspace;
            this.future = future;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof SetKeyspaceAttempt)) {
                return false;
            }
            String otherKeyspace = ((SetKeyspaceAttempt)o).keyspace;
            if (this.keyspace == null) {
                return otherKeyspace == null;
            }
            return this.keyspace.equals(otherKeyspace);
        }

        @Override
        public int hashCode() {
            if (this.keyspace == null) {
                return 0;
            }
            return this.keyspace.hashCode();
        }
    }

    private class ConnectionCloseFuture
    extends CloseFuture {
        private ConnectionCloseFuture() {
        }

        @Override
        public ConnectionCloseFuture force() {
            if (Connection.this.channel == null) {
                this.set(null);
                return this;
            }
            Connection.this.dispatcher.errorOutAllHandler(new TransportException(Connection.this.endPoint, "Connection has been closed"));
            ChannelFuture future = Connection.this.channel.close();
            future.addListener((GenericFutureListener)new ChannelFutureListener(){

                public void operationComplete(ChannelFuture future) {
                    Connection.this.factory.allChannels.remove((Object)Connection.this.channel);
                    if (future.cause() != null) {
                        logger.warn("Error closing channel", future.cause());
                        ConnectionCloseFuture.this.setException(future.cause());
                    } else {
                        ConnectionCloseFuture.this.set(null);
                    }
                }
            });
            return this;
        }
    }

    /**
     * Listener reacting to the underlying channel being closed.
     *
     * <p>An initialized connection that was not already closing is defuncted. Otherwise the
     * pending handlers are failed and the connection is force-closed.
     */
    private class ChannelCloseListener
    implements ChannelFutureListener {
        private ChannelCloseListener() {
        }

        public void operationComplete(ChannelFuture future) throws Exception {
            TransportException channelClosed = new TransportException(Connection.this.endPoint, "Channel has been closed");
            boolean unexpectedClose = Connection.this.isInitialized && !Connection.this.isClosed();
            if (unexpectedClose) {
                Connection.this.defunct(channelClosed);
                return;
            }
            Connection.this.dispatcher.errorOutAllHandler(channelClosed);
            Connection.this.closeAsync().force();
        }
    }

    @ChannelHandler.Sharable
    class Dispatcher
    extends SimpleChannelInboundHandler<Message.Response> {
        final StreamIdGenerator streamIdHandler;
        private final ConcurrentMap<Integer, ResponseHandler> pending = new ConcurrentHashMap<Integer, ResponseHandler>();

        Dispatcher() {
            ProtocolVersion protocolVersion = ((Connection)Connection.this).factory.protocolVersion;
            if (protocolVersion == null) {
                protocolVersion = ProtocolVersion.V2;
            }
            this.streamIdHandler = StreamIdGenerator.newInstance(protocolVersion);
        }

        void add(ResponseHandler handler) {
            ResponseHandler old = this.pending.put(handler.streamId, handler);
            assert (old == null);
        }

        void removeHandler(ResponseHandler handler, boolean releaseStreamId) {
            boolean removed;
            if (!releaseStreamId) {
                this.streamIdHandler.mark(handler.streamId);
            }
            if (!(removed = this.pending.remove(handler.streamId, handler))) {
                if (!releaseStreamId) {
                    this.streamIdHandler.unmark(handler.streamId);
                }
                return;
            }
            handler.cancelTimeout();
            if (releaseStreamId) {
                this.streamIdHandler.release(handler.streamId);
            }
            if (Connection.this.isClosed()) {
                Connection.this.tryTerminate(false);
            }
        }

        protected void channelRead0(ChannelHandlerContext ctx, Message.Response response) throws Exception {
            int streamId = response.getStreamId();
            if (logger.isTraceEnabled()) {
                logger.trace("{}, stream {}, received: {}", new Object[]{Connection.this, streamId, this.asDebugString(response)});
            }
            if (streamId < 0) {
                ((Connection)Connection.this).factory.defaultHandler.handle(response);
                return;
            }
            ResponseHandler handler = (ResponseHandler)this.pending.remove(streamId);
            this.streamIdHandler.release(streamId);
            if (handler == null) {
                this.streamIdHandler.unmark(streamId);
                if (logger.isDebugEnabled()) {
                    logger.debug("{} Response received on stream {} but no handler set anymore (either the request has timed out or it was closed due to another error). Received message is {}", new Object[]{Connection.this, streamId, this.asDebugString(response)});
                }
                return;
            }
            handler.cancelTimeout();
            handler.callback.onSet(Connection.this, response, System.nanoTime() - handler.startTime, handler.retryCount);
            if (Connection.this.isClosed()) {
                Connection.this.tryTerminate(false);
            }
        }

        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (Connection.this.isInitialized && !Connection.this.isClosed() && evt instanceof IdleStateEvent && ((IdleStateEvent)evt).state() == IdleState.READER_IDLE) {
                logger.debug("{} was inactive for {} seconds, sending heartbeat", (Object)Connection.this, (Object)((Connection)Connection.this).factory.configuration.getPoolingOptions().getHeartbeatIntervalSeconds());
                Connection.this.write(HEARTBEAT_CALLBACK);
            }
        }

        private String asDebugString(Object obj) {
            if (obj == null) {
                return "null";
            }
            String msg = obj.toString();
            if (msg.length() < 500) {
                return msg;
            }
            return msg.substring(0, 500) + "... [message of size " + msg.length() + " truncated]";
        }

        /**
         * Reacts to an exception reported for this connection's channel.
         *
         * <p>The error is logged at debug level, then:
         * <ul>
         *   <li>if the connection's {@code writer} counter is positive, nothing else happens;
         *   <li>if the cause is a {@code DecoderException} wrapping a
         *       {@code FrameTooLongException}, only the request registered for that stream id
         *       is failed (or, when no handler is registered anymore, the stream id is
         *       unmarked) and the connection is left alone;
         *   <li>otherwise the connection is defuncted with a {@code TransportException}; a
         *       wrapped {@code CrcMismatchException} is additionally logged at warn level.
         * </ul>
         *
         * @param ctx the channel handler context (not used by this method)
         * @param cause the exception that was raised
         */
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            if (logger.isDebugEnabled()) {
                logger.debug(String.format("%s connection error", Connection.this), cause);
            }
            // NOTE(review): presumably a positive writer count means a write is in flight and
            // its own failure path reports the error - confirm against the write listener.
            if (Connection.this.writer.get() > 0) {
                return;
            }

            // Only decoder failures carry a nested error worth inspecting.
            Throwable decodingError = cause instanceof DecoderException ? cause.getCause() : null;

            if (decodingError instanceof FrameTooLongException) {
                FrameTooLongException tooLong = (FrameTooLongException)decodingError;
                int streamId = tooLong.getStreamId();
                ResponseHandler handler = (ResponseHandler)this.pending.remove(streamId);
                this.streamIdHandler.release(streamId);
                if (handler != null) {
                    // Fail just the request that produced the oversized frame.
                    handler.cancelTimeout();
                    handler.callback.onException(Connection.this, tooLong, System.nanoTime() - handler.startTime, handler.retryCount);
                    return;
                }
                this.streamIdHandler.unmark(streamId);
                if (logger.isDebugEnabled()) {
                    logger.debug("{} FrameTooLongException received on stream {} but no handler set anymore (either the request has timed out or it was closed due to another error).", (Object)Connection.this, (Object)streamId);
                }
                return;
            }

            if (decodingError instanceof CrcMismatchException) {
                logger.warn("CRC mismatch while decoding a response, dropping the connection", decodingError);
            }
            Connection.this.defunct(new TransportException(Connection.this.endPoint, String.format("Unexpected exception triggered (%s)", cause), cause));
        }

        /**
         * Fails every pending response handler with the given exception.
         *
         * <p>For each handler: its timeout is cancelled, its callback is notified with the
         * elapsed time since {@code startTime} and its retry count, and the entry is then
         * removed from the pending map through the iterator.
         *
         * @param ce the exception passed to each handler's callback
         */
        void errorOutAllHandler(ConnectionException ce) {
            for (Iterator it = this.pending.values().iterator(); it.hasNext(); ) {
                ResponseHandler pendingHandler = (ResponseHandler)it.next();
                pendingHandler.cancelTimeout();
                long elapsedNanos = System.nanoTime() - pendingHandler.startTime;
                pendingHandler.callback.onException(Connection.this, ce, elapsedNanos, pendingHandler.retryCount);
                it.remove();
            }
        }
    }

    /**
     * Immutable holder for one queued write: the channel to write to, the request
     * object to write, and the listener to attach to the resulting write future.
     */
    private static class FlushItem {
        /** Channel the request is written to. */
        final Channel channel;
        /** Object handed to {@code Channel.write}. */
        final Object request;
        /** Listener registered on the future returned by the write. */
        final ChannelFutureListener listener;

        private FlushItem(Channel targetChannel, Object payload, ChannelFutureListener writeListener) {
            this.channel = targetChannel;
            this.request = payload;
            this.listener = writeListener;
        }
    }

    /**
     * Runnable that drains queued {@link FlushItem}s onto their channels and then
     * flushes every channel it wrote to.
     *
     * <p>The event loop is held through a {@link WeakReference}; the {@code running}
     * flag ensures that at most one execution is submitted at a time.
     */
    private static final class Flusher
    implements Runnable {
        final WeakReference<EventLoop> eventLoopRef;
        final Queue<FlushItem> queued = new ConcurrentLinkedQueue<FlushItem>();
        final AtomicBoolean running = new AtomicBoolean(false);
        /** Channels written to during the current run that still need a flush. */
        final HashSet<Channel> channels = new HashSet<Channel>();

        private Flusher(EventLoop eventLoop) {
            this.eventLoopRef = new WeakReference<EventLoop>(eventLoop);
        }

        /**
         * Submits this flusher to the event loop unless it is already marked running.
         * Nothing is submitted when the event loop reference has been cleared.
         */
        void start() {
            // Cheap read first; only attempt the CAS when the flag looks clear.
            if (this.running.get() || !this.running.compareAndSet(false, true)) {
                return;
            }
            EventLoop loop = this.eventLoopRef.get();
            if (loop != null) {
                loop.execute(this);
            }
        }

        /**
         * Writes every queued item whose channel is active, flushes the channels that
         * were written to, clears the running flag, and re-submits itself if more items
         * were queued in the meantime.
         */
        @Override
        public void run() {
            for (FlushItem item = this.queued.poll(); item != null; item = this.queued.poll()) {
                Channel target = item.channel;
                if (target.isActive()) {
                    this.channels.add(target);
                    target.write(item.request).addListener(item.listener);
                }
            }

            for (Channel written : this.channels) {
                written.flush();
            }
            this.channels.clear();

            // Release the flag before re-checking the queue so that items enqueued
            // after the drain above are picked up either here or by start().
            this.running.set(false);
            if (this.queued.isEmpty()) {
                return;
            }
            if (!this.running.compareAndSet(false, true)) {
                return;
            }

            EventLoop loop = this.eventLoopRef.get();
            if (loop == null || loop.isShuttingDown()) {
                return;
            }
            if (FLUSHER_SCHEDULE_PERIOD_NS > 0) {
                loop.schedule(this, (long)FLUSHER_SCHEDULE_PERIOD_NS, TimeUnit.NANOSECONDS);
            } else {
                loop.execute(this);
            }
        }
    }

    /**
     * Creates {@link Connection} instances for a cluster and owns the Netty resources
     * (event loop group, channel class, timer, channel group) they share.
     */
    static class Factory {
        final Timer timer;
        final EventLoopGroup eventLoopGroup;
        private final Class<? extends Channel> channelClass;
        private final ChannelGroup allChannels = new DefaultChannelGroup((EventExecutor)GlobalEventExecutor.INSTANCE);
        /** Per-host counters used to build connection names. */
        private final ConcurrentMap<Host, AtomicInteger> idGenerators = new ConcurrentHashMap<Host, AtomicInteger>();
        final DefaultResponseHandler defaultHandler;
        final Cluster.Manager manager;
        final Cluster.ConnectionReaper reaper;
        final Configuration configuration;
        final AuthProvider authProvider;
        private volatile boolean isShutdown;
        volatile ProtocolVersion protocolVersion;
        private final NettyOptions nettyOptions;

        /**
         * Builds a factory from the cluster manager and configuration.
         *
         * <p>The event loop group and the timer are obtained from the configured
         * {@link NettyOptions}, using thread factories named {@code "nio-worker"} and
         * {@code "timeouter"} respectively.
         *
         * @param manager the cluster manager; also used as the default response handler
         * @param configuration the configuration to read protocol, socket and Netty options from
         */
        Factory(Cluster.Manager manager, Configuration configuration) {
            this.defaultHandler = manager;
            this.manager = manager;
            this.reaper = manager.reaper;
            this.configuration = configuration;
            this.authProvider = configuration.getProtocolOptions().getAuthProvider();
            this.protocolVersion = configuration.getProtocolOptions().initialProtocolVersion;
            this.nettyOptions = configuration.getNettyOptions();
            this.eventLoopGroup = this.nettyOptions.eventLoopGroup(manager.configuration.getThreadingOptions().createThreadFactory(manager.clusterName, "nio-worker"));
            this.channelClass = this.nettyOptions.channelClass();
            this.timer = this.nettyOptions.timer(manager.configuration.getThreadingOptions().createThreadFactory(manager.clusterName, "timeouter"));
        }

        /** Returns the port from the configured protocol options. */
        int getPort() {
            return this.configuration.getProtocolOptions().getPort();
        }

        /**
         * Opens a connection to the host and blocks until its initialization completes.
         *
         * @param host the host to connect to
         * @return the initialized connection
         * @throws ConnectionException if the factory is shut down or initialization failed with one
         */
        Connection open(Host host) throws ConnectionException, InterruptedException, UnsupportedProtocolVersionException, ClusterNameMismatchException {
            EndPoint endPoint = host.getEndPoint();
            if (this.isShutdown) {
                throw new ConnectionException(endPoint, "Connection factory is shut down");
            }
            host.convictionPolicy.signalConnectionsOpening(1);
            Connection opened = new Connection(this.buildConnectionName(host), endPoint, this);
            try {
                opened.initAsync().get();
            }
            catch (ExecutionException e) {
                throw Factory.launderAsyncInitException(e);
            }
            return opened;
        }

        /**
         * Opens a pool-owned connection, delegating with shard id {@code -1} and server
         * port {@code 0}.
         */
        Connection open(HostConnectionPool pool) throws ConnectionException, InterruptedException, UnsupportedProtocolVersionException, ClusterNameMismatchException {
            return this.open(pool, -1, 0);
        }

        /**
         * Opens a pool-owned connection and blocks until its initialization completes.
         *
         * @param pool the pool the connection belongs to
         * @param shardId passed through to {@code Connection.initAsync}
         * @param serverPort passed through to {@code Connection.initAsync}
         * @return the initialized connection
         */
        Connection open(HostConnectionPool pool, int shardId, int serverPort) throws ConnectionException, InterruptedException, UnsupportedProtocolVersionException, ClusterNameMismatchException {
            pool.host.convictionPolicy.signalConnectionsOpening(1);
            Connection opened = new Connection(this.buildConnectionName(pool.host), pool.host.getEndPoint(), this, pool);
            try {
                opened.initAsync(shardId, serverPort).get();
            }
            catch (ExecutionException e) {
                throw Factory.launderAsyncInitException(e);
            }
            return opened;
        }

        /**
         * Creates {@code count} connection objects for the pool without initializing them.
         *
         * @param pool the pool the connections belong to
         * @param count how many connections to create
         * @return the newly created connections
         */
        List<Connection> newConnections(HostConnectionPool pool, int count) {
            pool.host.convictionPolicy.signalConnectionsOpening(count);
            List<Connection> created = Lists.newArrayListWithCapacity(count);
            for (int remaining = count; remaining > 0; --remaining) {
                created.add(new Connection(this.buildConnectionName(pool.host), pool.host.getEndPoint(), this, pool));
            }
            return created;
        }

        /** Builds a name of the form {@code <endpoint>-<sequence number>} for a new connection. */
        private String buildConnectionName(Host host) {
            String endPointText = host.getEndPoint().toString();
            int sequence = this.getIdGenerator(host).getAndIncrement();
            return endPointText + '-' + sequence;
        }

        /**
         * Unwraps the cause of a failed asynchronous initialization.
         *
         * <p>Causes of the declared checked types, {@code DriverException}s and
         * {@code Error}s are rethrown as-is; any other cause is wrapped in a
         * {@link RuntimeException} that is returned for the caller to throw.
         *
         * @param e the execution exception to unwrap
         * @return a runtime exception wrapping an otherwise unhandled cause
         */
        static RuntimeException launderAsyncInitException(ExecutionException e) throws ConnectionException, InterruptedException, UnsupportedProtocolVersionException, ClusterNameMismatchException {
            Throwable cause = e.getCause();
            if (cause instanceof ConnectionException) {
                throw (ConnectionException)cause;
            }
            if (cause instanceof InterruptedException) {
                throw (InterruptedException)cause;
            }
            if (cause instanceof UnsupportedProtocolVersionException) {
                throw (UnsupportedProtocolVersionException)cause;
            }
            if (cause instanceof ClusterNameMismatchException) {
                throw (ClusterNameMismatchException)cause;
            }
            if (cause instanceof DriverException) {
                throw (DriverException)cause;
            }
            if (cause instanceof Error) {
                throw (Error)cause;
            }
            return new RuntimeException("Unexpected exception during connection initialization", cause);
        }

        /** Returns the id counter for the host, creating one that starts at 1 if absent. */
        private AtomicInteger getIdGenerator(Host host) {
            AtomicInteger existing = this.idGenerators.get(host);
            if (existing != null) {
                return existing;
            }
            AtomicInteger fresh = new AtomicInteger(1);
            // Another thread may have registered a counter first; if so, use theirs.
            AtomicInteger raced = this.idGenerators.putIfAbsent(host, fresh);
            return raced != null ? raced : fresh;
        }

        /** Returns the read timeout, in milliseconds, from the configured socket options. */
        long getReadTimeoutMillis() {
            return this.configuration.getSocketOptions().getReadTimeoutMillis();
        }

        /**
         * Creates a bootstrap bound to this factory's event loop group and channel class.
         *
         * <p>The connect timeout is always applied; every other socket option is applied
         * only when its configured value is non-null. The bootstrap is finally passed to
         * {@code NettyOptions.afterBootstrapInitialized}.
         */
        private Bootstrap newBootstrap() {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(this.eventLoopGroup).channel(this.channelClass);

            SocketOptions socketOptions = this.configuration.getSocketOptions();
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, socketOptions.getConnectTimeoutMillis());

            Boolean keepAlive = socketOptions.getKeepAlive();
            if (keepAlive != null) {
                bootstrap.option(ChannelOption.SO_KEEPALIVE, keepAlive);
            }
            Boolean reuseAddress = socketOptions.getReuseAddress();
            if (reuseAddress != null) {
                bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
            }
            Integer soLinger = socketOptions.getSoLinger();
            if (soLinger != null) {
                bootstrap.option(ChannelOption.SO_LINGER, soLinger);
            }
            Boolean tcpNoDelay = socketOptions.getTcpNoDelay();
            if (tcpNoDelay != null) {
                bootstrap.option(ChannelOption.TCP_NODELAY, tcpNoDelay);
            }
            Integer receiveBufferSize = socketOptions.getReceiveBufferSize();
            if (receiveBufferSize != null) {
                bootstrap.option(ChannelOption.SO_RCVBUF, receiveBufferSize);
            }
            Integer sendBufferSize = socketOptions.getSendBufferSize();
            if (sendBufferSize != null) {
                bootstrap.option(ChannelOption.SO_SNDBUF, sendBufferSize);
            }

            this.nettyOptions.afterBootstrapInitialized(bootstrap);
            return bootstrap;
        }

        /**
         * Marks the factory as shut down, closes every channel in the channel group
         * (waiting uninterruptibly for completion), then hands the event loop group and
         * the timer to {@code NettyOptions.onClusterClose}.
         */
        void shutdown() {
            this.isShutdown = true;
            this.allChannels.close().awaitUninterruptibly();
            this.nettyOptions.onClusterClose(this.eventLoopGroup);
            this.nettyOptions.onClusterClose(this.timer);
        }
    }

    /**
     * Hands out local TCP ports whose number is congruent to a given shard id modulo
     * the shard count. The last port handed out is remembered process-wide so that
     * successive calls continue scanning from there.
     */
    static class PortAllocator {
        /** Last port returned by {@link #getNextAvailablePort}, or -1 if none yet. */
        private static final AtomicInteger lastPort = new AtomicInteger(-1);

        PortAllocator() {
        }

        /**
         * Finds a bindable local port {@code p} with {@code p % shardCount == shardId}.
         *
         * <p>The scan starts above the last port handed out (or above {@code lowPort}
         * when there is none or it lies below {@code lowPort}), steps by
         * {@code shardCount} up to {@code highPort}, and wraps around to {@code lowPort}
         * once if nothing was found.
         *
         * @param shardCount number of shards; used as modulus and scan step
         * @param shardId the remainder the returned port must have
         * @param lowPort lower bound of the scan range
         * @param highPort inclusive upper bound of the scan range
         * @return the port found, or -1 if no candidate in the range could be bound
         */
        public static int getNextAvailablePort(int shardCount, int shardId, int lowPort, int highPort) {
            int lastPortValue;
            int foundPort;
            do {
                lastPortValue = lastPort.get();
                int scanStart = lastPortValue == -1 ? lowPort : lastPortValue;
                if (scanStart < lowPort) {
                    scanStart = lowPort;
                }
                // foundPort is recomputed from scratch on every attempt. Carrying a value
                // over from a lost CAS race would skip the wrap-around scan and could hand
                // out the very port the winning thread just claimed.
                foundPort = PortAllocator.scanUpwards(PortAllocator.firstCandidateAbove(scanStart, shardCount, shardId), highPort, shardCount);
                if (foundPort == -1) {
                    // Starting high may have left too few candidates; retry from the bottom.
                    foundPort = PortAllocator.scanUpwards(PortAllocator.firstCandidateAbove(lowPort, shardCount, shardId), highPort, shardCount);
                }
                if (foundPort == -1) {
                    return -1;
                }
            } while (!lastPort.compareAndSet(lastPortValue, foundPort));
            return foundPort;
        }

        /** Returns the next multiple of {@code shardCount} above {@code port}, plus {@code shardId}. */
        private static int firstCandidateAbove(int port, int shardCount, int shardId) {
            return port + (shardCount - port % shardCount) + shardId;
        }

        /** Returns the first bindable port in {@code from, from + step, ...} up to {@code highPort}, or -1. */
        private static int scanUpwards(int from, int highPort, int step) {
            for (int port = from; port <= highPort; port += step) {
                if (PortAllocator.isTcpPortAvailable(port)) {
                    return port;
                }
            }
            return -1;
        }

        /**
         * Probes whether a server socket can currently be bound to the given local port.
         * The probe socket is always closed before returning.
         *
         * @param port the port to probe
         * @return {@code true} if the bind succeeded; {@code false} if creating, binding
         *     or closing the probe socket raised an {@link IOException}
         */
        public static boolean isTcpPortAvailable(int port) {
            try {
                ServerSocket serverSocket = new ServerSocket();
                try {
                    serverSocket.setReuseAddress(false);
                    serverSocket.bind(new InetSocketAddress(port), 1);
                    return true;
                }
                finally {
                    serverSocket.close();
                }
            }
            catch (IOException ex) {
                return false;
            }
        }
    }

    /**
     * Lifecycle states, declared in the order OPEN, TRASHED, RESURRECTING, GONE.
     * NOTE(review): the transitions between states are driven by code outside this
     * enum - confirm the exact semantics against the connection pool.
     */
    static enum State {
        OPEN, TRASHED, RESURRECTING, GONE;
    }
}

