/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.turbine;

import com.netflix.turbine.aggregator.InstanceKey;
import com.netflix.turbine.aggregator.StreamAggregator;
import com.netflix.turbine.aggregator.TypeAndNameKey;
import com.netflix.turbine.discovery.StreamAction;
import com.netflix.turbine.discovery.StreamDiscovery;
import com.netflix.turbine.internal.JsonUtility;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.pipeline.PipelineConfigurator;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.observables.GroupedObservable;

/**
 * Static entry points for aggregating per-instance Server-Sent-Event (SSE) metric streams and
 * re-publishing the aggregated result over an SSE HTTP server.
 *
 * <p>All methods are static; the class holds no state other than its logger.
 */
public class Turbine {
    private static final Logger logger = LoggerFactory.getLogger(Turbine.class);

    /** Port used for an instance URI that has no explicit port and is not {@code https}. */
    private static final int DEFAULT_HTTP_PORT = 80;

    /** Port used for an {@code https} instance URI that has no explicit port. */
    private static final int DEFAULT_HTTPS_PORT = 443;

    /**
     * Starts an HTTP server on {@code port} that writes every event of {@code streams} to each
     * connected client as a Server-Sent Event.
     *
     * <p>The grouped streams are flattened into one stream and shared via
     * {@code publish().refCount()}, so concurrent HTTP clients share a single upstream
     * subscription, which is released once the last client unsubscribes.
     *
     * <p>The server is started with {@code startAndWait()}, which presumably blocks the calling
     * thread until the server shuts down (confirm against the RxNetty version in use).
     *
     * @param port    TCP port the server listens on
     * @param streams aggregated streams to publish; each emitted map is serialized with
     *                {@link JsonUtility#mapToJson} and sent as the event data
     */
    public static void startServerSentEventServer(int port, Observable<GroupedObservable<TypeAndNameKey, Map<String, Object>>> streams) {
        logger.info("Turbine => Starting server on {}", port);

        // Multicast: every HTTP connection subscribes to this one shared, ref-counted stream.
        Observable<Map<String, Object>> publishedStreams = streams
                .doOnUnsubscribe(() -> logger.info("Turbine => Unsubscribing aggregation."))
                .doOnSubscribe(() -> logger.info("Turbine => Starting aggregation"))
                .flatMap(o -> o)
                .publish()
                .refCount();

        RxNetty.createHttpServer(port, (request, response) -> {
            logger.info("Turbine => SSE Request Received");
            response.getHeaders().setHeader("Content-Type", "text/event-stream");
            return publishedStreams
                    .doOnUnsubscribe(() -> logger.info("Turbine => Unsubscribing RxNetty server connection"))
                    // The first two ServerSentEvent constructor arguments are deliberately null;
                    // only the JSON payload is sent.
                    .flatMap(data -> response.writeAndFlush(new ServerSentEvent(null, null, JsonUtility.mapToJson(data))));
        }, PipelineConfigurators.sseServerConfigurator()).startAndWait();
    }

    /**
     * Convenience overload: aggregates the instances reported by {@code discovery} with
     * {@link #aggregateHttpSSE(StreamDiscovery)} and serves the result on {@code port}.
     *
     * @param port      TCP port the server listens on
     * @param discovery source of instance ADD/REMOVE actions
     */
    public static void startServerSentEventServer(int port, StreamDiscovery discovery) {
        startServerSentEventServer(port, aggregateHttpSSE(discovery));
    }

    /**
     * Aggregates the SSE streams of a fixed set of instance URIs.
     *
     * <p>Each URI is turned into an {@code ADD} action. The action stream is concatenated with
     * {@code Observable.never()} so that it does not complete after the last URI is emitted.
     *
     * @param uris instance stream endpoints
     * @return the aggregated streams produced by {@link #aggregateHttpSSE(StreamDiscovery)}
     */
    public static Observable<GroupedObservable<TypeAndNameKey, Map<String, Object>>> aggregateHttpSSE(URI... uris) {
        return aggregateHttpSSE(() -> Observable.from(uris)
                .map(uri -> StreamAction.create(StreamAction.ActionType.ADD, uri))
                .concatWith(Observable.never()));
    }

