package reactor.core.publisher;

import java.io.Serializable;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.reactivestreams.Subscription;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.publisher.RingBuffer;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;
import reactor.util.concurrent.WaitStrategy;
import reactor.util.context.Context;

/* loaded from: input_file:BOOT-INF/lib/reactor-core-3.3.0.RELEASE.jar:reactor/core/publisher/EventLoopProcessor.class */
abstract class EventLoopProcessor<IN> extends FluxProcessor<IN, IN> implements Runnable {
    /** Executor for the event loop; shut down by onComplete/onError/cancel/shutdown/forceShutdown. */
    final ExecutorService executor;
    /** Executor for request tasks; must be non-null, shut down by shutdown()/forceShutdown(). */
    final ExecutorService requestTaskExecutor;
    /** Per-instance context object; carries the multiproducer flag and supplies this processor's hashCode. */
    final EventLoopContext contextClassLoader;
    /** Display name computed by {@link #defaultName(ThreadFactory, Class)} in the constructor. */
    final String name;
    /** When true, the upstream is cancelled once the subscriber count drops back to zero (see decrementSubscribers). */
    final boolean autoCancel;
    /** Ring buffer of slots; onNext claims a sequence, stores the value and publishes it. */
    final RingBuffer<Slot<IN>> ringBuffer;
    /** Wait strategy used by RequestTask to wait on read progress; signalled on completion, error and cancel. */
    final WaitStrategy readWait = WaitStrategy.liteBlocking();
    /** Current upstream subscription; set in onSubscribe, nulled on completion, error or auto-cancel. */
    Subscription upstreamSubscription;
    /** Set to true by cancel(); RequestTask checks it to decide whether to cancel the upstream after an alert. */
    volatile boolean cancelled;
    /** Termination state: 0 while alive, otherwise {@link #SHUTDOWN} or {@link #FORCED_SHUTDOWN}. */
    volatile int terminated;
    /** Error recorded by onError, or null when none has been recorded. */
    volatile Throwable error;
    /** Number of subscribers, maintained through {@link #SUBSCRIBER_COUNT}. */
    volatile int subscriberCount;
    /** Value of {@link #terminated} after a regular termination (complete, error or cancel). */
    static final int SHUTDOWN = 1;
    /** Value of {@link #terminated} after forceShutdown(). */
    static final int FORCED_SHUTDOWN = 2;
    /** Atomic updater for {@link #subscriberCount}. */
    static final AtomicIntegerFieldUpdater<EventLoopProcessor> SUBSCRIBER_COUNT = AtomicIntegerFieldUpdater.newUpdater(EventLoopProcessor.class, "subscriberCount");
    /** Atomic updater for {@link #terminated}; all state transitions go through compareAndSet on it. */
    static final AtomicIntegerFieldUpdater<EventLoopProcessor> TERMINATED = AtomicIntegerFieldUpdater.newUpdater(EventLoopProcessor.class, "terminated");

    /* loaded from: input_file:BOOT-INF/lib/reactor-core-3.3.0.RELEASE.jar:reactor/core/publisher/EventLoopProcessor$EventLoopContext.class */
    /**
     * Small per-processor context object. It is a {@link ClassLoader} whose parent is the
     * context class loader of the thread that constructs it, and it remembers whether the
     * owning processor was created in multi-producer mode.
     */
    static final class EventLoopContext extends ClassLoader {
        /** True when the owning processor was created in multi-producer mode. */
        final boolean multiproducer;

