/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.registry.xds.util.protocol;

import io.envoyproxy.envoy.config.core.v3.Node;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.grpc.stub.StreamObserver;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.registry.xds.util.XdsChannel;
import org.apache.dubbo.registry.xds.util.protocol.DeltaResource;
import org.apache.dubbo.registry.xds.util.protocol.XdsProtocol;

public abstract class AbstractProtocol<T, S extends DeltaResource<T>>
implements XdsProtocol<T> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractProtocol.class);
    protected final XdsChannel xdsChannel;
    protected final Node node;
    protected final Map<Long, Set<String>> requestParam = new ConcurrentHashMap<Long, Set<String>>();
    private final Map<Long, StreamObserver<DiscoveryRequest>> requestObserverMap = new ConcurrentHashMap<Long, StreamObserver<DiscoveryRequest>>();
    private final Map<Long, ScheduledFuture<?>> observeScheduledMap = new ConcurrentHashMap();
    private final Map<Long, CompletableFuture<T>> streamResult = new ConcurrentHashMap<Long, CompletableFuture<T>>();
    private final ScheduledExecutorService pollingExecutor;
    private final int pollingTimeout;
    protected static final AtomicLong requestId = new AtomicLong(0L);

    public AbstractProtocol(XdsChannel xdsChannel, Node node, int pollingPoolSize, int pollingTimeout) {
        this.xdsChannel = xdsChannel;
        this.node = node;
        this.pollingExecutor = new ScheduledThreadPoolExecutor(pollingPoolSize, (ThreadFactory)new NamedThreadFactory("Dubbo-registry-xds"));
        this.pollingTimeout = pollingTimeout;
    }

    public abstract String getTypeUrl();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public T getResource(Set<String> resourceNames) {
        long request = requestId.getAndIncrement();
        resourceNames = resourceNames == null ? Collections.emptySet() : resourceNames;
        this.requestParam.put(request, resourceNames);
        StreamObserver<DiscoveryRequest> requestObserver = this.xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(request));
        CompletableFuture future = new CompletableFuture();
        this.requestObserverMap.put(request, requestObserver);
        this.streamResult.put(request, future);
        requestObserver.onNext((Object)this.buildDiscoveryRequest(resourceNames));
        try {
            Object t = future.get();
            return t;
        }
        catch (InterruptedException | ExecutionException e) {
            logger.error("Error occur when request control panel.");
            T t = null;
            return t;
        }
        finally {
            this.streamResult.remove(request);
            this.requestObserverMap.remove(request);
            this.requestParam.remove(request);
        }
    }

    @Override
    public long observeResource(Set<String> resourceNames, Consumer<T> consumer) {
        long request = requestId.getAndIncrement();
        resourceNames = resourceNames == null ? Collections.emptySet() : resourceNames;
        this.requestParam.put(request, resourceNames);
        consumer.accept(this.getResource(resourceNames));
        StreamObserver<DiscoveryRequest> requestObserver = this.xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(request));
        this.requestObserverMap.put(request, requestObserver);
        ScheduledFuture<?> scheduledFuture = this.pollingExecutor.scheduleAtFixedRate(() -> {
            try {
                Set<String> names = this.requestParam.get(request);
                CompletableFuture future = new CompletableFuture();
                this.streamResult.put(request, future);
                StreamObserver<DiscoveryRequest> observer = this.requestObserverMap.get(request);
                observer.onNext((Object)this.buildDiscoveryRequest(names));
                try {
                    consumer.accept(future.get());
                }
                catch (InterruptedException | ExecutionException e) {
                    logger.error("Error occur when request control panel.");
                }
                finally {
                    this.streamResult.remove(request);
                }
            }
            catch (Throwable t) {
                logger.error("Error when requesting observe data. Type: " + this.getTypeUrl(), t);
            }
        }, this.pollingTimeout, this.pollingTimeout, TimeUnit.SECONDS);
        this.observeScheduledMap.put(request, scheduledFuture);
        return request;
    }

    @Override
    public void updateObserve(long request, Set<String> resourceNames) {
        this.requestParam.put(request, resourceNames);
    }

    protected DiscoveryRequest buildDiscoveryRequest(Set<String> resourceNames) {
        return DiscoveryRequest.newBuilder().setNode(this.node).setTypeUrl(this.getTypeUrl()).addAllResourceNames(resourceNames).build();
    }

    protected DiscoveryRequest buildDiscoveryRequest(Set<String> resourceNames, DiscoveryResponse response) {
        return DiscoveryRequest.newBuilder().setNode(this.node).setTypeUrl(response.getTypeUrl()).setVersionInfo(response.getVersionInfo()).setResponseNonce(response.getNonce()).build();
    }

    protected abstract T decodeDiscoveryResponse(DiscoveryResponse var1);

    private class ResponseObserver
    implements StreamObserver<DiscoveryResponse> {
        private final long requestId;

        public ResponseObserver(long requestId) {
            this.requestId = requestId;
        }

        public void onNext(DiscoveryResponse value) {
            logger.info("receive notification from xds server, type: " + AbstractProtocol.this.getTypeUrl() + " requestId: " + this.requestId);
            Object result = AbstractProtocol.this.decodeDiscoveryResponse(value);
            StreamObserver observer = (StreamObserver)AbstractProtocol.this.requestObserverMap.get(this.requestId);
            if (observer == null) {
                return;
            }
            observer.onNext((Object)AbstractProtocol.this.buildDiscoveryRequest(Collections.emptySet(), value));
            CompletableFuture future = (CompletableFuture)AbstractProtocol.this.streamResult.get(this.requestId);
            if (future == null) {
                return;
            }
            future.complete(result);
        }

        public void onError(Throwable t) {
            logger.error("xDS Client received error message! detail:", t);
        }

        public void onCompleted() {
        }
    }
}

