/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.http.codec;

import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.CodecException;
import org.springframework.core.codec.Decoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.util.Assert;
import org.springframework.util.MimeTypeUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

public class ServerSentEventHttpMessageReader
implements HttpMessageReader<Object> {
    private static final IntPredicate NEWLINE_DELIMITER = b -> b == 10 || b == 13;
    private final List<Decoder<?>> dataDecoders;

    public ServerSentEventHttpMessageReader() {
        this.dataDecoders = Collections.emptyList();
    }

    public ServerSentEventHttpMessageReader(List<Decoder<?>> dataDecoders) {
        Assert.notNull(dataDecoders, (String)"'dataDecoders' must not be null");
        this.dataDecoders = new ArrayList(dataDecoders);
    }

    @Override
    public boolean canRead(ResolvableType elementType, MediaType mediaType) {
        return MediaType.TEXT_EVENT_STREAM.isCompatibleWith(mediaType) || ServerSentEvent.class.isAssignableFrom(elementType.getRawClass());
    }

    @Override
    public Flux<Object> read(ResolvableType elementType, ReactiveHttpInputMessage inputMessage, Map<String, Object> hints) {
        boolean isSseElementType = ServerSentEvent.class.isAssignableFrom(elementType.getRawClass());
        ResolvableType dataType = isSseElementType ? elementType.getGeneric(new int[]{0}) : elementType;
        return Flux.from(inputMessage.getBody()).concatMap(ServerSentEventHttpMessageReader::splitOnNewline).map(buffer -> Tuples.of((Object)this.decodeDataBuffer((DataBuffer)buffer), (Object)buffer.factory())).bufferUntil(data -> ((String)data.getT1()).equals("\n")).concatMap(list -> {
            String[] lines;
            ServerSentEvent.Builder<String> sseBuilder = ServerSentEvent.builder();
            StringBuilder dataBuilder = new StringBuilder();
            StringBuilder commentBuilder = new StringBuilder();
            DataBufferFactory bufferFactory = (DataBufferFactory)((Tuple2)list.stream().findFirst().get()).getT2();
            for (String line : lines = list.stream().map(t -> (String)t.getT1()).collect(Collectors.joining()).split("\\r?\\n")) {
                if (line.startsWith("id:")) {
                    sseBuilder.id(line.substring(3));
                    continue;
                }
                if (line.startsWith("event:")) {
                    sseBuilder.event(line.substring(6));
                    continue;
                }
                if (line.startsWith("data:")) {
                    dataBuilder.append(line.substring(5)).append("\n");
                    continue;
                }
                if (line.startsWith("retry:")) {
                    sseBuilder.retry(Duration.ofMillis(Long.valueOf(line.substring(6))));
                    continue;
                }
                if (!line.startsWith(":")) continue;
                commentBuilder.append(line.substring(1)).append("\n");
            }
            if (dataBuilder.length() > 0) {
                String data = dataBuilder.toString();
                if (String.class.isAssignableFrom(dataType.getRawClass())) {
                    sseBuilder.data(data.substring(0, data.length() - 1));
                } else {
                    sseBuilder.data((String)this.decode(data, bufferFactory, dataType, hints));
                }
            }
            if (commentBuilder.length() > 0) {
                String comment = commentBuilder.toString();
                sseBuilder.comment(comment.substring(0, comment.length() - 1));
            }
            ServerSentEvent sse = sseBuilder.build();
            return isSseElementType ? Mono.just(sse) : Mono.justOrEmpty(sse.data());
        }).cast(Object.class);
    }

    private static Flux<DataBuffer> splitOnNewline(DataBuffer dataBuffer) {
        int endIdx;
        ArrayList<DataBuffer> results = new ArrayList<DataBuffer>();
        int startIdx = 0;
        int limit = dataBuffer.readableByteCount();
        do {
            int length = (endIdx = dataBuffer.indexOf(NEWLINE_DELIMITER, startIdx)) != -1 ? endIdx - startIdx + 1 : limit - startIdx;
            DataBuffer token = dataBuffer.slice(startIdx, length);
            results.add(DataBufferUtils.retain((DataBuffer)token));
        } while ((startIdx = endIdx + 1) < limit && endIdx != -1);
        DataBufferUtils.release((DataBuffer)dataBuffer);
        return Flux.fromIterable(results);
    }

    private String decodeDataBuffer(DataBuffer dataBuffer) {
        CharBuffer charBuffer = StandardCharsets.UTF_8.decode(dataBuffer.asByteBuffer());
        DataBufferUtils.release((DataBuffer)dataBuffer);
        return charBuffer.toString();
    }

    private <T> T decode(String data, DataBufferFactory bufferFactory, ResolvableType elementType, Map<String, Object> hints) {
        Optional<Decoder> decoder = this.dataDecoders.stream().filter(e -> e.canDecode(elementType, MimeTypeUtils.APPLICATION_JSON)).findFirst();
        return (T)decoder.orElseThrow(() -> new CodecException("No suitable decoder found!")).decodeToMono((Publisher)Mono.just((Object)bufferFactory.wrap(data.getBytes(StandardCharsets.UTF_8))), elementType, MimeTypeUtils.APPLICATION_JSON, hints).block();
    }

    @Override
    public Mono<Object> readMono(ResolvableType elementType, ReactiveHttpInputMessage inputMessage, Map<String, Object> hints) {
        return Mono.error((Throwable)new UnsupportedOperationException("ServerSentEventHttpMessageReader only supports reading stream of events as a Flux"));
    }

    @Override
    public List<MediaType> getReadableMediaTypes() {
        return Collections.singletonList(MediaType.TEXT_EVENT_STREAM);
    }
}

