/*
 * Decompiled with CFR 0.152.
 */
package com.coreos.jetcd.internal.impl;

import com.coreos.jetcd.Watch;
import com.coreos.jetcd.api.WatchCancelRequest;
import com.coreos.jetcd.api.WatchCreateRequest;
import com.coreos.jetcd.api.WatchGrpc;
import com.coreos.jetcd.api.WatchRequest;
import com.coreos.jetcd.api.WatchResponse;
import com.coreos.jetcd.common.exception.ErrorCode;
import com.coreos.jetcd.common.exception.EtcdException;
import com.coreos.jetcd.common.exception.EtcdExceptionFactory;
import com.coreos.jetcd.data.ByteSequence;
import com.coreos.jetcd.internal.impl.ClientConnectionManager;
import com.coreos.jetcd.internal.impl.Util;
import com.coreos.jetcd.options.WatchOption;
import com.coreos.jetcd.watch.WatchResponseWithError;
import com.google.common.base.Strings;
import com.google.protobuf.ByteString;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

class WatchImpl
implements Watch {
    private static final Logger logger = Logger.getLogger(WatchImpl.class.getName());
    /** Watchers acknowledged by the server, keyed by their watch id. */
    private final ConcurrentHashMap<Long, WatcherImpl> watchers = new ConcurrentHashMap<>();
    /** Watchers whose create request has not been acknowledged yet, in submission order. */
    private final ConcurrentLinkedQueue<WatcherImpl> pendingWatchers = new ConcurrentLinkedQueue<>();
    /** Watch ids for which a cancel request was sent and no cancel response has been seen. */
    private final Set<Long> cancelSet = ConcurrentHashMap.newKeySet();
    private final ExecutorService executor = Executors.newCachedThreadPool();
    /** Runs the delayed stream resume scheduled after a recoverable stream error. */
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    private final ClientConnectionManager connectionManager;
    private final WatchGrpc.WatchStub stub;
    /** Request side of the watch stream; {@code null} until first use and after the stream is closed. */
    private volatile StreamObserver<WatchRequest> grpcWatchStreamObserver;
    /** Set once by {@link #close()}; accessed from the synchronized methods of this class. */
    private boolean closed = false;

    /**
     * Creates a watch client on top of the given connection manager.
     *
     * @param connectionManager used to build the asynchronous watch stub
     */
    WatchImpl(ClientConnectionManager connectionManager) {
        this.stub = connectionManager.newStub(WatchGrpc::newStub);
        this.connectionManager = connectionManager;
    }

    /**
     * @return {@code true} once {@link #close()} has marked this client closed
     */
    private boolean isClosed() {
        return closed;
    }

    /** Marks this client closed; the flag is never reset. */
    private void setClosed() {
        closed = true;
    }

    /**
     * Watches {@code key} using {@link WatchOption#DEFAULT}.
     *
     * @param key the key to watch
     * @return the watcher created by {@link #watch(ByteSequence, WatchOption)}
     */
    @Override
    public Watch.Watcher watch(ByteSequence key) {
        WatchOption defaults = WatchOption.DEFAULT;
        return watch(key, defaults);
    }

    /**
     * Registers a new watcher and, when no other create request is outstanding,
     * sends its create request on the watch stream.
     *
     * <p>Create requests are sent one at a time: the next pending watcher's request
     * is sent when the response for the previous one is processed.
     *
     * @param key         the key to watch
     * @param watchOption options (range end, start revision, filters, ...) for the watch
     * @return the new watcher; events are delivered through its {@code listen()} method
     * @throws com.coreos.jetcd.common.exception.EtcdException if this client is already closed
     */
    @Override
    public synchronized Watch.Watcher watch(ByteSequence key, WatchOption watchOption) {
        if (this.isClosed()) {
            throw EtcdExceptionFactory.newClosedWatchClientException();
        }
        WatcherImpl watcher = new WatcherImpl(key, watchOption, this);
        // Check emptiness before adding: same meaning as "size() == 1 after add",
        // without the O(n) traversal that ConcurrentLinkedQueue.size() performs.
        boolean noCreateInFlight = this.pendingWatchers.isEmpty();
        this.pendingWatchers.add(watcher);
        if (noCreateInFlight) {
            WatchRequest request = this.toWatchCreateRequest(watcher);
            // No cast: StreamObserver<WatchRequest>.onNext takes a WatchRequest.
            this.getGrpcWatchStreamObserver().onNext(request);
        }
        return watcher;
    }

    /**
     * Closes this watch client. Calling it again after the first call is a no-op.
     *
     * <p>Every pending and active watcher receives a "closed watch client" error,
     * the watch stream is completed and both executors are shut down.
     */
    @Override
    public synchronized void close() {
        if (!this.isClosed()) {
            this.setClosed();
            EtcdException closedError = EtcdExceptionFactory.newClosedWatchClientException();
            this.notifyWatchers(closedError);
            this.closeGrpcWatchStreamObserver();
            this.executor.shutdownNow();
            this.scheduledExecutorService.shutdownNow();
        }
    }

    /**
     * Delivers {@code e} to every pending and every active watcher, then forgets them all.
     *
     * @param e the error handed to each watcher's event queue
     */
    private void notifyWatchers(EtcdException e) {
        WatchResponseWithError failure = new WatchResponseWithError(e);
        for (WatcherImpl pending : this.pendingWatchers) {
            deliverQuietly(pending, failure);
        }
        this.pendingWatchers.clear();
        for (WatcherImpl active : this.watchers.values()) {
            deliverQuietly(active, failure);
        }
        this.watchers.clear();
    }

    /** Enqueues {@code failure} on one watcher; a failure to do so is logged, not propagated. */
    private static void deliverQuietly(WatcherImpl watcher, WatchResponseWithError failure) {
        try {
            watcher.enqueue(failure);
        }
        catch (Exception we) {
            logger.log(Level.WARNING, "failed to notify watcher", we);
        }
    }

    /**
     * Stops tracking the watcher with the given id and asks the server to cancel it.
     *
     * <p>Does nothing when this client is closed or when a cancel request for
     * {@code id} is already outstanding.
     *
     * @param id the server-assigned watch id to cancel
     */
    private synchronized void cancelWatcher(long id) {
        if (this.isClosed()) {
            return;
        }
        // add() returns false when the id is already present, i.e. a cancel is in flight.
        if (!this.cancelSet.add(id)) {
            return;
        }
        this.watchers.remove(id);
        WatchCancelRequest watchCancelRequest = WatchCancelRequest.newBuilder().setWatchId(id).build();
        WatchRequest cancelRequest = WatchRequest.newBuilder().setCancelRequest(watchCancelRequest).build();
        // No cast: StreamObserver<WatchRequest>.onNext takes a WatchRequest.
        this.getGrpcWatchStreamObserver().onNext(cancelRequest);
    }

    /**
     * Returns the request side of the watch stream, opening the stream on first use
     * (or on first use after it was closed).
     *
     * @return the observer used to send {@link WatchRequest}s to the server
     */
    private synchronized StreamObserver<WatchRequest> getGrpcWatchStreamObserver() {
        StreamObserver<WatchRequest> observer = this.grpcWatchStreamObserver;
        if (observer == null) {
            observer = this.stub.watch(this.createWatchStreamObserver());
            this.grpcWatchStreamObserver = observer;
        }
        return observer;
    }

    /**
     * Builds the response-side observer for the watch stream. Responses and errors
     * are forwarded to this client; stream completion is ignored.
     *
     * @return a new observer bound to this client
     */
    private StreamObserver<WatchResponse> createWatchStreamObserver() {
        final WatchImpl client = this;
        return new StreamObserver<WatchResponse>(){

            @Override
            public void onNext(WatchResponse response) {
                client.processWatchResponse(response);
            }

            @Override
            public void onError(Throwable error) {
                client.processError(error);
            }

            @Override
            public void onCompleted() {
                // Intentionally empty: completion needs no handling here.
            }
        };
    }

    /**
     * Dispatches one server response to the create, cancel or event handler.
     * Responses that arrive after this client was closed are dropped.
     *
     * @param watchResponse the response read from the watch stream
     */
    private synchronized void processWatchResponse(WatchResponse watchResponse) {
        if (this.isClosed()) {
            return;
        }
        if (watchResponse.getCreated()) {
            this.processCreate(watchResponse);
            return;
        }
        if (watchResponse.getCanceled()) {
            this.processCanceled(watchResponse);
            return;
        }
        this.processEvents(watchResponse);
    }

    /**
     * Handles a failure of the watch stream.
     *
     * <p>Halt errors and "no leader" errors are reported to all watchers and the
     * stream is torn down. Any other error schedules a {@link #resume()} after 500 ms.
     *
     * @param t the error reported by the stream
     */
    private synchronized void processError(Throwable t) {
        if (this.isClosed()) {
            return;
        }
        Status status = Status.fromThrowable(t);
        boolean recoverable = !this.isHaltError(status) && !this.isNoLeaderError(status);
        if (recoverable) {
            this.scheduledExecutorService.schedule(this::resume, 500L, TimeUnit.MILLISECONDS);
            return;
        }
        this.notifyWatchers(EtcdExceptionFactory.toEtcdException(status));
        this.closeGrpcWatchStreamObserver();
        this.cancelSet.clear();
    }

    /**
     * Drops the current stream and outstanding cancel bookkeeping, then re-queues
     * the active watchers so they are re-created on a fresh stream.
     */
    private synchronized void resume() {
        closeGrpcWatchStreamObserver();
        cancelSet.clear();
        resumeWatchers();
    }

    /** Completes the request side of the watch stream, if one is open, and forgets it. */
    private void closeGrpcWatchStreamObserver() {
        StreamObserver<WatchRequest> observer = this.grpcWatchStreamObserver;
        if (observer != null) {
            observer.onCompleted();
            this.grpcWatchStreamObserver = null;
        }
    }

    /**
     * @param status the gRPC status of a stream failure
     * @return {@code true} for an UNAVAILABLE status whose description is exactly
     *         {@code "etcdserver: no leader"}
     */
    private boolean isNoLeaderError(Status status) {
        if (status.getCode() != Status.Code.UNAVAILABLE) {
            return false;
        }
        return "etcdserver: no leader".equals(status.getDescription());
    }

    /**
     * @param status the gRPC status of a stream failure
     * @return {@code true} unless the status code is UNAVAILABLE or INTERNAL
     */
    private boolean isHaltError(Status status) {
        Status.Code code = status.getCode();
        return !(code == Status.Code.UNAVAILABLE || code == Status.Code.INTERNAL);
    }

    /**
     * Handles a "created" response: pairs it with the oldest pending watcher,
     * sends the create request of the next pending watcher, and registers the
     * watcher under the id the server assigned.
     *
     * <p>If the watcher was closed while its create request was outstanding, the
     * watch the server just created is cancelled right away; otherwise it would
     * stay alive on the server until an event for it happened to arrive.
     *
     * @param response the "created" response from the server
     */
    private void processCreate(WatchResponse response) {
        WatcherImpl watcher = this.pendingWatchers.poll();
        this.sendNextWatchCreateRequest();
        if (watcher == null) {
            logger.log(Level.WARNING, "Watch client receives watch create response but find no corresponding watcher");
            return;
        }
        long watchId = response.getWatchId();
        if (watcher.isClosed()) {
            // -1 means the server created nothing, so there is nothing to cancel.
            if (watchId != -1L) {
                this.cancelWatcher(watchId);
            }
            return;
        }
        if (watchId == -1L) {
            watcher.enqueue(new WatchResponseWithError(EtcdExceptionFactory.newEtcdException(ErrorCode.INTERNAL, "etcd server failed to create watch id")));
            return;
        }
        // A watcher created without an explicit start revision adopts the revision
        // reported in the create response header.
        if (watcher.getRevision() == 0L) {
            watcher.setRevision(response.getHeader().getRevision());
        }
        watcher.setWatchID(watchId);
        this.watchers.put(watcher.getWatchID(), watcher);
    }

    /**
     * Builds the create request for the watcher at the head of the pending queue,
     * without removing that watcher.
     *
     * @return the create request, or empty when no watcher is pending
     */
    private Optional<WatchRequest> nextResume() {
        WatcherImpl head = this.pendingWatchers.peek();
        return head == null
            ? Optional.empty()
            : Optional.of(this.toWatchCreateRequest(head));
    }

    /** Sends the create request of the oldest pending watcher, if there is one. */
    private void sendNextWatchCreateRequest() {
        Optional<WatchRequest> pendingCreate = this.nextResume();
        pendingCreate.ifPresent(createRequest -> this.getGrpcWatchStreamObserver().onNext(createRequest));
    }

    /**
     * Handles a response that is neither "created" nor "canceled".
     *
     * <ul>
     *   <li>Unknown watch id: ask the server to cancel it.</li>
     *   <li>Non-zero compact revision: deliver a compaction error to the watcher.</li>
     *   <li>No events: only record the header revision on the watcher.</li>
     *   <li>Otherwise: deliver the events and advance the watcher's revision to one
     *       past the mod revision of the last event.</li>
     * </ul>
     *
     * @param response the response from the server
     */
    private void processEvents(WatchResponse response) {
        long watchId = response.getWatchId();
        WatcherImpl watcher = this.watchers.get(watchId);
        if (watcher == null) {
            this.cancelWatcher(watchId);
            return;
        }
        long compactRevision = response.getCompactRevision();
        if (compactRevision != 0L) {
            watcher.enqueue(new WatchResponseWithError(EtcdExceptionFactory.newCompactedException(compactRevision)));
            return;
        }
        int eventCount = response.getEventsCount();
        if (eventCount == 0) {
            watcher.setRevision(response.getHeader().getRevision());
            return;
        }
        watcher.enqueue(new WatchResponseWithError(response));
        long lastModRevision = response.getEvents(eventCount - 1).getKv().getModRevision();
        watcher.setRevision(lastModRevision + 1L);
    }

    /**
     * Moves every active watcher that is still open back to the pending queue with
     * its watch id reset to -1, empties the active map, and sends the first
     * pending create request.
     */
    private void resumeWatchers() {
        for (WatcherImpl watcher : this.watchers.values()) {
            if (!watcher.isClosed()) {
                watcher.setWatchID(-1L);
                this.pendingWatchers.add(watcher);
            }
        }
        this.watchers.clear();
        this.sendNextWatchCreateRequest();
    }

    /**
     * Handles a "canceled" response: clears the outstanding-cancel mark for the id
     * and, if a watcher is still registered under it, delivers an error to it.
     *
     * <p>An empty cancel reason is reported as OUT_OF_RANGE with a fixed
     * "future revision" message; otherwise the server's reason is reported as
     * FAILED_PRECONDITION.
     *
     * <p>NOTE(review): the watcher is left in the active map here, so a later
     * resume would re-create it. Confirm whether that is intended.
     *
     * @param response the "canceled" response from the server
     */
    private void processCanceled(WatchResponse response) {
        long watchId = response.getWatchId();
        WatcherImpl watcher = this.watchers.get(watchId);
        this.cancelSet.remove(watchId);
        if (watcher == null) {
            return;
        }
        String reason = response.getCancelReason();
        WatchResponseWithError notice = Strings.isNullOrEmpty(reason)
            ? new WatchResponseWithError(EtcdExceptionFactory.newEtcdException(ErrorCode.OUT_OF_RANGE, "etcdserver: mvcc: required revision is a future revision"))
            : new WatchResponseWithError(EtcdExceptionFactory.newEtcdException(ErrorCode.FAILED_PRECONDITION, reason));
        watcher.enqueue(notice);
    }

    /**
     * Translates a watcher's key, options and current revision into a create request.
     *
     * @param watcher the watcher to create on the server
     * @return a {@link WatchRequest} wrapping the create request
     */
    private WatchRequest toWatchCreateRequest(WatcherImpl watcher) {
        WatchOption option = watcher.getWatchOption();
        WatchCreateRequest.Builder create = WatchCreateRequest.newBuilder();
        create.setKey(Util.byteStringFromByteSequence(watcher.getKey()));
        create.setPrevKv(option.isPrevKV());
        create.setProgressNotify(option.isProgressNotify());
        create.setStartRevision(watcher.getRevision());
        option.getEndKey().ifPresent(endKey -> create.setRangeEnd(Util.byteStringFromByteSequence(endKey)));
        if (option.isNoDelete()) {
            create.addFilters(WatchCreateRequest.FilterType.NODELETE);
        }
        if (option.isNoPut()) {
            create.addFilters(WatchCreateRequest.FilterType.NOPUT);
        }
        return WatchRequest.newBuilder().setCreateRequest(create).build();
    }

    /**
     * A single watch registered with a {@link WatchImpl}.
     *
     * <p>The owning client pushes responses and errors into this watcher's queue;
     * callers pull them one at a time with {@link #listen()}. The watch id and
     * revision are written by the owner from its synchronized methods.
     */
    public static class WatcherImpl implements Watch.Watcher {
        /**
         * Watch id of a watcher the server has not acknowledged yet. This is the same
         * value the owner assigns while re-creating watchers and compares create
         * responses against.
         */
        private static final long UNASSIGNED_WATCH_ID = -1L;

        /** Runs the blocking queue read on behalf of {@link #listen()}. */
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        private final WatchOption watchOption;
        private final ByteSequence key;
        private final Object closedLock = new Object();
        private final BlockingQueue<WatchResponseWithError> eventsQueue = new LinkedBlockingQueue<>();
        private final WatchImpl owner;
        // Starts unassigned rather than at 0 so that closing a watcher the server has
        // not acknowledged yet cannot cancel whichever watcher really has id 0.
        private long watchID = UNASSIGNED_WATCH_ID;
        private long revision;
        private boolean closed = false;

        private WatcherImpl(ByteSequence key, WatchOption watchOption, WatchImpl owner) {
            this.key = key;
            this.watchOption = watchOption;
            this.revision = watchOption.getRevision();
            this.owner = owner;
        }

        private long getRevision() {
            return this.revision;
        }

        private void setRevision(long revision) {
            this.revision = revision;
        }

        /**
         * @return {@code true} once {@link #close()} has been called
         */
        public boolean isClosed() {
            synchronized (this.closedLock) {
                return this.closed;
            }
        }

        private void setClosed() {
            synchronized (this.closedLock) {
                this.closed = true;
            }
        }

        private long getWatchID() {
            return this.watchID;
        }

        private void setWatchID(long watchID) {
            this.watchID = watchID;
        }

        private WatchOption getWatchOption() {
            return this.watchOption;
        }

        private ByteSequence getKey() {
            return this.key;
        }

        /**
         * Appends a response or error to this watcher's queue. If the calling thread
         * is interrupted, the interrupt flag is restored and the failure is logged.
         */
        private void enqueue(WatchResponseWithError watchResponse) {
            try {
                this.eventsQueue.put(watchResponse);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.log(Level.WARNING, "Interrupted", e);
            }
        }

        /**
         * Closes this watcher. Calling it again after the first call is a no-op.
         *
         * <p>If the server has assigned a watch id, the owner is asked to cancel it.
         * A watcher that is still waiting for its create response (or for re-creation
         * after a stream resume) has no id to cancel and only marks itself closed.
         * The listen executor is shut down in every case.
         */
        @Override
        public void close() {
            synchronized (this.closedLock) {
                if (this.closed) {
                    return;
                }
                this.closed = true;
            }
            try {
                // The owner assigns watchID while holding its own monitor, so read it
                // under that monitor to see the latest value. cancelWatcher takes the
                // same monitor, so this adds no new lock to the call path.
                synchronized (this.owner) {
                    long id = this.watchID;
                    if (id != UNASSIGNED_WATCH_ID) {
                        this.owner.cancelWatcher(id);
                    }
                }
            }
            finally {
                this.executor.shutdownNow();
            }
        }

        /**
         * Blocks until the next response or error is available for this watcher.
         *
         * @return the next watch response
         * @throws EtcdException if this watcher is closed or the queued item is an error
         * @throws InterruptedException if the calling thread is interrupted while
         *         waiting; the interrupt flag is restored before rethrowing
         */
        @Override
        public synchronized com.coreos.jetcd.watch.WatchResponse listen() throws InterruptedException {
            if (this.isClosed()) {
                throw EtcdExceptionFactory.newClosedWatcherException();
            }
            Future<com.coreos.jetcd.watch.WatchResponse> pending;
            try {
                pending = this.createWatchResponseFuture();
            }
            catch (RejectedExecutionException e) {
                // The executor was shut down by a concurrent close().
                throw EtcdExceptionFactory.newClosedWatcherException();
            }
            try {
                return pending.get();
            }
            catch (ExecutionException e) {
                if (this.isClosed()) {
                    throw EtcdExceptionFactory.newClosedWatcherException();
                }
                Throwable t = e.getCause();
                if (t instanceof EtcdException) {
                    throw (EtcdException)t;
                }
                throw EtcdExceptionFactory.toEtcdException(e);
            }
            catch (InterruptedException e) {
                // Stop the worker too. Left blocked on the queue, it would take the
                // next event and drop it, because nobody reads this future again.
                pending.cancel(true);
                Thread.currentThread().interrupt();
                throw e;
            }
        }

        /**
         * Submits a task that takes the next queued item, rethrowing it if it
         * carries an exception and converting it to a public response otherwise.
         */
        private Future<com.coreos.jetcd.watch.WatchResponse> createWatchResponseFuture() {
            return this.executor.submit(() -> {
                WatchResponseWithError watchResponse = this.eventsQueue.take();
                if (watchResponse.getException() != null) {
                    throw watchResponse.getException();
                }
                return new com.coreos.jetcd.watch.WatchResponse(watchResponse.getWatchResponse());
            });
        }
    }
}

