/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.internal;

import io.netty.buffer.ByteBuf;
import io.rsocket.DuplexConnection;
import io.rsocket.internal.UnboundedProcessor;
import reactor.core.Scannable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/**
 * Partial {@link DuplexConnection} implementation that supplies the outbound
 * frame processor and the close/dispose plumbing.
 *
 * <p>Subclasses implement {@link #doOnClose()}, which is run from a
 * {@code doFinally} hook attached to {@link #onClose()} in the constructor.
 */
public abstract class BaseDuplexConnection implements DuplexConnection {
    /** Sink backing {@link #onClose()}; {@link #dispose()} emits its empty completion. */
    protected Sinks.Empty<Void> onClose = Sinks.empty();
    /** Processor that receives every frame handed to {@link #sendFrame(int, ByteBuf)}. */
    protected UnboundedProcessor sender = new UnboundedProcessor();

    /**
     * Subscribes to the close signal right away so that {@link #doOnClose()}
     * is invoked once that signal finishes.
     */
    public BaseDuplexConnection() {
        Mono<Void> closeSignal = this.onClose();
        closeSignal.doFinally(signalType -> this.doOnClose()).subscribe();
    }

    /**
     * Hands a frame to the {@link #sender} processor.
     *
     * @param streamId id of the stream the frame belongs to; frames with id
     *                 {@code 0} go through {@code onNextPrioritized}, all
     *                 others through {@code onNext}
     * @param frame    the frame to enqueue
     */
    @Override
    public void sendFrame(int streamId, ByteBuf frame) {
        if (streamId != 0) {
            this.sender.onNext(frame);
            return;
        }
        // Stream id 0 takes the prioritized path of the sender.
        this.sender.onNextPrioritized(frame);
    }

    /**
     * Subclass hook run from the {@code doFinally} callback registered on
     * {@link #onClose()} by the constructor.
     */
    protected abstract void doOnClose();

    /**
     * Exposes the close sink as a {@link Mono}.
     *
     * @return the {@code Mono} view of the {@link #onClose} sink
     */
    @Override
    public final Mono<Void> onClose() {
        return this.onClose.asMono();
    }

    /**
     * Requests an empty completion of the {@link #onClose} sink. The emit
     * result is discarded, so an unsuccessful emission is not reported to
     * the caller.
     */
    public final void dispose() {
        this.onClose.tryEmitEmpty();
    }

    /**
     * Reports whether the close sink has already finished.
     *
     * @return {@code true} if the {@link #onClose} sink reports either
     *         {@code TERMINATED} or {@code CANCELLED}
     */
    public final boolean isDisposed() {
        boolean terminated = this.onClose.scan(Scannable.Attr.TERMINATED);
        if (terminated) {
            return true;
        }
        // CANCELLED is only consulted when the sink is not terminated.
        boolean cancelled = this.onClose.scan(Scannable.Attr.CANCELLED);
        return cancelled;
    }
}

