/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.driver.core;

import com.datastax.driver.core.AbstractReconnectionHandler;
import com.datastax.driver.core.BusyConnectionException;
import com.datastax.driver.core.CloseFuture;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ClusterNameMismatchException;
import com.datastax.driver.core.Connection;
import com.datastax.driver.core.ConnectionException;
import com.datastax.driver.core.DefaultResultSetFuture;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ProtocolEvent;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.Requests;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.SchemaElement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.UnsupportedProtocolVersionException;
import com.datastax.driver.core.VersionNumber;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.exceptions.DriverInternalError;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.util.concurrent.ListenableFuture;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ControlConnection
implements Host.StateListener,
Connection.Owner {
    private static final Logger logger = LoggerFactory.getLogger(ControlConnection.class);
    // Assigned in the static initializer from a 4-byte all-zero address (0.0.0.0);
    // compared against a peer's rpc_address in addressToUseForPeerHost.
    private static final InetAddress bindAllAddress;
    // CQL statements issued over the control connection. The schema queries may get a
    // WHERE clause appended by the static refreshSchema method.
    private static final String SELECT_KEYSPACES = "SELECT * FROM system.schema_keyspaces";
    private static final String SELECT_COLUMN_FAMILIES = "SELECT * FROM system.schema_columnfamilies";
    private static final String SELECT_COLUMNS = "SELECT * FROM system.schema_columns";
    private static final String SELECT_USERTYPES = "SELECT * FROM system.schema_usertypes";
    private static final String SELECT_PEERS = "SELECT * FROM system.peers";
    private static final String SELECT_LOCAL = "SELECT * FROM system.local WHERE key='local'";
    // Narrower variants used only by checkSchemaAgreement.
    private static final String SELECT_SCHEMA_PEERS = "SELECT peer, rpc_address, schema_version FROM system.peers";
    private static final String SELECT_SCHEMA_LOCAL = "SELECT schema_version FROM system.local WHERE key='local'";
    // The connection currently used as control connection; null until one has been set.
    @VisibleForTesting
    final AtomicReference<Connection> connectionRef = new AtomicReference();
    private final Cluster.Manager cluster;
    // Future of the reconnection handler started by backgroundReconnect; cancelled in closeAsync.
    private final AtomicReference<ListenableFuture<?>> reconnectionAttempt = new AtomicReference();
    // Set to true by closeAsync; checked before connecting or reconnecting.
    private volatile boolean isShutdown;

    /**
     * Creates a control connection attached to the given cluster manager.
     * No connection is opened here; that happens in {@code connect()}.
     *
     * @param manager the cluster manager whose metadata, policies and connection factory are used
     */
    public ControlConnection(Cluster.Manager manager) {
        this.cluster = manager;
    }

    /**
     * Establishes the initial control connection by trying the currently known hosts
     * in random order. Does nothing if this control connection has been shut down.
     *
     * @throws UnsupportedProtocolVersionException propagated from the connection attempt
     */
    void connect() throws UnsupportedProtocolVersionException {
        if (this.isShutdown) {
            return;
        }
        // Work on a shuffled copy of the host collection.
        List<Host> candidates = new ArrayList<Host>(this.cluster.metadata.allHosts());
        Collections.shuffle(candidates);
        Connection established = this.reconnectInternal(candidates.iterator(), true);
        this.setNewConnection(established);
    }

    /**
     * Marks this control connection as shut down, cancels any pending reconnection
     * attempt and starts closing the current connection.
     *
     * @return the close future of the current connection, or an immediate future when
     *         there is no current connection
     */
    CloseFuture closeAsync() {
        this.isShutdown = true;
        ListenableFuture<?> pendingReconnection = this.reconnectionAttempt.get();
        if (pendingReconnection != null) {
            pendingReconnection.cancel(false);
        }
        Connection current = this.connectionRef.get();
        if (current == null) {
            return CloseFuture.immediateFuture();
        }
        return current.closeAsync();
    }

    /**
     * Looks up the host the control connection is currently attached to.
     *
     * @return the metadata entry for the current connection's address, or {@code null}
     *         when there is no current connection
     */
    Host connectedHost() {
        Connection active = this.connectionRef.get();
        if (active == null) {
            return null;
        }
        return this.cluster.metadata.getHost(active.address);
    }

    /**
     * Requests a background reconnection with no initial delay.
     */
    void triggerReconnect() {
        this.backgroundReconnect(0L);
    }

    /**
     * Starts a background reconnection handler for the control connection.
     * <p>
     * Nothing is started when this control connection is shut down or when a previous
     * reconnection attempt is still pending.
     *
     * @param initialDelayMs the initial delay handed to the reconnection handler, in milliseconds
     */
    private void backgroundReconnect(long initialDelayMs) {
        if (this.isShutdown) {
            return;
        }
        // Do not stack a second attempt on top of one that has not completed yet.
        ListenableFuture<?> reconnection = this.reconnectionAttempt.get();
        if (reconnection != null && !reconnection.isDone()) {
            return;
        }
        new AbstractReconnectionHandler("Control connection", this.cluster.reconnectionExecutor, this.cluster.reconnectionPolicy().newSchedule(), this.reconnectionAttempt, initialDelayMs){

            @Override
            protected Connection tryReconnect() throws ConnectionException {
                if (ControlConnection.this.isShutdown) {
                    throw new ConnectionException(null, "Control connection was shut down");
                }
                try {
                    return ControlConnection.this.reconnectInternal(ControlConnection.this.queryPlan(), false);
                }
                catch (NoHostAvailableException e) {
                    throw new ConnectionException(null, e.getMessage());
                }
                catch (UnsupportedProtocolVersionException e) {
                    // reconnectInternal only rethrows this while the cluster has no protocol
                    // version set, which is not expected on a reconnection. Keep the original
                    // exception as the cause so the failure can be diagnosed if it does happen.
                    throw new AssertionError(e);
                }
            }

            @Override
            protected void onReconnection(Connection connection) {
                // A shutdown may have happened while reconnecting: discard the new connection.
                if (ControlConnection.this.isShutdown) {
                    connection.closeAsync();
                    return;
                }
                ControlConnection.this.setNewConnection(connection);
            }

            @Override
            protected boolean onConnectionException(ConnectionException e, long nextDelayMs) {
                if (ControlConnection.this.isShutdown) {
                    return false;
                }
                logger.error("[Control connection] Cannot connect to any host, scheduling retry in {} milliseconds", (Object)nextDelayMs);
                return true;
            }

            @Override
            protected boolean onUnknownException(Exception e, long nextDelayMs) {
                if (ControlConnection.this.isShutdown) {
                    return false;
                }
                logger.error(String.format("[Control connection] Unknown error during reconnection, scheduling retry in %d milliseconds", nextDelayMs), (Throwable)e);
                return true;
            }
        }.start();
    }

    /**
     * Asks the load balancing policy for a query plan, passing a {@code null} first
     * argument and the default statement.
     *
     * @return the hosts to try, in the order chosen by the policy
     */
    private Iterator<Host> queryPlan() {
        Iterator<Host> plan = this.cluster.loadBalancingPolicy().newQueryPlan(null, Statement.DEFAULT);
        return plan;
    }

    /**
     * Reacts to a failure on the control connection: starts closing the current
     * connection (if any) and schedules an immediate background reconnection.
     */
    private void signalError() {
        Connection failed = this.connectionRef.get();
        if (failed != null) {
            failed.closeAsync();
        }
        // No delay: try to get a working control connection back right away.
        this.backgroundReconnect(0L);
    }

    /**
     * Installs {@code newConnection} as the current control connection, making this
     * object its owner, and starts closing the previous connection if it is still open.
     *
     * @param newConnection the connection to install
     */
    private void setNewConnection(Connection newConnection) {
        Host.statesLogger.debug("[Control connection] established to {}", (Object)newConnection.address);
        newConnection.setOwner(this);
        Connection previous = this.connectionRef.getAndSet(newConnection);
        if (previous == null || previous.isClosed()) {
            return;
        }
        previous.closeAsync();
    }

    /**
     * Walks the given hosts and returns the first control connection that can be fully
     * set up. Hosts whose conviction policy does not allow a reconnection now are skipped.
     *
     * @param iter the candidate hosts, in the order they should be tried
     * @param isInitialConnection whether this is the initial connection; when true, a host
     *        failing with a {@code ConnectionException} is also marked down
     * @return the established connection
     * @throws UnsupportedProtocolVersionException rethrown only while the cluster has no
     *         protocol version set
     * @throws NoHostAvailableException (unchecked) when no host could be used, carrying the
     *         per-host errors collected so far
     */
    private Connection reconnectInternal(Iterator<Host> iter, boolean isInitialConnection) throws UnsupportedProtocolVersionException {
        Map<InetSocketAddress, Throwable> errors = null;
        Host current = null;
        try {
            while (iter.hasNext()) {
                current = iter.next();
                if (!current.convictionPolicy.canReconnectNow()) {
                    continue;
                }
                try {
                    return this.tryConnect(current, isInitialConnection);
                }
                catch (ConnectionException e) {
                    errors = logError(current, e, errors, iter);
                    if (isInitialConnection) {
                        current.setDown();
                    }
                }
                catch (ExecutionException e) {
                    // Record the underlying failure rather than the wrapper.
                    errors = logError(current, e.getCause(), errors, iter);
                }
                catch (UnsupportedProtocolVersionException e) {
                    if (this.cluster.protocolVersion() == null) {
                        throw e;
                    }
                    logger.debug("Ignoring host {}: {}", (Object)current, (Object)e.getMessage());
                    errors = logError(current, e, errors, iter);
                }
                catch (ClusterNameMismatchException e) {
                    logger.debug("Ignoring host {}: {}", (Object)current, (Object)e.getMessage());
                    errors = logError(current, e, errors, iter);
                }
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag, then report the current and all remaining hosts as interrupted.
            Thread.currentThread().interrupt();
            if (current != null) {
                errors = logError(current, new DriverException("Connection thread interrupted"), errors, iter);
            }
            while (iter.hasNext()) {
                errors = logError(iter.next(), new DriverException("Connection thread interrupted"), errors, iter);
            }
        }
        if (errors == null) {
            errors = Collections.emptyMap();
        }
        throw new NoHostAvailableException(errors);
    }

    /**
     * Records a connection failure for {@code host} and logs it at debug level.
     *
     * @param host the host that failed
     * @param exception the failure to record
     * @param errors the errors collected so far, or {@code null} if none yet
     * @param iter the remaining hosts; only used to pick the log message
     * @return the error map (newly created when {@code errors} was {@code null}) including this failure
     */
    private static Map<InetSocketAddress, Throwable> logError(Host host, Throwable exception, Map<InetSocketAddress, Throwable> errors, Iterator<Host> iter) {
        Map<InetSocketAddress, Throwable> collected = errors;
        if (collected == null) {
            collected = new HashMap<InetSocketAddress, Throwable>();
        }
        collected.put(host.getSocketAddress(), exception);
        if (logger.isDebugEnabled()) {
            String template = iter.hasNext()
                ? "[Control connection] error on %s connection, trying next host"
                : "[Control connection] error on %s connection, no more host to try";
            logger.debug(String.format(template, host), exception);
        }
        return collected;
    }

    /**
     * Opens a connection to {@code host} and prepares it for use as control connection:
     * registers for topology, status and schema events, refreshes the node list and
     * token map, refreshes the whole schema, then refreshes the node list and token map
     * once more.
     * <p>
     * If any of these steps fails, the new connection is force-closed before the failure
     * is propagated.
     *
     * @param host the host to connect to
     * @param isInitialConnection forwarded to the first node list refresh
     * @return the fully initialized connection
     */
    private Connection tryConnect(Host host, boolean isInitialConnection) throws ConnectionException, ExecutionException, InterruptedException, UnsupportedProtocolVersionException, ClusterNameMismatchException {
        Connection candidate = this.cluster.connectionFactory.open(host);
        // If the factory still has no protocol version after opening, fall back to the newest supported one.
        if (this.cluster.connectionFactory.protocolVersion == null) {
            this.cluster.connectionFactory.protocolVersion = ProtocolVersion.NEWEST_SUPPORTED;
        }
        try {
            logger.trace("[Control connection] Registering for events");
            List<ProtocolEvent.Type> eventTypes = Arrays.asList(ProtocolEvent.Type.TOPOLOGY_CHANGE, ProtocolEvent.Type.STATUS_CHANGE, ProtocolEvent.Type.SCHEMA_CHANGE);
            candidate.write(new Requests.Register(eventTypes));

            refreshNodeListAndTokenMap(candidate, this.cluster, isInitialConnection, true);

            logger.debug("[Control connection] Refreshing schema");
            refreshSchema(candidate, null, null, null, this.cluster);

            refreshNodeListAndTokenMap(candidate, this.cluster, false, true);
            return candidate;
        }
        catch (BusyConnectionException busy) {
            candidate.closeAsync().force();
            throw new DriverInternalError("Newly created connection should not be busy");
        }
        catch (InterruptedException interrupted) {
            candidate.closeAsync().force();
            throw interrupted;
        }
        catch (ConnectionException connectionFailure) {
            candidate.closeAsync().force();
            throw connectionFailure;
        }
        catch (ExecutionException executionFailure) {
            candidate.closeAsync().force();
            throw executionFailure;
        }
        catch (RuntimeException unexpected) {
            candidate.closeAsync().force();
            throw unexpected;
        }
    }

    /**
     * Refreshes schema metadata over the current control connection.
     * <p>
     * Does nothing beyond logging when there is no open control connection. On a
     * connection, execution or busy-connection error the control connection is signalled
     * as failed, which closes it and triggers a background reconnection.
     *
     * @param targetType the kind of element to refresh, or {@code null} to refresh everything
     * @param targetKeyspace the keyspace to refresh (or containing the element); unused when
     *        {@code targetType} is {@code null}
     * @param targetName the table or type name; unused for a keyspace-level or full refresh
     * @throws InterruptedException if the calling thread is interrupted while waiting for results
     */
    public void refreshSchema(SchemaElement targetType, String targetKeyspace, String targetName) throws InterruptedException {
        // Build the description only when needed, and do not append ".null (null)" for a full refresh.
        if (logger.isDebugEnabled()) {
            String target;
            if (targetType == null) {
                target = "everything";
            } else if (targetType == SchemaElement.KEYSPACE) {
                target = targetKeyspace;
            } else {
                target = targetKeyspace + "." + targetName + " (" + targetType + ")";
            }
            logger.debug("[Control connection] Refreshing schema for {}", (Object)target);
        }
        try {
            Connection c = this.connectionRef.get();
            if (c == null || c.isClosed()) {
                return;
            }
            ControlConnection.refreshSchema(c, targetType, targetKeyspace, targetName, this.cluster);
            // A full or keyspace-level refresh is followed by a node list refresh.
            if (targetType == null || targetType == SchemaElement.KEYSPACE) {
                this.cluster.submitNodeListRefresh();
            }
        }
        catch (ConnectionException e) {
            logger.debug("[Control connection] Connection error while refreshing schema ({})", (Object)e.getMessage());
            this.signalError();
        }
        catch (ExecutionException e) {
            // Errors are expected while shutting down; only report them otherwise.
            if (!this.isShutdown) {
                logger.error("[Control connection] Unexpected error while refreshing schema", (Throwable)e);
            }
            this.signalError();
        }
        catch (BusyConnectionException e) {
            logger.debug("[Control connection] Connection is busy, reconnecting");
            this.signalError();
        }
    }

    /**
     * Queries the schema system tables over {@code connection} and hands the results to
     * the cluster metadata for rebuilding.
     * <p>
     * Which tables are queried depends on {@code targetType}: everything for {@code null}
     * or a keyspace (user types only if the Cassandra version supports them), tables and
     * columns for a table, user types for a type. A {@code RuntimeException} raised while
     * rebuilding is logged and not propagated.
     *
     * @param connection the connection to query through
     * @param targetType the kind of element to refresh, or {@code null} for the whole schema
     * @param targetKeyspace the keyspace used to restrict the queries when {@code targetType} is set
     * @param targetName the table or type name used to restrict the queries for those target types
     * @param cluster the cluster manager whose metadata is rebuilt
     */
    static void refreshSchema(Connection connection, SchemaElement targetType, String targetKeyspace, String targetName, Cluster.Manager cluster) throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
        // Work out which Cassandra version to parse the schema with.
        Host host = cluster.metadata.getHost(connection.address);
        VersionNumber cassandraVersion;
        if (host != null && host.getCassandraVersion() != null) {
            cassandraVersion = host.getCassandraVersion();
        } else {
            cassandraVersion = cluster.protocolVersion().minCassandraVersion();
            logger.warn("Cannot find Cassandra version for host {} to parse the schema, using {} based on protocol version in use. If parsing the schema fails, this could be the cause", (Object)connection.address, (Object)cassandraVersion);
        }

        // Restrict the queries to the requested element, if any.
        String whereClause = "";
        if (targetType != null) {
            whereClause = " WHERE keyspace_name = '" + targetKeyspace + '\'';
            if (targetType == SchemaElement.TABLE) {
                whereClause += " AND columnfamily_name = '" + targetName + '\'';
            } else if (targetType == SchemaElement.TYPE) {
                whereClause += " AND type_name = '" + targetName + '\'';
            }
        }

        boolean isSchemaOrKeyspace = targetType == null || targetType == SchemaElement.KEYSPACE;

        DefaultResultSetFuture ksFuture = null;
        if (isSchemaOrKeyspace) {
            ksFuture = new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_KEYSPACES + whereClause));
        }
        DefaultResultSetFuture udtFuture = null;
        if ((isSchemaOrKeyspace && supportsUdts(cassandraVersion)) || targetType == SchemaElement.TYPE) {
            udtFuture = new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_USERTYPES + whereClause));
        }
        DefaultResultSetFuture cfFuture = null;
        if (isSchemaOrKeyspace || targetType == SchemaElement.TABLE) {
            cfFuture = new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_COLUMN_FAMILIES + whereClause));
        }
        DefaultResultSetFuture colsFuture = null;
        if (isSchemaOrKeyspace || targetType == SchemaElement.TABLE) {
            colsFuture = new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_COLUMNS + whereClause));
        }

        // Send all queries before waiting on any of them.
        if (ksFuture != null) {
            connection.write(ksFuture);
        }
        if (udtFuture != null) {
            connection.write(udtFuture);
        }
        if (cfFuture != null) {
            connection.write(cfFuture);
        }
        if (colsFuture != null) {
            connection.write(colsFuture);
        }

        try {
            ResultSet keyspaces = ksFuture == null ? null : (ResultSet)ksFuture.get();
            ResultSet userTypes = udtFuture == null ? null : (ResultSet)udtFuture.get();
            ResultSet tables = cfFuture == null ? null : (ResultSet)cfFuture.get();
            ResultSet columns = colsFuture == null ? null : (ResultSet)colsFuture.get();
            cluster.metadata.rebuildSchema(targetType, targetKeyspace, targetName, keyspaces, userTypes, tables, columns, cassandraVersion);
        }
        catch (RuntimeException e) {
            logger.error("Error parsing schema from Cassandra system tables: the schema in Cluster#getMetadata() will appear incomplete or stale", (Throwable)e);
        }
    }

    /**
     * Tells whether the given Cassandra version is 2.1 or later, the condition under
     * which the user-types table is queried during a schema refresh.
     *
     * @param cassandraVersion the version to test
     * @return {@code true} for major version above 2, or major version 2 with minor at least 1
     */
    private static boolean supportsUdts(VersionNumber cassandraVersion) {
        int major = cassandraVersion.getMajor();
        if (major != 2) {
            return major > 2;
        }
        return cassandraVersion.getMinor() >= 1;
    }

    /**
     * Refreshes the node list and token map over the current control connection.
     * <p>
     * Does nothing when there is no open control connection. Connection, execution and
     * busy-connection errors signal the control connection as failed; an interruption
     * restores the interrupt flag and skips the refresh.
     */
    void refreshNodeListAndTokenMap() {
        Connection current = this.connectionRef.get();
        boolean usable = current != null && !current.isClosed();
        if (!usable) {
            return;
        }
        try {
            refreshNodeListAndTokenMap(current, this.cluster, false, true);
        }
        catch (ConnectionException e) {
            logger.debug("[Control connection] Connection error while refreshing node list and token map ({})", (Object)e.getMessage());
            this.signalError();
        }
        catch (ExecutionException e) {
            // Stay quiet about errors caused by our own shutdown.
            if (!this.isShutdown) {
                logger.error("[Control connection] Unexpected error while refreshing node list and token map", (Throwable)e);
            }
            this.signalError();
        }
        catch (BusyConnectionException e) {
            logger.debug("[Control connection] Connection is busy, reconnecting");
            this.signalError();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.debug("[Control connection] Interrupted while refreshing node list and token map, skipping it.");
        }
    }

    /**
     * Determines the address to use for the host described by a {@code system.peers} row.
     *
     * @param peersRow a row of {@code system.peers}; its {@code peer} and {@code rpc_address} columns are read
     * @param connectedHost the address of the host the row was read from
     * @param cluster the cluster manager used to translate the resulting address
     * @param logMissingRpcAddresses whether to warn when the row has no {@code rpc_address}
     * @return the translated address, or {@code null} when the row must be ignored (it
     *         describes the connected host itself, or has no {@code rpc_address})
     */
    private static InetSocketAddress addressToUseForPeerHost(Row peersRow, InetSocketAddress connectedHost, Cluster.Manager cluster, boolean logMissingRpcAddresses) {
        InetAddress listenAddr = peersRow.getInet("peer");
        InetAddress rpcAddr = peersRow.getInet("rpc_address");

        boolean describesConnectedHost = listenAddr.equals(connectedHost.getAddress())
            || (rpcAddr != null && rpcAddr.equals(connectedHost.getAddress()));
        if (describesConnectedHost) {
            logger.debug("System.peers on node {} has a line for itself. This is not normal but is a known problem of some DSE version. Ignoring the entry.", (Object)connectedHost);
            return null;
        }

        if (rpcAddr == null) {
            if (logMissingRpcAddresses) {
                logger.warn("No rpc_address found for host {} in {}'s peers system table. {} will be ignored.", new Object[]{listenAddr, connectedHost, listenAddr});
            }
            return null;
        }

        // An rpc_address of 0.0.0.0 cannot be contacted: use the peer (listen) address instead.
        InetAddress contactAddr = rpcAddr;
        if (rpcAddr.equals(bindAllAddress)) {
            logger.warn("Found host with 0.0.0.0 as rpc_address, using listen_address ({}) to contact it instead. If this is incorrect you should avoid the use of 0.0.0.0 server side.", (Object)listenAddr);
            contactAddr = listenAddr;
        }
        return cluster.translateAddress(contactAddr);
    }

    /**
     * Fetches the system table row describing {@code host} through connection {@code c}.
     * <p>
     * The connected host is read from {@code system.local}; another host with a known
     * listen address is looked up directly in {@code system.peers}; otherwise the whole
     * {@code system.peers} table is scanned for a row resolving to the host's address.
     *
     * @param host the host to look up
     * @param c the connection to query through
     * @return the matching row, or {@code null} if none was found
     */
    private Row fetchNodeInfo(Host host, Connection c) throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
        boolean isConnectedHost = c.address.equals(host.getSocketAddress());

        if (!isConnectedHost && host.listenAddress == null) {
            // No direct key to look the host up with: scan all peers.
            DefaultResultSetFuture allPeers = new DefaultResultSetFuture(null, this.cluster.protocolVersion(), new Requests.Query(SELECT_PEERS));
            c.write(allPeers);
            for (Row candidate : (ResultSet)allPeers.get()) {
                InetSocketAddress candidateAddr = addressToUseForPeerHost(candidate, c.address, this.cluster, true);
                if (candidateAddr != null && candidateAddr.equals(host.getSocketAddress())) {
                    return candidate;
                }
            }
            return null;
        }

        DefaultResultSetFuture single;
        if (isConnectedHost) {
            single = new DefaultResultSetFuture(null, this.cluster.protocolVersion(), new Requests.Query(SELECT_LOCAL));
        } else {
            single = new DefaultResultSetFuture(null, this.cluster.protocolVersion(), new Requests.Query("SELECT * FROM system.peers WHERE peer='" + host.listenAddress.getHostAddress() + '\''));
        }
        c.write(single);
        return ((ResultSet)single.get()).one();
    }

    /**
     * Refreshes the location and version information of {@code host} from the system tables.
     *
     * @param host the host to refresh
     * @return {@code false} when the control host has no usable row for {@code host} (no
     *         row at all on a non-defunct connection, or a peers row without
     *         {@code rpc_address}); {@code true} in every other case, including when there
     *         is no open control connection or when the refresh failed with an error
     */
    boolean refreshNodeInfo(Host host) {
        Connection c = this.connectionRef.get();
        if (c == null || c.isClosed()) {
            return true;
        }
        logger.debug("[Control connection] Refreshing node info on {}", (Object)host);
        try {
            Row nodeRow = this.fetchNodeInfo(host, c);
            if (nodeRow == null) {
                if (!c.isDefunct()) {
                    logger.warn("No row found for host {} in {}'s peers system table. {} will be ignored.", new Object[]{host.getAddress(), c.address, host.getAddress()});
                    return false;
                }
                // The missing row is explained by the connection having gone down.
                logger.debug("Control connection is down, could not refresh node info");
                return true;
            }
            boolean isPeerRow = !c.address.equals(host.getSocketAddress());
            if (isPeerRow && nodeRow.getInet("rpc_address") == null) {
                logger.warn("No rpc_address found for host {} in {}'s peers system table. {} will be ignored.", new Object[]{host.getAddress(), c.address, host.getAddress()});
                return false;
            }
            updateInfo(host, nodeRow, this.cluster, false);
            return true;
        }
        catch (ConnectionException e) {
            logger.debug("[Control connection] Connection error while refreshing node info ({})", (Object)e.getMessage());
            this.signalError();
        }
        catch (ExecutionException e) {
            if (!this.isShutdown) {
                logger.debug("[Control connection] Unexpected error while refreshing node info", (Throwable)e);
            }
            this.signalError();
        }
        catch (BusyConnectionException e) {
            logger.debug("[Control connection] Connection is busy, reconnecting");
            this.signalError();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.debug("[Control connection] Interrupted while refreshing node info, skipping it.");
        }
        catch (Exception e) {
            logger.debug("[Control connection] Unexpected error while refreshing node info", (Throwable)e);
            this.signalError();
        }
        // Reached only after one of the errors above.
        return true;
    }

    /**
     * Applies the information found in a system table row to {@code host}: location
     * (when at least one of {@code data_center} / {@code rack} is set), release version,
     * and listen address (taken from the {@code peer} column when the row has one).
     *
     * @param host the host to update
     * @param row the system table row describing the host
     * @param cluster the cluster manager, forwarded to the location update
     * @param isInitialConnection forwarded to the location update
     */
    private static void updateInfo(Host host, Row row, Cluster.Manager cluster, boolean isInitialConnection) {
        boolean hasLocation = !row.isNull("data_center") || !row.isNull("rack");
        if (hasLocation) {
            updateLocationInfo(host, row.getString("data_center"), row.getString("rack"), isInitialConnection, cluster);
        }
        String releaseVersion = row.getString("release_version");
        InetAddress listenAddress = null;
        if (row.getColumnDefinitions().contains("peer")) {
            listenAddress = row.getInet("peer");
        }
        host.setVersionAndListenAdress(releaseVersion, listenAddress);
    }

    /**
     * Updates the datacenter and rack of {@code host} if they differ from the current values.
     * <p>
     * Outside of the initial connection, the load balancing policy is told the host is
     * down before the change and that it was added after it.
     *
     * @param host the host to update
     * @param datacenter the new datacenter, possibly {@code null}
     * @param rack the new rack, possibly {@code null}
     * @param isInitialConnection when true, the load balancing policy is not notified
     * @param cluster the cluster manager providing the load balancing policy
     */
    private static void updateLocationInfo(Host host, String datacenter, String rack, boolean isInitialConnection, Cluster.Manager cluster) {
        boolean unchanged = Objects.equal((Object)host.getDatacenter(), (Object)datacenter) && Objects.equal((Object)host.getRack(), (Object)rack);
        if (unchanged) {
            return;
        }
        if (isInitialConnection) {
            host.setLocationInfo(datacenter, rack);
            return;
        }
        cluster.loadBalancingPolicy().onDown(host);
        host.setLocationInfo(datacenter, rack);
        cluster.loadBalancingPolicy().onAdd(host);
    }

    /**
     * Reads {@code system.local} and {@code system.peers} through {@code connection} and
     * brings the cluster metadata in line with them: cluster name, partitioner, per-host
     * location / version / listen address, the set of known hosts and, when metadata is
     * enabled in the query options, the token map.
     *
     * @param connection the connection to query through
     * @param cluster the cluster manager to update
     * @param isInitialConnection whether this runs as part of the initial connection; when
     *        true no "on add" notification is triggered for newly discovered hosts
     * @param logMissingRpcAddresses whether to warn about peers rows lacking an {@code rpc_address}
     */
    private static void refreshNodeListAndTokenMap(Connection connection, Cluster.Manager cluster, boolean isInitialConnection, boolean logMissingRpcAddresses) throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
        logger.debug("[Control connection] Refreshing node list and token map");
        boolean metadataEnabled = cluster.configuration.getQueryOptions().isMetadataEnabled();

        // Send both queries before waiting on either.
        DefaultResultSetFuture localQuery = new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_LOCAL));
        DefaultResultSetFuture peersQuery = new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS));
        connection.write(localQuery);
        connection.write(peersQuery);

        String partitioner = null;
        Map<Host, Collection<String>> tokensByHost = new HashMap<Host, Collection<String>>();

        // Step 1: the host we are connected to.
        Row local = ((ResultSet)localQuery.get()).one();
        if (local != null) {
            String clusterName = local.getString("cluster_name");
            if (clusterName != null) {
                cluster.metadata.clusterName = clusterName;
            }
            partitioner = local.getString("partitioner");
            if (partitioner != null) {
                cluster.metadata.partitioner = partitioner;
            }
            Host controlHost = cluster.metadata.getHost(connection.address);
            if (controlHost != null) {
                updateInfo(controlHost, local, cluster, isInitialConnection);
                if (metadataEnabled) {
                    Set<String> localTokens = local.getSet("tokens", String.class);
                    if (partitioner != null && !localTokens.isEmpty()) {
                        tokensByHost.put(controlHost, localTokens);
                    }
                }
            } else {
                logger.debug("Host in local system table ({}) unknown to us (ok if said host just got removed)", (Object)connection.address);
            }
        }

        // Step 2: collect what the peers table says, in parallel lists indexed by peer.
        List<InetSocketAddress> peerAddresses = new ArrayList<InetSocketAddress>();
        List<String> peerDcs = new ArrayList<String>();
        List<String> peerRacks = new ArrayList<String>();
        List<String> peerVersions = new ArrayList<String>();
        List<InetAddress> peerListenAddresses = new ArrayList<InetAddress>();
        List<Set<String>> peerTokens = new ArrayList<Set<String>>();
        for (Row peerRow : (ResultSet)peersQuery.get()) {
            InetSocketAddress peerAddress = addressToUseForPeerHost(peerRow, connection.address, cluster, logMissingRpcAddresses);
            if (peerAddress == null) {
                continue;
            }
            peerAddresses.add(peerAddress);
            peerDcs.add(peerRow.getString("data_center"));
            peerRacks.add(peerRow.getString("rack"));
            peerVersions.add(peerRow.getString("release_version"));
            peerListenAddresses.add(peerRow.getInet("peer"));
            if (metadataEnabled) {
                peerTokens.add(peerRow.getSet("tokens", String.class));
            }
        }

        // Step 3: create or update the metadata entry of every peer.
        for (int i = 0; i < peerAddresses.size(); ++i) {
            InetSocketAddress peerAddress = peerAddresses.get(i);
            Host peerHost = cluster.metadata.getHost(peerAddress);
            boolean newlyAdded = false;
            if (peerHost == null) {
                Host created = cluster.metadata.newHost(peerAddress);
                Host alreadyThere = cluster.metadata.addIfAbsent(created);
                // A non-null result means an entry for this address already existed: use it instead.
                newlyAdded = alreadyThere == null;
                peerHost = newlyAdded ? created : alreadyThere;
            }
            String dc = peerDcs.get(i);
            String rack = peerRacks.get(i);
            if (dc != null || rack != null) {
                updateLocationInfo(peerHost, dc, rack, isInitialConnection, cluster);
            }
            String version = peerVersions.get(i);
            if (version != null) {
                peerHost.setVersionAndListenAdress(version, peerListenAddresses.get(i));
            }
            // peerTokens is only populated when metadata is enabled, hence the first check.
            if (metadataEnabled && partitioner != null && !peerTokens.get(i).isEmpty()) {
                tokensByHost.put(peerHost, peerTokens.get(i));
            }
            if (newlyAdded && !isInitialConnection) {
                cluster.triggerOnAdd(peerHost);
            }
        }

        // Step 4: drop hosts that are neither the connected host nor listed in peers.
        Set<InetSocketAddress> listedAddresses = new HashSet<InetSocketAddress>(peerAddresses);
        for (Host known : cluster.metadata.allHosts()) {
            boolean stillPresent = known.getSocketAddress().equals(connection.address) || listedAddresses.contains(known.getSocketAddress());
            if (!stillPresent) {
                cluster.removeHost(known, isInitialConnection);
            }
        }

        if (metadataEnabled) {
            cluster.metadata.rebuildTokenMap(partitioner, tokensByHost);
        }
    }

    /**
     * Polls for schema agreement every 200 milliseconds until it is reached or the
     * configured maximum wait (protocol option "max schema agreement wait seconds")
     * has elapsed.
     *
     * @return {@code true} as soon as a check reports agreement, {@code false} if the
     *         wait limit is reached first (including when the limit is zero or negative)
     * @throws InterruptedException if interrupted while sleeping or waiting for query results
     */
    boolean waitForSchemaAgreement() throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
        long start = System.nanoTime();
        long elapsed = 0L;
        int maxSchemaAgreementWaitSeconds = this.cluster.configuration.getProtocolOptions().getMaxSchemaAgreementWaitSeconds();
        // Convert in long arithmetic: an int multiplication by 1000 overflows for large
        // configured values and would turn the limit negative, skipping the wait entirely.
        long maxWaitMillis = TimeUnit.SECONDS.toMillis(maxSchemaAgreementWaitSeconds);
        while (elapsed < maxWaitMillis) {
            if (this.checkSchemaAgreement()) {
                return true;
            }
            Thread.sleep(200L);
            elapsed = Cluster.timeSince(start, TimeUnit.MILLISECONDS);
        }
        return false;
    }

    /**
     * Checks once whether the connected host and all peers currently known as up report
     * the same schema version.
     * <p>
     * Peers rows that are ignored by the address resolution, have no schema version, are
     * unknown to the metadata, or belong to a host that is not up are left out.
     *
     * @return {@code true} if at most one distinct schema version was collected;
     *         {@code false} if several were, or if there is no open control connection
     */
    boolean checkSchemaAgreement() throws ConnectionException, BusyConnectionException, InterruptedException, ExecutionException {
        Connection connection = this.connectionRef.get();
        if (connection == null || connection.isClosed()) {
            return false;
        }

        DefaultResultSetFuture peersFuture = new DefaultResultSetFuture(null, this.cluster.protocolVersion(), new Requests.Query(SELECT_SCHEMA_PEERS));
        DefaultResultSetFuture localFuture = new DefaultResultSetFuture(null, this.cluster.protocolVersion(), new Requests.Query(SELECT_SCHEMA_LOCAL));
        connection.write(peersFuture);
        connection.write(localFuture);

        Set<UUID> versions = new HashSet<UUID>();

        Row local = ((ResultSet)localFuture.get()).one();
        if (local != null && !local.isNull("schema_version")) {
            versions.add(local.getUUID("schema_version"));
        }

        for (Row peerRow : (ResultSet)peersFuture.get()) {
            InetSocketAddress peerAddress = addressToUseForPeerHost(peerRow, connection.address, this.cluster, true);
            if (peerAddress == null || peerRow.isNull("schema_version")) {
                continue;
            }
            Host peerHost = this.cluster.metadata.getHost(peerAddress);
            if (peerHost == null || !peerHost.isUp()) {
                continue;
            }
            versions.add(peerRow.getUUID("schema_version"));
        }

        logger.debug("Checking for schema agreement: versions are {}", versions);
        return versions.size() <= 1;
    }

    /**
     * Tells whether there is a current control connection that is not closed.
     *
     * @return {@code true} if a connection is set and not closed
     */
    boolean isOpen() {
        Connection current = this.connectionRef.get();
        if (current == null) {
            return false;
        }
        return !current.isClosed();
    }

    /**
     * Host state callback; the control connection takes no action when a host comes up.
     */
    @Override
    public void onUp(Host host) {
    }

    /**
     * Host state callback: if {@code host} is the one the control connection is attached
     * to, the connection is closed and a reconnection is started (see {@code onHostGone}).
     */
    @Override
    public void onDown(Host host) {
        this.onHostGone(host);
    }

    /**
     * Host state callback: handles the removed host like a down host (see
     * {@code onHostGone}), then submits a node list refresh to the cluster manager.
     */
    @Override
    public void onRemove(Host host) {
        this.onHostGone(host);
        this.cluster.submitNodeListRefresh();
    }

    /**
     * Reacts to a host going down or being removed. When that host is the one the
     * control connection is attached to, the connection is closed (if not already) and
     * an immediate background reconnection is started; otherwise nothing happens.
     *
     * @param host the host that went down or was removed
     */
    private void onHostGone(Host host) {
        Connection current = this.connectionRef.get();
        boolean wasControlHost = current != null && current.address.equals(host.getSocketAddress());
        if (!wasControlHost) {
            return;
        }
        logger.debug("[Control connection] {} is down/removed and it was the control host, triggering reconnect", (Object)current.address);
        if (!current.isClosed()) {
            current.closeAsync();
        }
        this.backgroundReconnect(0L);
    }

    /**
     * Connection owner callback: starts an immediate background reconnection when the
     * defunct connection is the current control connection. Other connections are ignored.
     *
     * @param connection the connection that became defunct
     */
    @Override
    public void onConnectionDefunct(Connection connection) {
        // Identity comparison: only react for the exact connection currently installed.
        if (connection != this.connectionRef.get()) {
            return;
        }
        this.backgroundReconnect(0L);
    }

    /**
     * Host state callback; the control connection takes no action when a host is suspected.
     */
    @Override
    public void onSuspected(Host host) {
    }

    /**
     * Host state callback: submits a node list refresh when the added host has no known
     * Cassandra version, when there is no token map yet, or when the token map does not
     * contain the host.
     *
     * @param host the host that was added
     */
    @Override
    public void onAdd(Host host) {
        Metadata.TokenMap currentTokenMap = this.cluster.metadata.tokenMap;
        boolean missingInfo = host.getCassandraVersion() == null
            || currentTokenMap == null
            || !currentTokenMap.hosts.contains(host);
        if (missingInfo) {
            this.cluster.submitNodeListRefresh();
        }
    }

    // Initializes bindAllAddress from a 4-byte all-zero raw address (0.0.0.0).
    static {
        InetAddress allZeros;
        try {
            allZeros = InetAddress.getByAddress(new byte[4]);
        }
        catch (UnknownHostException e) {
            // Surface the checked exception as unchecked, since a static initializer cannot declare it.
            throw new RuntimeException(e);
        }
        bindAllAddress = allZeros;
    }
}

