/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.Heartbeat;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.clients.consumer.internals.RequestFutureAdapter;
import org.apache.kafka.clients.consumer.internals.RequestFutureListener;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;

public abstract class AbstractCoordinator
implements Closeable {
    public static final String HEARTBEAT_THREAD_PREFIX = "kafka-coordinator-heartbeat-thread";
    private final Logger log;
    private final int sessionTimeoutMs;
    private final boolean leaveGroupOnClose;
    private final GroupCoordinatorMetrics sensors;
    private final Heartbeat heartbeat;
    protected final int rebalanceTimeoutMs;
    protected final String groupId;
    protected final ConsumerNetworkClient client;
    protected final Time time;
    protected final long retryBackoffMs;
    private HeartbeatThread heartbeatThread = null;
    private boolean rejoinNeeded = true;
    private boolean needsJoinPrepare = true;
    private MemberState state = MemberState.UNJOINED;
    private RequestFuture<ByteBuffer> joinFuture = null;
    private Node coordinator = null;
    private Generation generation = Generation.NO_GENERATION;
    private RequestFuture<Void> findCoordinatorFuture = null;

    public AbstractCoordinator(LogContext logContext, ConsumerNetworkClient client, String groupId, int rebalanceTimeoutMs, int sessionTimeoutMs, int heartbeatIntervalMs, Metrics metrics, String metricGrpPrefix, Time time, long retryBackoffMs, boolean leaveGroupOnClose) {
        this.log = logContext.logger(AbstractCoordinator.class);
        this.client = client;
        this.time = time;
        this.groupId = groupId;
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.leaveGroupOnClose = leaveGroupOnClose;
        this.heartbeat = new Heartbeat(sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs);
        this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
        this.retryBackoffMs = retryBackoffMs;
    }

    protected abstract String protocolType();

    protected abstract List<JoinGroupRequest.ProtocolMetadata> metadata();

    protected abstract void onJoinPrepare(int var1, String var2);

    protected abstract Map<String, ByteBuffer> performAssignment(String var1, String var2, Map<String, ByteBuffer> var3);

    protected abstract void onJoinComplete(int var1, String var2, String var3, ByteBuffer var4);

    public synchronized void ensureCoordinatorReady() {
        this.ensureCoordinatorReady(0L, Long.MAX_VALUE);
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    protected synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) {
        long remainingMs = timeoutMs;
        while (this.coordinatorUnknown()) {
            RequestFuture<Void> future = this.lookupCoordinator();
            this.client.poll(future, remainingMs);
            if (future.failed()) {
                if (!future.isRetriable()) throw future.exception();
                remainingMs = timeoutMs - (this.time.milliseconds() - startTimeMs);
                if (remainingMs <= 0L) break;
                this.log.debug("Coordinator discovery failed, refreshing metadata");
                this.client.awaitMetadataUpdate(remainingMs);
            } else if (this.coordinator != null && this.client.connectionFailed(this.coordinator)) {
                this.coordinatorDead();
                this.time.sleep(this.retryBackoffMs);
            }
            if ((remainingMs = timeoutMs - (this.time.milliseconds() - startTimeMs)) > 0L) continue;
            break;
        }
        if (this.coordinatorUnknown()) return false;
        return true;
    }

    protected synchronized RequestFuture<Void> lookupCoordinator() {
        if (this.findCoordinatorFuture == null) {
            Node node = this.client.leastLoadedNode();
            if (node == null) {
                this.log.debug("No broker available to send FindCoordinator request");
                return RequestFuture.noBrokersAvailable();
            }
            this.findCoordinatorFuture = this.sendFindCoordinatorRequest(node);
        }
        return this.findCoordinatorFuture;
    }

    private synchronized void clearFindCoordinatorFuture() {
        this.findCoordinatorFuture = null;
    }

    protected synchronized boolean needRejoin() {
        return this.rejoinNeeded;
    }

    private synchronized boolean rejoinIncomplete() {
        return this.joinFuture != null;
    }

    protected synchronized void pollHeartbeat(long now) {
        if (this.heartbeatThread != null) {
            if (this.heartbeatThread.hasFailed()) {
                RuntimeException cause = this.heartbeatThread.failureCause();
                this.heartbeatThread = null;
                throw cause;
            }
            if (this.heartbeat.shouldHeartbeat(now)) {
                this.notify();
            }
            this.heartbeat.poll(now);
        }
    }

    protected synchronized long timeToNextHeartbeat(long now) {
        if (this.state == MemberState.UNJOINED) {
            return Long.MAX_VALUE;
        }
        return this.heartbeat.timeToNextHeartbeat(now);
    }

    public void ensureActiveGroup() {
        this.ensureCoordinatorReady();
        this.startHeartbeatThreadIfNeeded();
        this.joinGroupIfNeeded();
    }

    private synchronized void startHeartbeatThreadIfNeeded() {
        if (this.heartbeatThread == null) {
            this.heartbeatThread = new HeartbeatThread();
            this.heartbeatThread.start();
        }
    }

    private synchronized void disableHeartbeatThread() {
        if (this.heartbeatThread != null) {
            this.heartbeatThread.disable();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void closeHeartbeatThread() {
        HeartbeatThread thread = null;
        AbstractCoordinator abstractCoordinator = this;
        synchronized (abstractCoordinator) {
            if (this.heartbeatThread == null) {
                return;
            }
            this.heartbeatThread.close();
            thread = this.heartbeatThread;
            this.heartbeatThread = null;
        }
        try {
            thread.join();
        }
        catch (InterruptedException e) {
            this.log.warn("Interrupted while waiting for consumer heartbeat thread to close");
            throw new InterruptException(e);
        }
    }

    void joinGroupIfNeeded() {
        while (this.needRejoin() || this.rejoinIncomplete()) {
            this.ensureCoordinatorReady();
            if (this.needsJoinPrepare) {
                this.onJoinPrepare(this.generation.generationId, this.generation.memberId);
                this.needsJoinPrepare = false;
            }
            RequestFuture<ByteBuffer> future = this.initiateJoinGroup();
            this.client.poll(future);
            if (future.succeeded()) {
                this.onJoinComplete(this.generation.generationId, this.generation.memberId, this.generation.protocol, future.value());
                this.resetJoinGroupFuture();
                this.needsJoinPrepare = true;
                continue;
            }
            this.resetJoinGroupFuture();
            RuntimeException exception = future.exception();
            if (exception instanceof UnknownMemberIdException || exception instanceof RebalanceInProgressException || exception instanceof IllegalGenerationException) continue;
            if (!future.isRetriable()) {
                throw exception;
            }
            this.time.sleep(this.retryBackoffMs);
        }
    }

    private synchronized void resetJoinGroupFuture() {
        this.joinFuture = null;
    }

    private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
        if (this.joinFuture == null) {
            this.disableHeartbeatThread();
            this.state = MemberState.REBALANCING;
            this.joinFuture = this.sendJoinGroupRequest();
            this.joinFuture.addListener(new RequestFutureListener<ByteBuffer>(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void onSuccess(ByteBuffer value) {
                    AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
                    synchronized (abstractCoordinator) {
                        AbstractCoordinator.this.log.info("Successfully joined group with generation {}", (Object)((AbstractCoordinator)AbstractCoordinator.this).generation.generationId);
                        AbstractCoordinator.this.state = MemberState.STABLE;
                        AbstractCoordinator.this.rejoinNeeded = false;
                        if (AbstractCoordinator.this.heartbeatThread != null) {
                            AbstractCoordinator.this.heartbeatThread.enable();
                        }
                    }
                }

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void onFailure(RuntimeException e) {
                    AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
                    synchronized (abstractCoordinator) {
                        AbstractCoordinator.this.state = MemberState.UNJOINED;
                    }
                }
            });
        }
        return this.joinFuture;
    }

    private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
        if (this.coordinatorUnknown()) {
            return RequestFuture.coordinatorNotAvailable();
        }
        this.log.info("(Re-)joining group");
        JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(this.groupId, this.sessionTimeoutMs, this.generation.memberId, this.protocolType(), this.metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs);
        this.log.debug("Sending JoinGroup ({}) to coordinator {}", (Object)requestBuilder, (Object)this.coordinator);
        return this.client.send(this.coordinator, requestBuilder).compose(new JoinGroupResponseHandler());
    }

    private RequestFuture<ByteBuffer> onJoinFollower() {
        SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder(this.groupId, this.generation.generationId, this.generation.memberId, Collections.emptyMap());
        this.log.debug("Sending follower SyncGroup to coordinator {}: {}", (Object)this.coordinator, (Object)requestBuilder);
        return this.sendSyncGroupRequest(requestBuilder);
    }

    private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
        try {
            Map<String, ByteBuffer> groupAssignment = this.performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(), joinResponse.members());
            SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder(this.groupId, this.generation.generationId, this.generation.memberId, groupAssignment);
            this.log.debug("Sending leader SyncGroup to coordinator {}: {}", (Object)this.coordinator, (Object)requestBuilder);
            return this.sendSyncGroupRequest(requestBuilder);
        }
        catch (RuntimeException e) {
            return RequestFuture.failure(e);
        }
    }

    private RequestFuture<ByteBuffer> sendSyncGroupRequest(SyncGroupRequest.Builder requestBuilder) {
        if (this.coordinatorUnknown()) {
            return RequestFuture.coordinatorNotAvailable();
        }
        return this.client.send(this.coordinator, requestBuilder).compose(new SyncGroupResponseHandler());
    }

    private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
        this.log.debug("Sending FindCoordinator request to broker {}", (Object)node);
        FindCoordinatorRequest.Builder requestBuilder = new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, this.groupId);
        return this.client.send(node, requestBuilder).compose(new FindCoordinatorResponseHandler());
    }

    public boolean coordinatorUnknown() {
        return this.coordinator() == null;
    }

    protected synchronized Node coordinator() {
        if (this.coordinator != null && this.client.connectionFailed(this.coordinator)) {
            this.coordinatorDead();
            return null;
        }
        return this.coordinator;
    }

    protected synchronized void coordinatorDead() {
        if (this.coordinator != null) {
            this.log.info("Marking the coordinator {} dead", (Object)this.coordinator);
            Node oldCoordinator = this.coordinator;
            this.coordinator = null;
            this.client.disconnect(oldCoordinator);
        }
    }

    protected synchronized Generation generation() {
        if (this.state != MemberState.STABLE) {
            return null;
        }
        return this.generation;
    }

    protected synchronized void resetGeneration() {
        this.generation = Generation.NO_GENERATION;
        this.rejoinNeeded = true;
        this.state = MemberState.UNJOINED;
    }

    protected synchronized void requestRejoin() {
        this.rejoinNeeded = true;
    }

    @Override
    public final void close() {
        this.close(0L);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void close(long timeoutMs) {
        try {
            this.closeHeartbeatThread();
        }
        finally {
            AbstractCoordinator abstractCoordinator = this;
            synchronized (abstractCoordinator) {
                Node coordinator;
                if (this.leaveGroupOnClose) {
                    this.maybeLeaveGroup();
                }
                if ((coordinator = this.coordinator()) != null && !this.client.awaitPendingRequests(coordinator, timeoutMs)) {
                    this.log.warn("Close timed out with {} pending requests to coordinator, terminating client connections", (Object)this.client.pendingRequestCount(coordinator));
                }
            }
        }
    }

    public synchronized void maybeLeaveGroup() {
        if (!this.coordinatorUnknown() && this.state != MemberState.UNJOINED && this.generation != Generation.NO_GENERATION) {
            this.log.debug("Sending LeaveGroup request to coordinator {}", (Object)this.coordinator);
            LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(this.groupId, this.generation.memberId);
            this.client.send(this.coordinator, request).compose(new LeaveGroupResponseHandler());
            this.client.pollNoWakeup();
        }
        this.resetGeneration();
    }

    synchronized RequestFuture<Void> sendHeartbeatRequest() {
        this.log.debug("Sending Heartbeat request to coordinator {}", (Object)this.coordinator);
        HeartbeatRequest.Builder requestBuilder = new HeartbeatRequest.Builder(this.groupId, this.generation.generationId, this.generation.memberId);
        return this.client.send(this.coordinator, requestBuilder).compose(new HeartbeatResponseHandler());
    }

    protected Meter createMeter(Metrics metrics, String groupName, String baseName, String descriptiveName) {
        return new Meter(new Count(), metrics.metricName(baseName + "-rate", groupName, String.format("The number of %s per second", descriptiveName)), metrics.metricName(baseName + "-total", groupName, String.format("The total number of %s", descriptiveName)));
    }

    private static class UnjoinedGroupException
    extends RetriableException {
        private UnjoinedGroupException() {
        }
    }

    protected static class Generation {
        public static final Generation NO_GENERATION = new Generation(-1, "", null);
        public final int generationId;
        public final String memberId;
        public final String protocol;

        public Generation(int generationId, String memberId, String protocol) {
            this.generationId = generationId;
            this.memberId = memberId;
            this.protocol = protocol;
        }
    }

    private class HeartbeatThread
    extends KafkaThread {
        private boolean enabled;
        private boolean closed;
        private AtomicReference<RuntimeException> failed;

        private HeartbeatThread() {
            super(AbstractCoordinator.HEARTBEAT_THREAD_PREFIX + (AbstractCoordinator.this.groupId.isEmpty() ? "" : " | " + AbstractCoordinator.this.groupId), true);
            this.enabled = false;
            this.closed = false;
            this.failed = new AtomicReference<Object>(null);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void enable() {
            AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
            synchronized (abstractCoordinator) {
                AbstractCoordinator.this.log.debug("Enabling heartbeat thread");
                this.enabled = true;
                AbstractCoordinator.this.heartbeat.resetTimeouts(AbstractCoordinator.this.time.milliseconds());
                AbstractCoordinator.this.notify();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void disable() {
            AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
            synchronized (abstractCoordinator) {
                AbstractCoordinator.this.log.debug("Disabling heartbeat thread");
                this.enabled = false;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void close() {
            AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
            synchronized (abstractCoordinator) {
                this.closed = true;
                AbstractCoordinator.this.notify();
            }
        }

        private boolean hasFailed() {
            return this.failed.get() != null;
        }

        private RuntimeException failureCause() {
            return this.failed.get();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         * Converted monitor instructions to comments
         * Lifted jumps to return sites
         */
        @Override
        public void run() {
            try {
                AbstractCoordinator.this.log.debug("Heartbeat thread started");
                while (true) {
                    AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
                    // MONITORENTER : abstractCoordinator
                    if (this.closed) {
                        // MONITOREXIT : abstractCoordinator
                        return;
                    }
                    if (!this.enabled) {
                        AbstractCoordinator.this.wait();
                        // MONITOREXIT : abstractCoordinator
                        continue;
                    }
                    if (AbstractCoordinator.this.state != MemberState.STABLE) {
                        this.disable();
                        // MONITOREXIT : abstractCoordinator
                        continue;
                    }
                    AbstractCoordinator.this.client.pollNoWakeup();
                    long now = AbstractCoordinator.this.time.milliseconds();
                    if (AbstractCoordinator.this.coordinatorUnknown()) {
                        if (AbstractCoordinator.this.findCoordinatorFuture != null || AbstractCoordinator.this.lookupCoordinator().failed()) {
                            AbstractCoordinator.this.wait(AbstractCoordinator.this.retryBackoffMs);
                        }
                    } else if (AbstractCoordinator.this.heartbeat.sessionTimeoutExpired(now)) {
                        AbstractCoordinator.this.coordinatorDead();
                    } else if (AbstractCoordinator.this.heartbeat.pollTimeoutExpired(now)) {
                        AbstractCoordinator.this.maybeLeaveGroup();
                    } else if (!AbstractCoordinator.this.heartbeat.shouldHeartbeat(now)) {
                        AbstractCoordinator.this.wait(AbstractCoordinator.this.retryBackoffMs);
                    } else {
                        AbstractCoordinator.this.heartbeat.sentHeartbeat(now);
                        AbstractCoordinator.this.sendHeartbeatRequest().addListener(new RequestFutureListener<Void>(){

                            /*
                             * WARNING - Removed try catching itself - possible behaviour change.
                             */
                            @Override
                            public void onSuccess(Void value) {
                                AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
                                synchronized (abstractCoordinator) {
                                    AbstractCoordinator.this.heartbeat.receiveHeartbeat(AbstractCoordinator.this.time.milliseconds());
                                }
                            }

                            /*
                             * WARNING - Removed try catching itself - possible behaviour change.
                             */
                            @Override
                            public void onFailure(RuntimeException e) {
                                AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
                                synchronized (abstractCoordinator) {
                                    if (e instanceof RebalanceInProgressException) {
                                        AbstractCoordinator.this.heartbeat.receiveHeartbeat(AbstractCoordinator.this.time.milliseconds());
                                    } else {
                                        AbstractCoordinator.this.heartbeat.failHeartbeat();
                                        AbstractCoordinator.this.notify();
                                    }
                                }
                            }
                        });
                    }
                    // MONITOREXIT : abstractCoordinator
                    continue;
                    break;
                }
            }
            catch (AuthenticationException e) {
                AbstractCoordinator.this.log.error("An authentication error occurred in the heartbeat thread", (Throwable)e);
                this.failed.set(e);
                return;
            }
            catch (GroupAuthorizationException e) {
                AbstractCoordinator.this.log.error("A group authorization error occurred in the heartbeat thread", (Throwable)e);
                this.failed.set(e);
                return;
            }
            catch (InterruptedException | InterruptException e) {
                Thread.interrupted();
                AbstractCoordinator.this.log.error("Unexpected interrupt received in heartbeat thread", (Throwable)e);
                this.failed.set(new RuntimeException(e));
                return;
            }
            catch (Throwable e) {
                AbstractCoordinator.this.log.error("Heartbeat thread failed due to unexpected error", e);
                if (e instanceof RuntimeException) {
                    this.failed.set((RuntimeException)e);
                    return;
                }
                this.failed.set(new RuntimeException(e));
                return;
            }
            finally {
                AbstractCoordinator.this.log.debug("Heartbeat thread has closed");
            }
        }
    }

    private class GroupCoordinatorMetrics {
        public final String metricGrpName;
        public final Sensor heartbeatLatency;
        public final Sensor joinLatency;
        public final Sensor syncLatency;

        public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
            this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
            this.heartbeatLatency = metrics.sensor("heartbeat-latency");
            this.heartbeatLatency.add(metrics.metricName("heartbeat-response-time-max", this.metricGrpName, "The max time taken to receive a response to a heartbeat request"), new Max());
            this.heartbeatLatency.add(AbstractCoordinator.this.createMeter(metrics, this.metricGrpName, "heartbeat", "heartbeats"));
            this.joinLatency = metrics.sensor("join-latency");
            this.joinLatency.add(metrics.metricName("join-time-avg", this.metricGrpName, "The average time taken for a group rejoin"), new Avg());
            this.joinLatency.add(metrics.metricName("join-time-max", this.metricGrpName, "The max time taken for a group rejoin"), new Max());
            this.joinLatency.add(AbstractCoordinator.this.createMeter(metrics, this.metricGrpName, "join", "group joins"));
            this.syncLatency = metrics.sensor("sync-latency");
            this.syncLatency.add(metrics.metricName("sync-time-avg", this.metricGrpName, "The average time taken for a group sync"), new Avg());
            this.syncLatency.add(metrics.metricName("sync-time-max", this.metricGrpName, "The max time taken for a group sync"), new Max());
            this.syncLatency.add(AbstractCoordinator.this.createMeter(metrics, this.metricGrpName, "sync", "group syncs"));
            Measurable lastHeartbeat = new Measurable(){

                @Override
                public double measure(MetricConfig config, long now) {
                    return TimeUnit.SECONDS.convert(now - AbstractCoordinator.this.heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
                }
            };
            metrics.addMetric(metrics.metricName("last-heartbeat-seconds-ago", this.metricGrpName, "The number of seconds since the last controller heartbeat was sent"), lastHeartbeat);
        }
    }

    protected abstract class CoordinatorResponseHandler<R, T>
    extends RequestFutureAdapter<ClientResponse, T> {
        protected ClientResponse response;

        protected CoordinatorResponseHandler() {
        }

        public abstract void handle(R var1, RequestFuture<T> var2);

        @Override
        public void onFailure(RuntimeException e, RequestFuture<T> future) {
            if (e instanceof DisconnectException) {
                AbstractCoordinator.this.coordinatorDead();
            }
            future.raise(e);
        }

        @Override
        public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) {
            block2: {
                try {
                    this.response = clientResponse;
                    AbstractResponse responseObj = clientResponse.responseBody();
                    this.handle(responseObj, future);
                }
                catch (RuntimeException e) {
                    if (future.isDone()) break block2;
                    future.raise(e);
                }
            }
        }
    }

    private class HeartbeatResponseHandler
    extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
        private HeartbeatResponseHandler() {
        }

        @Override
        public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
            ((AbstractCoordinator)AbstractCoordinator.this).sensors.heartbeatLatency.record(this.response.requestLatencyMs());
            Errors error = heartbeatResponse.error();
            if (error == Errors.NONE) {
                AbstractCoordinator.this.log.debug("Received successful Heartbeat response");
                future.complete(null);
            } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                AbstractCoordinator.this.log.debug("Attempt to heartbeat since coordinator {} is either not started or not valid.", (Object)AbstractCoordinator.this.coordinator());
                AbstractCoordinator.this.coordinatorDead();
                future.raise(error);
            } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                AbstractCoordinator.this.log.debug("Attempt to heartbeat failed since group is rebalancing");
                AbstractCoordinator.this.requestRejoin();
                future.raise(Errors.REBALANCE_IN_PROGRESS);
            } else if (error == Errors.ILLEGAL_GENERATION) {
                AbstractCoordinator.this.log.debug("Attempt to heartbeat failed since generation {} is not current", (Object)((AbstractCoordinator)AbstractCoordinator.this).generation.generationId);
                AbstractCoordinator.this.resetGeneration();
                future.raise(Errors.ILLEGAL_GENERATION);
            } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                AbstractCoordinator.this.log.debug("Attempt to heartbeat failed for since member id {} is not valid.", (Object)((AbstractCoordinator)AbstractCoordinator.this).generation.memberId);
                AbstractCoordinator.this.resetGeneration();
                future.raise(Errors.UNKNOWN_MEMBER_ID);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(AbstractCoordinator.this.groupId));
            } else {
                future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
            }
        }
    }

    private class LeaveGroupResponseHandler
    extends CoordinatorResponseHandler<LeaveGroupResponse, Void> {
        private LeaveGroupResponseHandler() {
        }

        @Override
        public void handle(LeaveGroupResponse leaveResponse, RequestFuture<Void> future) {
            Errors error = leaveResponse.error();
            if (error == Errors.NONE) {
                AbstractCoordinator.this.log.debug("LeaveGroup request returned successfully");
                future.complete(null);
            } else {
                AbstractCoordinator.this.log.debug("LeaveGroup request failed with error: {}", (Object)error.message());
                future.raise(error);
            }
        }
    }

    private class FindCoordinatorResponseHandler
    extends RequestFutureAdapter<ClientResponse, Void> {
        private FindCoordinatorResponseHandler() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
            AbstractCoordinator.this.log.debug("Received FindCoordinator response {}", (Object)resp);
            AbstractCoordinator.this.clearFindCoordinatorFuture();
            FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse)resp.responseBody();
            Errors error = findCoordinatorResponse.error();
            if (error == Errors.NONE) {
                AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
                synchronized (abstractCoordinator) {
                    int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.node().id();
                    AbstractCoordinator.this.coordinator = new Node(coordinatorConnectionId, findCoordinatorResponse.node().host(), findCoordinatorResponse.node().port());
                    AbstractCoordinator.this.log.info("Discovered group coordinator {}", (Object)AbstractCoordinator.this.coordinator);
                    AbstractCoordinator.this.client.tryConnect(AbstractCoordinator.this.coordinator);
                    AbstractCoordinator.this.heartbeat.resetTimeouts(AbstractCoordinator.this.time.milliseconds());
                }
                future.complete(null);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(AbstractCoordinator.this.groupId));
            } else {
                AbstractCoordinator.this.log.debug("Group coordinator lookup failed: {}", (Object)error.message());
                future.raise(error);
            }
        }

        @Override
        public void onFailure(RuntimeException e, RequestFuture<Void> future) {
            AbstractCoordinator.this.clearFindCoordinatorFuture();
            super.onFailure(e, future);
        }
    }

    private class SyncGroupResponseHandler
    extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
        private SyncGroupResponseHandler() {
        }

        @Override
        public void handle(SyncGroupResponse syncResponse, RequestFuture<ByteBuffer> future) {
            Errors error = syncResponse.error();
            if (error == Errors.NONE) {
                ((AbstractCoordinator)AbstractCoordinator.this).sensors.syncLatency.record(this.response.requestLatencyMs());
                future.complete(syncResponse.memberAssignment());
            } else {
                AbstractCoordinator.this.requestRejoin();
                if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                    future.raise(new GroupAuthorizationException(AbstractCoordinator.this.groupId));
                } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                    AbstractCoordinator.this.log.debug("SyncGroup failed due to group rebalance");
                    future.raise(error);
                } else if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION) {
                    AbstractCoordinator.this.log.debug("SyncGroup failed: {}", (Object)error.message());
                    AbstractCoordinator.this.resetGeneration();
                    future.raise(error);
                } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                    AbstractCoordinator.this.log.debug("SyncGroup failed:", (Object)error.message());
                    AbstractCoordinator.this.coordinatorDead();
                    future.raise(error);
                } else {
                    future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message()));
                }
            }
        }
    }

    private class JoinGroupResponseHandler
    extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
        private JoinGroupResponseHandler() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
            Errors error = joinResponse.error();
            if (error == Errors.NONE) {
                AbstractCoordinator.this.log.debug("Received successful JoinGroup response: {}", (Object)joinResponse);
                ((AbstractCoordinator)AbstractCoordinator.this).sensors.joinLatency.record(this.response.requestLatencyMs());
                AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
                synchronized (abstractCoordinator) {
                    if (AbstractCoordinator.this.state != MemberState.REBALANCING) {
                        future.raise(new UnjoinedGroupException());
                    } else {
                        AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(), joinResponse.memberId(), joinResponse.groupProtocol());
                        if (joinResponse.isLeader()) {
                            AbstractCoordinator.this.onJoinLeader(joinResponse).chain(future);
                        } else {
                            AbstractCoordinator.this.onJoinFollower().chain(future);
                        }
                    }
                }
            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                AbstractCoordinator.this.log.debug("Attempt to join group rejected since coordinator {} is loading the group.", (Object)AbstractCoordinator.this.coordinator());
                future.raise(error);
            } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                AbstractCoordinator.this.resetGeneration();
                AbstractCoordinator.this.log.debug("Attempt to join group failed due to unknown member id.");
                future.raise(Errors.UNKNOWN_MEMBER_ID);
            } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                AbstractCoordinator.this.coordinatorDead();
                AbstractCoordinator.this.log.debug("Attempt to join group failed due to obsolete coordinator information: {}", (Object)error.message());
                future.raise(error);
            } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL || error == Errors.INVALID_SESSION_TIMEOUT || error == Errors.INVALID_GROUP_ID) {
                AbstractCoordinator.this.log.error("Attempt to join group failed due to fatal error: {}", (Object)error.message());
                future.raise(error);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(AbstractCoordinator.this.groupId));
            } else {
                future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
            }
        }
    }

    private static enum MemberState {
        UNJOINED,
        REBALANCING,
        STABLE;

    }
}