    /**
     * Aggregates the SSE streams of every instance announced by {@code discovery}.
     *
     * <p>For each {@code ADD} action a lazily-connected stream is created, keyed by the
     * instance URI. The stream ends when a {@code REMOVE} action with an equal URI arrives.
     * On error the connection is retried after a one second delay, with no retry limit.
     *
     * @param discovery source of instance ADD/REMOVE actions
     * @return the result of {@link StreamAggregator#aggregateGroupedStreams} applied to the
     *         per-instance streams
     */
    public static Observable<GroupedObservable<TypeAndNameKey, Map<String, Object>>> aggregateHttpSSE(StreamDiscovery discovery) {
        // Shared so that the ADD filter and every per-instance REMOVE filter use one subscription.
        Observable<StreamAction> streamActions = discovery.getInstanceList().publish().refCount();
        Observable<StreamAction> streamAdds = streamActions.filter(a -> a.getType() == StreamAction.ActionType.ADD);
        Observable<StreamAction> streamRemoves = streamActions.filter(a -> a.getType() == StreamAction.ActionType.REMOVE);

        Observable<GroupedObservable<InstanceKey, Map<String, Object>>> streamPerInstance = streamAdds.map(streamAction -> {
            URI uri = streamAction.getUri();
            Observable<StreamAction> removalOfThisInstance =
                    streamRemoves.filter(a -> a.getUri().equals(streamAction.getUri()));

            // defer: nothing is connected until the aggregator subscribes to this instance.
            Observable<Map<String, Object>> io = Observable.defer(() -> {
                Observable<Map<String, Object>> events =
                        openInstanceStream(uri, HttpClientRequest.createGet(uri.toASCIIString()), removalOfThisInstance);
                // doOnNext (not doOnEach): the timer emits one item and then completes, so
                // doOnEach would log every retry twice.
                return events.retryWhen(attempts -> attempts.flatMap(e -> Observable.timer(1, TimeUnit.SECONDS)
                        .doOnNext(tick -> logger.info("Turbine => Retrying connection to: {}", uri))));
            });
            return GroupedObservable.from(InstanceKey.create(uri.toASCIIString()), io);
        });
        return StreamAggregator.aggregateGroupedStreams(streamPerInstance);
    }

    /**
     * Submits {@code request} to the instance at {@code uri} and maps each received SSE to a map
     * via {@link JsonUtility#jsonToMap}.
     *
     * <p>The method is generic in the request body type so that the SSE client configurator is
     * created with the same type as {@code request}.
     *
     * @param uri     instance endpoint; host and port are taken from it
     * @param request request to submit
     * @param removal stream whose first emission terminates the returned stream
     * @return the decoded events, or an error if the response status is not 200
     */
    private static <I> Observable<Map<String, Object>> openInstanceStream(URI uri, HttpClientRequest<I> request, Observable<StreamAction> removal) {
        return RxNetty.createHttpClient(uri.getHost(), resolvePort(uri), PipelineConfigurators.<I>sseClientConfigurator())
                .submit(request)
                .flatMap(response -> {
                    if (response.getStatus().code() != 200) {
                        return Observable.error(new RuntimeException("Failed to connect: " + response.getStatus()));
                    }
                    return response.getContent()
                            .doOnSubscribe(() -> logger.info("Turbine => Aggregate Stream from URI: {}", uri.toASCIIString()))
                            .doOnUnsubscribe(() -> logger.info("Turbine => Unsubscribing Stream: {}", uri))
                            .takeUntil(removal) // stop once this instance is removed
                            .map(sse -> JsonUtility.jsonToMap(sse.getEventData()));
                });
    }

    /**
     * Returns the port to connect to for {@code uri}.
     *
     * <p>{@link URI#getPort()} returns -1 when the URI carries no explicit port; in that case the
     * conventional port of the scheme is used (443 for {@code https}, otherwise 80). Note that
     * this only selects a port; it does not configure TLS on the client.
     */
    private static int resolvePort(URI uri) {
        int port = uri.getPort();
        if (port != -1) {
            return port;
        }
        return "https".equalsIgnoreCase(uri.getScheme()) ? DEFAULT_HTTPS_PORT : DEFAULT_HTTP_PORT;
    }
}

