/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.rpc.protocol.tri.h12.http1;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.remoting.http12.HttpChannel;
import org.apache.dubbo.remoting.http12.HttpHeaderNames;
import org.apache.dubbo.remoting.http12.HttpInputMessage;
import org.apache.dubbo.remoting.http12.RequestMetadata;
import org.apache.dubbo.remoting.http12.h1.Http1ServerChannelObserver;
import org.apache.dubbo.remoting.http12.h1.Http1ServerTransportListener;
import org.apache.dubbo.remoting.http12.h1.Http1SseServerChannelObserver;
import org.apache.dubbo.remoting.http12.message.DefaultListeningDecoder;
import org.apache.dubbo.remoting.http12.message.MediaType;
import org.apache.dubbo.remoting.http12.message.codec.JsonCodec;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.protocol.tri.Http3Exchanger;
import org.apache.dubbo.rpc.protocol.tri.RpcInvocationBuildContext;
import org.apache.dubbo.rpc.protocol.tri.h12.AbstractServerTransportListener;
import org.apache.dubbo.rpc.protocol.tri.h12.DefaultHttpMessageListener;
import org.apache.dubbo.rpc.protocol.tri.h12.HttpMessageListener;
import org.apache.dubbo.rpc.protocol.tri.h12.ServerCallListener;
import org.apache.dubbo.rpc.protocol.tri.h12.ServerStreamServerCallListener;
import org.apache.dubbo.rpc.protocol.tri.h12.UnaryServerCallListener;
import org.apache.dubbo.rpc.protocol.tri.h12.http1.Http1UnaryServerChannelObserver;

public class DefaultHttp11ServerTransportListener
extends AbstractServerTransportListener<RequestMetadata, HttpInputMessage>
implements Http1ServerTransportListener {
    private final HttpChannel httpChannel;
    private Http1ServerChannelObserver responseObserver;

    public DefaultHttp11ServerTransportListener(HttpChannel httpChannel, URL url, FrameworkModel frameworkModel) {
        super(frameworkModel, url, httpChannel);
        this.httpChannel = httpChannel;
        this.responseObserver = this.prepareResponseObserver(new Http1UnaryServerChannelObserver(httpChannel));
    }

    private Http1ServerChannelObserver prepareResponseObserver(Http1ServerChannelObserver responseObserver) {
        responseObserver.setExceptionCustomizer(this.getExceptionCustomizer());
        RpcInvocationBuildContext context = this.getContext();
        responseObserver.setResponseEncoder(context == null ? JsonCodec.INSTANCE : context.getHttpMessageEncoder());
        return responseObserver;
    }

    @Override
    protected HttpMessageListener buildHttpMessageListener() {
        RpcInvocationBuildContext context = this.getContext();
        RpcInvocation rpcInvocation = this.buildRpcInvocation(context);
        ServerCallListener serverCallListener = this.startListener(rpcInvocation, context.getMethodDescriptor(), context.getInvoker());
        DefaultListeningDecoder listeningDecoder = new DefaultListeningDecoder(context.getHttpMessageDecoder(), context.getMethodMetadata().getActualRequestTypes());
        listeningDecoder.setListener(serverCallListener::onMessage);
        return new DefaultHttpMessageListener(listeningDecoder);
    }

    private ServerCallListener startListener(RpcInvocation invocation, MethodDescriptor methodDescriptor, Invoker<?> invoker) {
        switch (methodDescriptor.getRpcType()) {
            case UNARY: {
                return new AutoCompleteUnaryServerCallListener(invocation, invoker, this.responseObserver);
            }
            case SERVER_STREAM: {
                this.responseObserver = this.prepareResponseObserver(new Http1SseServerChannelObserver(this.httpChannel));
                this.responseObserver.addHeadersCustomizer((hs, t) -> hs.set(HttpHeaderNames.CONTENT_TYPE.getKey(), MediaType.TEXT_EVENT_STREAM.getName()));
                return new AutoCompleteServerStreamServerCallListener(invocation, invoker, this.responseObserver);
            }
        }
        throw new UnsupportedOperationException("HTTP1.x only support unary and server-stream");
    }

    @Override
    protected void onMetadataCompletion(RequestMetadata metadata) {
        this.responseObserver.setResponseEncoder(this.getContext().getHttpMessageEncoder());
    }

    @Override
    protected void onError(Throwable throwable) {
        this.responseObserver.onError(throwable);
    }

    @Override
    protected void initializeAltSvc(URL url) {
        String protocolId = Http3Exchanger.isEnabled(url) ? "h3" : "h2";
        String value = protocolId + "=\":" + url.getParameter("bind.port", url.getPort()) + '\"';
        this.responseObserver.addHeadersCustomizer((hs, t) -> hs.set(HttpHeaderNames.ALT_SVC.getKey(), value));
    }

    private static final class AutoCompleteUnaryServerCallListener
    extends UnaryServerCallListener {
        AutoCompleteUnaryServerCallListener(RpcInvocation invocation, Invoker<?> invoker, StreamObserver<Object> responseObserver) {
            super(invocation, invoker, responseObserver);
        }

        @Override
        public void onMessage(Object message) {
            super.onMessage(message);
            this.onComplete();
        }
    }

    private static final class AutoCompleteServerStreamServerCallListener
    extends ServerStreamServerCallListener {
        AutoCompleteServerStreamServerCallListener(RpcInvocation invocation, Invoker<?> invoker, StreamObserver<Object> responseObserver) {
            super(invocation, invoker, responseObserver);
        }

        @Override
        public void onMessage(Object message) {
            super.onMessage(message);
            this.onComplete();
        }
    }
}

