/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.cloud.ai.dashscope.protocol;

import com.alibaba.cloud.ai.dashscope.api.ApiUtils;
import com.alibaba.cloud.ai.dashscope.protocol.DashScopeWebSocketClientOptions;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import okhttp3.ConnectionPool;
import okhttp3.Dispatcher;
import okhttp3.Headers;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
import okhttp3.Protocol;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okhttp3.logging.HttpLoggingInterceptor;
import okio.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

public class DashScopeWebSocketClient
extends WebSocketListener {
    private final Logger logger = LoggerFactory.getLogger(DashScopeWebSocketClient.class);
    private final DashScopeWebSocketClientOptions options;
    private WebSocket webSocketClient;
    private AtomicBoolean isOpen;
    FluxSink<ByteBuffer> emitter;
    FluxSink<ByteBuffer> binary_emitter;
    FluxSink<String> text_emitter;

    public DashScopeWebSocketClient(DashScopeWebSocketClientOptions options) {
        this.options = options;
        this.isOpen = new AtomicBoolean(false);
    }

    public Flux<ByteBuffer> streamBinaryOut(String text) {
        Flux flux = Flux.create(emitter -> {
            this.binary_emitter = emitter;
        }, (FluxSink.OverflowStrategy)FluxSink.OverflowStrategy.BUFFER);
        this.sendText(text);
        return flux;
    }

    public Flux<String> streamTextOut(Flux<ByteBuffer> binary) {
        Flux flux = Flux.create(emitter -> {
            this.text_emitter = emitter;
        }, (FluxSink.OverflowStrategy)FluxSink.OverflowStrategy.BUFFER);
        binary.subscribe(this::sendBinary);
        return flux;
    }

    public void sendText(String text) {
        boolean success;
        if (!this.isOpen.get()) {
            this.establishWebSocketClient();
        }
        if (!(success = this.webSocketClient.send(text))) {
            this.logger.error("send text failed");
        }
    }

    public void sendBinary(ByteBuffer binary) {
        boolean success = this.webSocketClient.send(ByteString.of((ByteBuffer)binary));
        if (!success) {
            this.logger.error("send binary failed");
        }
    }

    private void establishWebSocketClient() {
        HttpLoggingInterceptor logging = new HttpLoggingInterceptor();
        logging.setLevel(HttpLoggingInterceptor.Level.valueOf((String)"NONE"));
        Dispatcher dispatcher = new Dispatcher();
        dispatcher.setMaxRequests(Constants.DEFAULT_MAXIMUM_ASYNC_REQUESTS.intValue());
        dispatcher.setMaxRequestsPerHost(Constants.DEFAULT_MAXIMUM_ASYNC_REQUESTS_PER_HOST.intValue());
        OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder();
        clientBuilder.connectTimeout(Constants.DEFAULT_CONNECT_TIMEOUT).readTimeout(Constants.DEFAULT_READ_TIMEOUT).writeTimeout(Constants.DEFAULT_WRITE_TIMEOUT).addInterceptor((Interceptor)logging).dispatcher(dispatcher).protocols(Collections.singletonList(Protocol.HTTP_1_1)).connectionPool(new ConnectionPool(Constants.DEFAULT_CONNECTION_POOL_SIZE.intValue(), Constants.DEFAULT_CONNECTION_IDLE_TIMEOUT.getSeconds(), TimeUnit.SECONDS));
        OkHttpClient httpClient = clientBuilder.build();
        try {
            this.webSocketClient = httpClient.newWebSocket(this.buildConnectionRequest(), (WebSocketListener)this);
        }
        catch (Throwable ex) {
            this.logger.error("create websocket failed: msg={}", (Object)ex.getMessage());
        }
    }

    private Request buildConnectionRequest() {
        Request.Builder bd = new Request.Builder();
        bd.headers(Headers.of(ApiUtils.getMapContentHeaders(this.options.getApiKey(), false, this.options.getWorkSpaceId(), null)));
        return bd.url(this.options.getUrl()).build();
    }

    private String getRequestBody(Response response) {
        String responseBody = "";
        if (response != null && response.body() != null) {
            try {
                responseBody = response.body().string();
            }
            catch (IOException ex) {
                this.logger.error("get response body failed: {}", (Object)ex.getMessage());
            }
        }
        return responseBody;
    }

    public void onOpen(WebSocket webSocket, Response response) {
        this.logger.info("receive ws event onOpen: handle={}, body={}", (Object)webSocket, (Object)this.getRequestBody(response));
        this.isOpen.set(true);
    }

    public void onClosed(WebSocket webSocket, int code, String reason) {
        this.logger.info("receive ws event onClosed: handle={}, code={}, reason={}", new Object[]{webSocket, code, reason});
        this.isOpen.set(false);
        this.emittersComplete("closed");
    }

    public void onClosing(WebSocket webSocket, int code, String reason) {
        this.logger.info("receive ws event onClosing: handle={}, code={}, reason={}", new Object[]{webSocket.toString(), code, reason});
        this.emittersComplete("closing");
    }

    public void onFailure(WebSocket webSocket, Throwable t, Response response) {
        String failureMessage = String.format("msg=%s, cause=%s, body=%s", t.getMessage(), t.getCause(), this.getRequestBody(response));
        this.logger.error("receive ws event onFailure: handle={}, {}", (Object)webSocket, (Object)failureMessage);
        this.isOpen.set(false);
        this.emittersError("failure", new Exception(failureMessage, t));
    }

    public void onMessage(WebSocket webSocket, String text) {
        this.logger.debug("receive ws event onMessage(text): handle={}, text={}", (Object)webSocket, (Object)text);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        try {
            EventMessage message = (EventMessage)objectMapper.readValue(text, EventMessage.class);
            switch (message.header.event) {
                case TASK_STARTED: {
                    this.logger.info("task started: text={}", (Object)text);
                    break;
                }
                case TASK_FINISHED: {
                    this.logger.info("task finished: text={}", (Object)text);
                    this.emittersComplete("finished");
                    break;
                }
                case TASK_FAILED: {
                    this.logger.error("task failed: text={}", (Object)text);
                    this.emittersError("task failed", new Exception());
                    break;
                }
                case RESULT_GENERATED: {
                    if (this.text_emitter != null) {
                        this.text_emitter.next((Object)text);
                    }
                    break;
                }
                default: {
                    this.logger.error("task error: text={}", (Object)text);
                    this.emittersError("unsupported event", new Exception());
                    break;
                }
            }
        }
        catch (Exception e) {
            this.logger.error("parse message failed: text={}, msg={}", (Object)text, (Object)e.getMessage());
        }
    }

    public void onMessage(WebSocket webSocket, ByteString bytes) {
        this.logger.debug("receive ws event onMessage(bytes): handle={}, size={}", (Object)webSocket, (Object)bytes.size());
        if (this.binary_emitter != null) {
            this.binary_emitter.next((Object)bytes.asByteBuffer());
        }
    }

    private void emittersComplete(String event) {
        if (this.binary_emitter != null && !this.binary_emitter.isCancelled()) {
            this.logger.info("binary emitter handling: complete on {}", (Object)event);
            this.binary_emitter.complete();
        }
        if (this.text_emitter != null && !this.text_emitter.isCancelled()) {
            this.logger.info("text emitter handling: complete on {}", (Object)event);
            this.text_emitter.complete();
            this.logger.info("done");
        }
    }

    private void emittersError(String event, Throwable t) {
        if (this.binary_emitter != null && !this.binary_emitter.isCancelled()) {
            this.logger.info("binary emitter handling: error on {}", (Object)event);
            this.binary_emitter.error(t);
        }
        if (this.text_emitter != null && !this.text_emitter.isCancelled()) {
            this.logger.info("text emitter handling: error on {}", (Object)event);
            this.text_emitter.error(t);
        }
    }

    public static class Constants {
        private static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(120L);
        private static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofSeconds(60L);
        private static final Duration DEFAULT_READ_TIMEOUT = Duration.ofSeconds(300L);
        private static final Duration DEFAULT_CONNECTION_IDLE_TIMEOUT = Duration.ofSeconds(300L);
        private static final Integer DEFAULT_CONNECTION_POOL_SIZE = 32;
        private static final Integer DEFAULT_MAXIMUM_ASYNC_REQUESTS = 32;
        private static final Integer DEFAULT_MAXIMUM_ASYNC_REQUESTS_PER_HOST = 32;
        private static final String DEFAULT_HTTP_LOGGING_LEVEL = "NONE";
    }

    @JsonInclude(value=JsonInclude.Include.NON_NULL)
    public record EventMessage(@JsonProperty(value="header") EventMessageHeader header, @JsonProperty(value="payload") EventMessagePayload payload) {

        public record EventMessageHeader(@JsonProperty(value="task_id") String taskId, @JsonProperty(value="event") EventType event, @JsonProperty(value="error_code") String code, @JsonProperty(value="error_message") String message) {
        }

        public record EventMessagePayload(@JsonProperty(value="output") JsonNode output, @JsonProperty(value="usage") JsonNode usage) {
        }
    }

    public static enum EventType {
        TASK_STARTED("task-started"),
        RESULT_GENERATED("result-generated"),
        TASK_FINISHED("task-finished"),
        TASK_FAILED("task-failed"),
        RUN_TASK("run-task"),
        CONTINUE_TASK("continue-task"),
        FINISH_TASK("finish-task");

        private final String value;

        private EventType(String value) {
            this.value = value;
        }

        public String getValue() {
            return this.value;
        }
    }
}

