/*
 * Decompiled with CFR 0.152.
 */
package darabonba.core.async;

import com.aliyun.core.utils.BinaryUtils;
import com.aliyun.core.utils.FunctionalUtils;
import darabonba.core.ResponseBytes;
import darabonba.core.async.AsyncResponseHandler;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * {@link AsyncResponseHandler} that accumulates every {@link ByteBuffer} emitted by the
 * response publisher into an in-memory buffer and hands the collected bytes, together with
 * the response object, to {@link ResponseBytes#fromByteArrayUnsafe}.
 *
 * <p>The whole body is held in memory, so this handler is only suitable for payloads that
 * fit comfortably on the heap.
 *
 * @param <ResponseT> type of the response object paired with the collected bytes
 */
public final class ByteArrayAsyncResponseHandler<ResponseT>
implements AsyncResponseHandler<ResponseT, ResponseBytes<ResponseT>> {
    /** Subscriber collecting the current stream; {@code null} until {@link #onStream} is called. */
    private BaosSubscriber subscriber;

    /**
     * Subscribes a fresh collecting subscriber to the given publisher. Any previously
     * collected data is dropped because the subscriber is replaced.
     *
     * @param publisher source of the response body chunks
     */
    @Override
    public void onStream(Publisher<ByteBuffer> publisher) {
        this.subscriber = new BaosSubscriber();
        publisher.subscribe(this.subscriber);
    }

    /**
     * Combines the response object with the bytes collected so far.
     *
     * @param response the response object to pair with the body bytes
     * @return the response together with a copy of the collected bytes
     * @throws IllegalStateException if {@link #onStream} has not been called yet, or if the
     *         publisher signalled an error (the original error is attached as the cause)
     */
    @Override
    public ResponseBytes<ResponseT> transform(ResponseT response) {
        BaosSubscriber current = this.subscriber;
        if (current == null) {
            throw new IllegalStateException(
                    "transform() called before onStream(): no response body was collected");
        }
        return ResponseBytes.fromByteArrayUnsafe(response, current.collectedBytes());
    }

    /**
     * Reactive Streams subscriber that appends every received chunk to a
     * {@link ByteArrayOutputStream} and remembers a terminal error, if any.
     */
    static class BaosSubscriber
    implements Subscriber<ByteBuffer> {
        private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        private Subscription subscription;
        // Written by the publisher's onError signal and read by collectedBytes(), which may
        // run on a different thread.
        private volatile Throwable failure;

        BaosSubscriber() {
        }

        /**
         * Accepts the first subscription and requests an unbounded number of elements;
         * any additional subscription is cancelled immediately.
         */
        @Override
        public void onSubscribe(Subscription s) {
            if (this.subscription != null) {
                s.cancel();
                return;
            }
            this.subscription = s;
            this.subscription.request(Long.MAX_VALUE);
        }

        /** Copies the chunk's bytes into the buffer, then signals demand for one more element. */
        @Override
        public void onNext(ByteBuffer byteBuffer) {
            FunctionalUtils.invokeSafely(() -> this.baos.write(BinaryUtils.copyBytesFrom(byteBuffer)));
            // Redundant for spec-compliant publishers after the unbounded request made in
            // onSubscribe; retained so the signalling towards the publisher is unchanged.
            this.subscription.request(1L);
        }

        /**
         * Records the failure so that {@link #collectedBytes()} can report it with its cause
         * instead of returning a truncated body.
         */
        @Override
        public void onError(Throwable throwable) {
            this.failure = throwable != null
                    ? throwable
                    : new IllegalStateException("Publisher signalled onError with a null cause");
        }

        /** Nothing to do: the collected bytes are read on demand via {@link #collectedBytes()}. */
        @Override
        public void onComplete() {
        }

        /**
         * Returns a copy of all bytes received so far.
         *
         * @throws IllegalStateException if the stream terminated with an error
         */
        byte[] collectedBytes() {
            Throwable cause = this.failure;
            if (cause != null) {
                throw new IllegalStateException(
                        "Response body stream failed before it could be fully read", cause);
            }
            return this.baos.toByteArray();
        }
    }
}