        /**
         * @param z the multi-producer flag to record
         */
        EventLoopContext(boolean z) {
            // Parent loader is captured from the constructing thread at creation time.
            super(Thread.currentThread().getContextClassLoader());
            this.multiproducer = z;
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/reactor-core-3.3.0.RELEASE.jar:reactor/core/publisher/EventLoopProcessor$EventLoopFactory.class */
    /**
     * Thread factory that names its threads {@code <name>-<n>}, where {@code n} comes from a
     * counter shared by every factory instance. It also acts as a {@link Supplier} of the
     * base name.
     */
    static final class EventLoopFactory implements ThreadFactory, Supplier<String> {
        /** Counter shared across all factories; incremented once per created thread. */
        static final AtomicInteger COUNT = new AtomicInteger();
        final String name;
        final boolean daemon;

        /**
         * @param str base name used as the thread-name prefix and returned by {@link #get()}
         * @param z   daemon flag applied to every created thread
         */
        public EventLoopFactory(String str, boolean z) {
            this.daemon = z;
            this.name = str;
        }

        /**
         * Creates a new, not yet started thread for the given task.
         *
         * @param runnable the task the thread will run
         * @return the configured thread
         */
        @Override
        public Thread newThread(Runnable runnable) {
            final int id = COUNT.incrementAndGet();
            final Thread created = new Thread(runnable, name + "-" + id);
            created.setDaemon(daemon);
            return created;
        }

        /**
         * @return the base name given at construction
         */
        @Override
        public String get() {
            return name;
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/reactor-core-3.3.0.RELEASE.jar:reactor/core/publisher/EventLoopProcessor$RequestTask.class */
    /**
     * Task that feeds demand to the upstream {@link Subscription}. It runs the parent's
     * {@code run()} hook, requests an initial batch equal to the ring buffer size, and then
     * repeatedly waits on the parent's {@code readWait} strategy before requesting more.
     */
    static final class RequestTask implements Runnable {
        final EventLoopProcessor<?> parent;
        final Subscription upstream;
        final LongSupplier readCount;
        /** Optional callback invoked with the value returned by each wait; may be null. */
        final Consumer<Long> postWaitCallback;

        RequestTask(Subscription subscription, EventLoopProcessor<?> eventLoopProcessor, @Nullable Consumer<Long> consumer, LongSupplier longSupplier) {
            this.upstream = subscription;
            this.parent = eventLoopProcessor;
            this.postWaitCallback = consumer;
            this.readCount = longSupplier;
        }

        /**
         * Runs the request loop. The loop has no normal exit: it ends when the wait is
         * interrupted (the interrupt flag is restored) or when any other throwable is raised.
         * A throwable recognised by {@code WaitStrategy.isAlert} cancels the upstream only if
         * the parent has been cancelled; anything else is routed to the parent's
         * {@code onError}.
         */
        @Override // java.lang.Runnable
        public void run() {
            final long capacity = parent.ringBuffer.bufferSize();
            // Replenish step: the whole buffer when it has a single slot, otherwise the
            // buffer size minus a quarter of it (at least one).
            final long limit;
            if (capacity == 1) {
                limit = capacity;
            } else {
                limit = capacity - Math.max(capacity >> 2, 1L);
            }
            long cursor = -1;
            try {
                parent.run();
                upstream.request(capacity);
                for (;;) {
                    final long target = cursor + limit;
                    cursor = parent.readWait.waitFor(target, readCount, parent);
                    if (postWaitCallback != null) {
                        postWaitCallback.accept(cursor);
                    }
                    // Request the step plus however far the wait result went past the target.
                    upstream.request(limit + (cursor - target));
                }
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
            } catch (Throwable failure) {
                if (WaitStrategy.isAlert(failure)) {
                    if (parent.cancelled) {
                        upstream.cancel();
                    }
                } else {
                    parent.onError(Operators.onOperatorError(failure, parent.currentContext()));
                }
            }
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/reactor-core-3.3.0.RELEASE.jar:reactor/core/publisher/EventLoopProcessor$Slot.class */
    /**
     * Mutable holder for a single value of type {@code T}; the ring buffer of this processor
     * is made of these.
     *
     * @param <T> type of the held value
     */
    public static final class Slot<T> implements Serializable {
        private static final long serialVersionUID = 5172014386416785095L;
        /** The value currently stored in this slot; written by onNext. */
        public T value;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds a {@link Flux} that replays the values held in {@code ringBuffer}, starting just
     * after the position read from {@code sequence} and completing once the buffer's cursor
     * has been passed. Slots whose value is {@code null} are skipped.
     *
     * <p>When {@code th2} is non-null the replay is followed by an error signal: {@code th2}
     * itself, or the result of {@code Exceptions.addSuppressed(th, th2)} when {@code th} is
     * also non-null.
     *
     * @param ringBuffer the buffer to read slots from
     * @param th         optional throwable combined with {@code th2}; ignored when {@code th2} is null
     * @param th2        optional terminal error; when null the returned flux simply completes
     * @param sequence   supplies the starting position each time the flux is subscribed
     * @param <E>        element type
     * @return the replaying flux
     * @throws NullPointerException if {@code sequence} is null
     */
    public static <E> Flux<E> coldSource(RingBuffer<Slot<E>> ringBuffer, @Nullable Throwable th, @Nullable Throwable th2, RingBuffer.Sequence sequence) {
        // Fail fast, matching the eager null check a bound method reference performs anyway.
        Objects.requireNonNull(sequence, "sequence");
        Flux<E> replay = generate(sequence::getAsLong, (previous, sink) -> {
            long next = previous + 1;
            if (next > ringBuffer.getCursor()) {
                sink.complete();
            } else {
                E value = ringBuffer.get(next).value;
                if (value != null) {
                    sink.next(value);
                }
            }
            return next;
        });
        if (th2 == null) {
            return replay;
        }
        Throwable terminal = th != null ? Exceptions.addSuppressed(th, th2) : th2;
        return concat(replay, Flux.error(terminal));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the {@link RequestTask} for the given upstream subscription and processor.
     *
     * @param subscription       the upstream subscription to request from
     * @param eventLoopProcessor the owning processor
     * @param consumer           optional callback invoked after each wait; may be null
     * @param longSupplier       supplier of the read position the task waits on
     * @return the task, typed as a plain {@link Runnable}
     */
    public static Runnable createRequestTask(Subscription subscription, EventLoopProcessor<?> eventLoopProcessor, @Nullable Consumer<Long> consumer, LongSupplier longSupplier) {
        final RequestTask task = new RequestTask(subscription, eventLoopProcessor, consumer, longSupplier);
        return task;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Loops until {@code longSupplier} reports a positive value. Each round runs
     * {@code runnable}, waits on {@code reader} for the sequence after the one reported by
     * {@code longSupplier2}, raises an alert if {@code atomicBoolean} is false, and parks
     * for one nanosecond.
     *
     * @param longSupplier  pending demand; the loop ends as soon as it is positive
     * @param reader        reader to wait on
     * @param atomicBoolean running flag
     * @param longSupplier2 supplies the sequence preceding the one to wait for
     * @param runnable      executed before each wait and also handed to the reader
     * @return {@code true} when demand became positive or the thread was interrupted (the
     *         interrupt flag is restored); {@code false} when an exception was raised while
     *         the running flag was false or the exception is an alert
     */
    public static boolean waitRequestOrTerminalEvent(LongSupplier longSupplier, RingBuffer.Reader reader, AtomicBoolean atomicBoolean, LongSupplier longSupplier2, Runnable runnable) {
        for (;;) {
            // The demand check stays outside the try so its failures propagate untouched.
            if (longSupplier.getAsLong() > 0) {
                return true;
            }
            try {
                final long awaited = longSupplier2.getAsLong() + 1;
                runnable.run();
                reader.waitFor(awaited, runnable);
                if (!atomicBoolean.get()) {
                    WaitStrategy.alert();
                }
                LockSupport.parkNanos(1L);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                return true;
            } catch (Exception failure) {
                final boolean stopped = !atomicBoolean.get();
                if (stopped || WaitStrategy.isAlert(failure)) {
                    return false;
                }
                throw failure;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Atomically adds {@code j} to {@code sequence} using {@code Operators.addCap}, retrying
     * the compare-and-set until it succeeds. A sequence already at {@link Long#MAX_VALUE} is
     * left untouched.
     *
     * @param sequence the sequence to update
     * @param j        the amount to add
     */
    public static void addCap(RingBuffer.Sequence sequence, long j) {
        for (;;) {
            final long current = sequence.getAsLong();
            if (current == Long.MAX_VALUE) {
                return;
            }
            final long updated = Operators.addCap(current, j);
            if (sequence.compareAndSet(current, updated)) {
                return;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Atomically subtracts {@code j} from {@code sequence} using {@code Operators.subOrZero},
     * retrying the compare-and-set until it succeeds. A sequence at {@code 0} or
     * {@link Long#MAX_VALUE} is left untouched.
     *
     * @param sequence the sequence to update
     * @param j        the amount to subtract
     * @return the value observed before the update (or the untouched value)
     */
    public static long getAndSub(RingBuffer.Sequence sequence, long j) {
        for (;;) {
            final long current = sequence.getAsLong();
            if (current == 0 || current == Long.MAX_VALUE) {
                return current;
            }
            final long updated = Operators.subOrZero(current, j);
            if (sequence.compareAndSet(current, updated)) {
                return current;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventLoopProcessor(int i, @Nullable ThreadFactory threadFactory, @Nullable ExecutorService executorService, ExecutorService executorService2, boolean z, boolean z2, Supplier<Slot<IN>> supplier, WaitStrategy waitStrategy) {
        if (!Queues.isPowerOfTwo(i)) {
            throw new IllegalArgumentException("bufferSize must be a power of 2 : " + i);
        }
        if (i < 1) {
            throw new IllegalArgumentException("bufferSize must be strictly positive, was: " + i);
        }
        this.autoCancel = z;
        this.contextClassLoader = new EventLoopContext(z2);
        this.name = defaultName(threadFactory, getClass());
        this.requestTaskExecutor = (ExecutorService) Objects.requireNonNull(executorService2, "requestTaskExecutor");
        if (executorService == null) {
            this.executor = Executors.newCachedThreadPool(threadFactory);
        } else {
            this.executor = executorService;
        }
        if (z2) {
            this.ringBuffer = RingBuffer.createMultiProducer(supplier, i, waitStrategy, this);
        } else {
            this.ringBuffer = RingBuffer.createSingleProducer(supplier, i, waitStrategy, this);
        }
    }

    /**
     * Reports a pending count for this processor; the exact meaning is defined by subclasses.
     * NOTE(review): presumably the number of buffered signals not yet consumed — confirm
     * against the implementations.
     *
     * @return the pending count
     */
    public abstract long getPending();

    /**
     * Answers {@code Scannable.Attr.PARENT} with the current upstream subscription and
     * defers every other attribute to the superclass.
     *
     * @param attr the attribute being queried
     * @return the attribute value, possibly null
     */
    @Override // reactor.core.publisher.FluxProcessor, reactor.core.Scannable
    @Nullable
    public Object scanUnsafe(Scannable.Attr attr) {
        if (attr == Scannable.Attr.PARENT) {
            return upstreamSubscription;
        }
        return super.scanUnsafe(attr);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Derives a processor name. When {@code threadFactory} also implements {@link Supplier}
     * and supplies a non-null value, that value's string form is used; otherwise the simple
     * name of {@code cls} is returned.
     *
     * <p>A supplier that yields {@code null} falls back to the class name instead of failing
     * with a {@link NullPointerException}.
     *
     * @param threadFactory factory that may double as a name supplier; may be null
     * @param cls           processor class providing the fallback name
     * @return the derived name
     */
    public static String defaultName(@Nullable ThreadFactory threadFactory, Class<? extends EventLoopProcessor> cls) {
        if (threadFactory instanceof Supplier) {
            Object supplied = ((Supplier<?>) threadFactory).get();
            if (supplied != null) {
                String suppliedName = supplied.toString();
                if (suppliedName != null) {
                    return suppliedName;
                }
            }
        }
        return cls.getSimpleName();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a cached thread pool whose threads are all named {@code str + "[request-task]"}.
     *
     * @param str prefix for the thread name
     * @return a new cached thread pool
     */
    public static ExecutorService defaultRequestTaskExecutor(String str) {
        final String threadName = str + "[request-task]";
        final ThreadFactory factory = runnable -> new Thread(runnable, threadName);
        return Executors.newCachedThreadPool(factory);
    }

    /**
     * @return {@code true} while the termination state is still 0, i.e. no completion,
     *         error, cancel or forced shutdown has been recorded
     */
    public final boolean alive() {
        return terminated == 0;
    }

    /**
     * Shuts down through {@link #awaitAndShutdown(Duration)} with a negative duration, which
     * that overload turns into a timeout of -1 nanoseconds.
     *
     * @return the result of the delegated call
     */
    public final boolean awaitAndShutdown() {
        final Duration negative = Duration.ofSeconds(-1L);
        return awaitAndShutdown(negative);
    }

    /**
     * Calls {@link #shutdown()} and then waits for the executor to terminate.
     *
     * @param j        maximum time to wait
     * @param timeUnit unit of {@code j}
     * @return what {@code awaitTermination} reported, or {@code false} if the wait was
     *         interrupted (the interrupt flag is restored)
     */
    @Deprecated
    public final boolean awaitAndShutdown(long j, TimeUnit timeUnit) {
        boolean finished;
        try {
            shutdown();
            finished = executor.awaitTermination(j, timeUnit);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            finished = false;
        }
        return finished;
    }

    /**
     * Calls {@link #shutdown()} and then waits for the executor to terminate. The timeout is
     * the duration in nanoseconds, or -1 when the duration is negative.
     *
     * @param duration maximum time to wait
     * @return what {@code awaitTermination} reported, or {@code false} if the wait was
     *         interrupted (the interrupt flag is restored)
     */
    public final boolean awaitAndShutdown(Duration duration) {
        final long nanos = duration.isNegative() ? -1L : duration.toNanos();
        boolean finished;
        try {
            shutdown();
            finished = executor.awaitTermination(nanos, TimeUnit.NANOSECONDS);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            finished = false;
        }
        return finished;
    }

    /**
     * @return an empty stream; this base class exposes no inner scannables
     */
    @Override // reactor.core.publisher.FluxProcessor, reactor.core.Scannable
    public Stream<? extends Scannable> inners() {
        final Stream<? extends Scannable> none = Stream.empty();
        return none;
    }

    /**
     * Base implementation returning an empty flux. The method is not final, so subclasses
     * can override it.
     *
     * @return an empty flux
     */
    public Flux<IN> drain() {
        final Flux<IN> nothing = Flux.empty();
        return nothing;
    }

    /**
     * Moves the processor to the forced-shutdown state (value 2) and, if this call performed
     * that transition, calls {@code shutdownNow()} on both executors.
     *
     * @return the result of {@link #drain()}
     */
    public final Flux<IN> forceShutdown() {
        final int state = terminated;
        final boolean alreadyForced = state == 2;
        // Only the caller that wins the compare-and-set stops the executors.
        if (!alreadyForced && TERMINATED.compareAndSet(this, state, 2)) {
            executor.shutdownNow();
            requestTaskExecutor.shutdownNow();
        }
        return drain();
    }

    /**
     * @return the ring buffer size minus the ring buffer's pending count
     */
    public final long getAvailableCapacity() {
        return ringBuffer.bufferSize() - ringBuffer.getPending();
    }

    /**
     * @return the error recorded by {@code onError}, or null if none has been recorded
     */
    @Override // reactor.core.publisher.FluxProcessor
    @Nullable
    public final Throwable getError() {
        return error;
    }

    /**
     * @return {@code "/Processors/<name>/<hash>"}, where the hash is that of the per-instance
     *         context object
     */
    @Override // reactor.core.publisher.Flux
    public final String toString() {
        final int identity = contextClassLoader.hashCode();
        return "/Processors/" + name + "/" + identity;
    }

    /**
     * @return the hash code of the per-instance context object
     */
    @Override
    public final int hashCode() {
        return contextClassLoader.hashCode();
    }

    /**
     * @return {@code true} when this processor was created in multi-producer mode
     */
    @Override // reactor.core.publisher.FluxProcessor
    public boolean isSerialized() {
        return contextClassLoader.multiproducer;
    }

    /**
     * @return {@code true} once the termination state is greater than 0
     */
    @Override // reactor.core.publisher.FluxProcessor
    public final boolean isTerminated() {
        final int state = terminated;
        return state > 0;
    }

    /**
     * Handles upstream completion. Only the call that moves the termination state from 0 to 1
     * has any effect: it drops the upstream reference, runs {@link #doComplete()}, shuts the
     * executor down and signals the read wait strategy.
     */
    @Override // org.reactivestreams.Subscriber
    public final void onComplete() {
        if (!TERMINATED.compareAndSet(this, 0, 1)) {
            return;
        }
        upstreamSubscription = null;
        doComplete();
        executor.shutdown();
        readWait.signalAllWhenBlocking();
    }

    /**
     * Handles an upstream error. The call that moves the termination state from 0 to 1
     * records the error, drops the upstream reference, runs {@code doError}, shuts the
     * executor down and signals the read wait strategy. If the processor had already
     * terminated, the error goes to {@code Operators.onErrorDropped} instead.
     *
     * @param th the error; must not be null
     * @throws NullPointerException if {@code th} is null
     */
    @Override // org.reactivestreams.Subscriber
    public final void onError(Throwable th) {
        Objects.requireNonNull(th, "onError");
        if (TERMINATED.compareAndSet(this, 0, 1)) {
            error = th;
            upstreamSubscription = null;
            doError(th);
            executor.shutdown();
            readWait.signalAllWhenBlocking();
        } else {
            Operators.onErrorDropped(th, Context.empty());
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Stores a value in the ring buffer: claims the next sequence, writes the value into the
     * slot at that sequence, then publishes the sequence.
     *
     * @param in the value; must not be null
     * @throws NullPointerException if {@code in} is null
     */
    @Override // org.reactivestreams.Subscriber
    public final void onNext(IN in) {
        Objects.requireNonNull(in, "onNext");
        final long sequence = ringBuffer.next();
        final Slot<IN> slot = ringBuffer.get(sequence);
        slot.value = in;
        ringBuffer.publish(sequence);
    }

    /**
     * Accepts the upstream subscription if {@code Operators.validate} allows it, stores it,
     * and calls {@link #requestTask(Subscription)} unless it is the shared empty
     * subscription. Any throwable from that step is routed to {@code onError}.
     *
     * @param subscription the upstream subscription
     */
    @Override // org.reactivestreams.Subscriber
    public final void onSubscribe(Subscription subscription) {
        if (!Operators.validate(upstreamSubscription, subscription)) {
            return;
        }
        upstreamSubscription = subscription;
        try {
            final boolean realUpstream = subscription != Operators.emptySubscription();
            if (realUpstream) {
                requestTask(subscription);
            }
        } catch (Throwable failure) {
            onError(Operators.onOperatorError(subscription, failure, currentContext()));
        }
    }

    /**
     * @return {@code true} when this processor was not created in multi-producer mode
     */
    @Override // reactor.core.publisher.FluxProcessor
    protected boolean serializeAlways() {
        final boolean multiProducer = contextClassLoader.multiproducer;
        return !multiProducer;
    }

    /**
     * Completes the processor and shuts both executors down. Any throwable raised while
     * doing so is wrapped by {@code Operators.onOperatorError} and passed to {@code onError}.
     */
    public final void shutdown() {
        try {
            onComplete();
            executor.shutdown();
            requestTaskExecutor.shutdown();
        } catch (Throwable failure) {
            final Throwable mapped = Operators.onOperatorError(failure, currentContext());
            onError(mapped);
        }
    }

    /**
     * @return the size of the ring buffer
     */
    @Override // reactor.core.publisher.FluxProcessor
    public final int getBufferSize() {
        return ringBuffer.bufferSize();
    }

    /**
     * Marks the processor as cancelled. If this call moves the termination state from 0 to 1
     * the executor is shut down; the read wait strategy is signalled in every case.
     */
    final void cancel() {
        cancelled = true;
        final boolean firstTermination = TERMINATED.compareAndSet(this, 0, 1);
        if (firstTermination) {
            executor.shutdown();
        }
        readWait.signalAllWhenBlocking();
    }

    /**
     * Hook invoked by {@code onComplete} after the upstream reference is cleared and before
     * the executor is shut down. Does nothing in this base class.
     */
    protected void doComplete() {
    }

    /**
     * Hook invoked by {@code onSubscribe} with the accepted upstream subscription, unless it
     * is the shared empty subscription. Does nothing in this base class.
     *
     * @param subscription the upstream subscription
     */
    protected void requestTask(Subscription subscription) {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Decrements the subscriber count. When it reaches zero while an upstream subscription
     * is present and auto-cancel is enabled, the upstream reference is cleared and
     * {@code cancel()} is called.
     */
    public final void decrementSubscribers() {
        // Read the upstream before decrementing, as the original ordering does.
        final Subscription upstream = upstreamSubscription;
        final int remaining = SUBSCRIBER_COUNT.decrementAndGet(this);
        if (remaining != 0) {
            return;
        }
        if (upstream == null || !autoCancel) {
            return;
        }
        upstreamSubscription = null;
        cancel();
    }

    /**
     * @return the current subscriber count
     */
    @Override // reactor.core.publisher.FluxProcessor
    public long downstreamCount() {
        return subscriberCount;
    }

    /**
     * Hook invoked by {@code onError} after the error has been recorded and the upstream
     * reference cleared, and before the executor is shut down.
     *
     * @param th the error passed to {@code onError}
     */
    abstract void doError(Throwable th);

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Increments the subscriber count.
     *
     * @return {@code true} if the count was zero before this call
     */
    public final boolean incrementSubscribers() {
        final int before = SUBSCRIBER_COUNT.getAndIncrement(this);
        return before == 0;
    }
}
