/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.awssdk.transfer.s3.internal;

import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;

@SdkInternalApi
public class AsyncBufferingSubscriber<T>
implements Subscriber<T> {
    private static final Logger log = Logger.loggerFor(AsyncBufferingSubscriber.class);
    private final CompletableFuture<?> returnFuture;
    private final Function<T, CompletableFuture<?>> consumer;
    private final int maxConcurrentExecutions;
    private final AtomicInteger numRequestsInFlight;
    private volatile boolean upstreamDone;
    private Subscription subscription;
    private final Set<CompletableFuture<?>> requestsInFlight;

    public AsyncBufferingSubscriber(Function<T, CompletableFuture<?>> consumer, CompletableFuture<Void> returnFuture, int maxConcurrentExecutions) {
        this.returnFuture = returnFuture;
        this.consumer = consumer;
        this.maxConcurrentExecutions = maxConcurrentExecutions;
        this.numRequestsInFlight = new AtomicInteger(0);
        this.requestsInFlight = ConcurrentHashMap.newKeySet();
        returnFuture.whenComplete((r, t) -> {
            if (t != null) {
                this.requestsInFlight.forEach(f -> f.cancel(true));
            }
        });
    }

    public void onSubscribe(Subscription subscription) {
        Validate.paramNotNull((Object)subscription, (String)"subscription");
        if (this.subscription != null) {
            log.warn(() -> "The subscriber has already been subscribed. Cancelling the incoming subscription");
            subscription.cancel();
            return;
        }
        this.subscription = subscription;
        subscription.request((long)this.maxConcurrentExecutions);
    }

    public void onNext(T item) {
        this.numRequestsInFlight.incrementAndGet();
        CompletableFuture<?> currentRequest = this.consumer.apply(item);
        this.requestsInFlight.add(currentRequest);
        currentRequest.whenComplete((r, t) -> {
            this.checkForCompletion(this.numRequestsInFlight.decrementAndGet());
            this.requestsInFlight.remove(currentRequest);
            AsyncBufferingSubscriber asyncBufferingSubscriber = this;
            synchronized (asyncBufferingSubscriber) {
                this.subscription.request(1L);
            }
        });
    }

    public void onError(Throwable t) {
        this.returnFuture.completeExceptionally(t);
        this.upstreamDone = true;
    }

    public void onComplete() {
        this.upstreamDone = true;
        this.checkForCompletion(this.numRequestsInFlight.get());
    }

    private void checkForCompletion(int requestsInFlight) {
        if (this.upstreamDone && requestsInFlight == 0) {
            this.returnFuture.complete(null);
        }
    }

    public int numRequestsInFlight() {
        return this.numRequestsInFlight.get();
    }
}

