/*
 * Decompiled with CFR 0.152.
 */
package com.tencentcloudapi.cls.producer.common;

import com.google.common.math.LongMath;
import com.tencentcloudapi.cls.producer.AsyncProducerConfig;
import com.tencentcloudapi.cls.producer.common.Attempt;
import com.tencentcloudapi.cls.producer.common.LogException;
import com.tencentcloudapi.cls.producer.common.LogItem;
import com.tencentcloudapi.cls.producer.common.Logs;
import com.tencentcloudapi.cls.producer.common.ProducerBatch;
import com.tencentcloudapi.cls.producer.common.RetryQueue;
import com.tencentcloudapi.cls.producer.http.client.Sender;
import com.tencentcloudapi.cls.producer.http.comm.HttpMethod;
import com.tencentcloudapi.cls.producer.http.comm.RequestMessage;
import com.tencentcloudapi.cls.producer.request.PutLogsRequest;
import com.tencentcloudapi.cls.producer.response.PutLogsResponse;
import com.tencentcloudapi.cls.producer.response.Response;
import com.tencentcloudapi.cls.producer.util.LZ4Encoder;
import com.tencentcloudapi.cls.producer.util.QcloudClsSignature;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runnable task that sends a single {@link ProducerBatch} to the log service and then routes the
 * batch to the success queue, the retry queue, or the failure queue depending on the outcome.
 */
public class SendProducerBatchTask
implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(SendProducerBatchTask.class);
    /** Resource path used both when signing and when building the request. */
    private static final String RESOURCE_PATH = "/structuredlog";
    private final ProducerBatch batch;
    private final AsyncProducerConfig producerConfig;
    private final RetryQueue retryQueue;
    private final BlockingQueue<ProducerBatch> successQueue;
    private final BlockingQueue<ProducerBatch> failureQueue;
    private final AtomicInteger batchCount;

    /**
     * Creates a task for one batch.
     *
     * @param batch          the batch to send
     * @param producerConfig configuration read for host, credentials, source IP and retry settings
     * @param retryQueue     queue that receives the batch when the send fails but may be retried
     * @param successQueue   queue that receives the batch after a successful send
     * @param failureQueue   queue that receives the batch when it will not be retried
     * @param batchCount     counter compared against the failure queue size in
     *                       {@link #meetFailureCondition(Exception)}
     */
    public SendProducerBatchTask(ProducerBatch batch, AsyncProducerConfig producerConfig, RetryQueue retryQueue, BlockingQueue<ProducerBatch> successQueue, BlockingQueue<ProducerBatch> failureQueue, AtomicInteger batchCount) {
        this.batch = batch;
        this.producerConfig = producerConfig;
        this.retryQueue = retryQueue;
        this.successQueue = successQueue;
        this.failureQueue = failureQueue;
        this.batchCount = batchCount;
    }

    /**
     * Sends the batch, logging (rather than propagating) anything thrown. If the task was
     * interrupted, the thread's interrupt status is restored after logging.
     */
    @Override
    public void run() {
        try {
            this.sendProducerBatch(System.currentTimeMillis());
        }
        catch (Throwable t) {
            LOGGER.error("Uncaught error in send producer batch task, topic_id=" + this.batch.getTopicId() + ", e=", t);
            if (t instanceof InterruptedException) {
                // Do not swallow the interruption: let the owning executor/thread observe it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Builds the mutable header map shared by every request. {@code Content-Length} starts at
     * {@code "0"} and is overwritten in {@link #sendLogs(Map, Map, byte[])}.
     */
    private Map<String, String> getCommonHeadPara() {
        // sendLogs adds two more headers to this map, so size it for five entries up front.
        HashMap<String, String> headParameter = new HashMap<String, String>(8);
        headParameter.put("Content-Length", "0");
        headParameter.put("Content-Type", "application/x-protobuf");
        headParameter.put("Host", this.producerConfig.getHostName());
        return headParameter;
    }

    /**
     * Collects the contents of every {@link LogItem} of the batch into a log group and wraps it
     * in a {@link PutLogsRequest} for the batch's topic.
     */
    private PutLogsRequest buildPutLogsRequest(ProducerBatch batch) {
        List<LogItem> list = batch.getLogItems();
        Logs.LogGroup.Builder logGroup = Logs.LogGroup.newBuilder();
        for (LogItem tmp : list) {
            logGroup.addLogs(tmp.mContents);
        }
        return new PutLogsRequest(batch.getTopicId(), this.producerConfig.getSourceIp(), "", logGroup);
    }

    /**
     * Signs, compresses and posts {@code body}, translating the HTTP status into a result.
     *
     * @param urlParameter  query parameters, used for signing and sent with the request
     * @param headParameter headers; this map is modified ({@code Content-Length},
     *                      {@code Authorization} and {@code x-cls-compress-type} are set)
     * @param body          uncompressed request payload
     * @return the response when the status code is 200
     * @throws LogException with code {@code EncodingException} if signing fails, {@code SendFailed}
     *                      if the post throws or yields no response, or a status-specific code for
     *                      any non-200 status
     */
    private PutLogsResponse sendLogs(Map<String, String> urlParameter, Map<String, String> headParameter, byte[] body) throws LogException {
        PutLogsResponse response;
        String signature;
        headParameter.put("Content-Length", String.valueOf(body.length));
        try {
            // NOTE(review): 300000L is presumably the signature validity window in ms - confirm
            // against QcloudClsSignature.
            signature = QcloudClsSignature.buildSignature(this.producerConfig.getSecretId(), this.producerConfig.getSecretKey(), HttpMethod.POST.toString(), RESOURCE_PATH, urlParameter, headParameter, 300000L);
        }
        catch (UnsupportedEncodingException e) {
            throw new LogException("EncodingException", e.getMessage());
        }
        headParameter.put("Authorization", signature);
        headParameter.put("x-cls-compress-type", "lz4");
        URI uri = this.getHostURI();
        byte[] compressedData = LZ4Encoder.compressToLhLz4Chunk(body);
        RequestMessage requestMessage = SendProducerBatchTask.buildRequest(uri, urlParameter, headParameter, compressedData, compressedData.length);
        try {
            response = Sender.doPost(requestMessage);
        }
        catch (Exception e) {
            throw new LogException("SendFailed", e.getMessage());
        }
        if (response == null) {
            // Without this guard the status switch below would fail with a NullPointerException.
            throw new LogException("SendFailed", "no response received from server");
        }
        String requestId = response.GetRequestId();
        switch (response.GetHttpStatusCode()) {
            case 200: {
                return response;
            }
            case 500: {
                throw new LogException(response.GetHttpStatusCode(), "BadResponse", "internal server error", requestId);
            }
            case 429: {
                throw new LogException(response.GetHttpStatusCode(), "SpeedQuotaExceed", "speed quota exceed", requestId);
            }
            case 413: {
                throw new LogException(response.GetHttpStatusCode(), "ContentIsTooLarge", "content is too large", requestId);
            }
            case 404: {
                throw new LogException(response.GetHttpStatusCode(), "TopicNotExists", "topic not exists", requestId);
            }
            case 403: {
                throw new LogException(response.GetHttpStatusCode(), "SingleValueExceed1M", "single log value exceed 1M", requestId);
            }
            case 401: {
                throw new LogException(response.GetHttpStatusCode(), "AuthFailure", "auth failed", requestId);
            }
            case 400: {
                throw new LogException(response.GetHttpStatusCode(), "InvalidParam", "invalid param", requestId);
            }
            default: {
                // Any other status is reported as BadResponse carrying the response headers.
                throw new LogException(response.GetHttpStatusCode(), "BadResponse", response.GetAllHeaders().toString(), requestId);
            }
        }
    }

    /**
     * Builds the endpoint URI by concatenating the configured HTTP type and host name.
     *
     * @throws IllegalArgumentException if the concatenation is not a valid URI
     */
    private URI getHostURI() {
        String endPointUrl = this.producerConfig.getHttpType() + this.producerConfig.getHostName();
        try {
            return new URI(endPointUrl);
        }
        catch (URISyntaxException e) {
            throw new IllegalArgumentException("EndpointInvalid", e);
        }
    }

    /**
     * Assembles a POST {@link RequestMessage} for the structured-log resource path.
     *
     * @param endpoint   target endpoint
     * @param parameters query parameters
     * @param headers    request headers
     * @param content    request payload
     * @param size       value passed to {@code setContentLength}
     */
    private static RequestMessage buildRequest(URI endpoint, Map<String, String> parameters, Map<String, String> headers, byte[] content, long size) {
        RequestMessage request = new RequestMessage();
        request.setMethod(HttpMethod.POST);
        request.setEndpoint(endpoint);
        request.setResourcePath(RESOURCE_PATH);
        request.setParameters(parameters);
        request.setHeaders(headers);
        request.setContent(content);
        request.setContentLength(size);
        return request;
    }

    /**
     * Sends the batch once and records the attempt. On success the batch goes to the success
     * queue. On failure it goes to exactly one of the failure queue (when
     * {@link #meetFailureCondition(Exception)} holds, or the retry queue rejects it because it is
     * closed) or the retry queue with a computed next-retry time.
     *
     * @param nowMs timestamp recorded on the attempt
     * @throws InterruptedException if interrupted while putting the batch on the failure queue
     */
    private void sendProducerBatch(long nowMs) throws InterruptedException {
        Response response = null;
        try {
            PutLogsRequest request = this.buildPutLogsRequest(this.batch);
            Map<String, String> headParameter = this.getCommonHeadPara();
            request.SetParam("topic_id", request.GetTopic());
            Map<String, String> urlParameter = request.GetAllParams();
            byte[] logBytes = request.GetLogGroupBytes(this.producerConfig.getSourceIp(), this.batch.getPackageId());
            response = this.sendLogs(urlParameter, headParameter, logBytes);
            Attempt attempt = new Attempt(true, response.GetRequestId(), "", "", nowMs);
            this.batch.appendAttempt(attempt);
            this.successQueue.put(this.batch);
        }
        catch (Exception e) {
            String requestId = "";
            if (response != null) {
                requestId = response.GetRequestId();
            }
            Attempt attempt = this.buildAttempt(e, nowMs, requestId);
            this.batch.appendAttempt(attempt);
            if (this.meetFailureCondition(e)) {
                this.failureQueue.put(this.batch);
                // A failed batch must not also be scheduled for retry.
                return;
            }
            long retryBackoffMs = this.calculateRetryBackoffMs();
            this.batch.setNextRetryMs(System.currentTimeMillis() + retryBackoffMs);
            try {
                this.retryQueue.put(this.batch);
            }
            catch (IllegalStateException e1) {
                LOGGER.error("Failed to put batch to the retry queue, topic_id=" + this.batch.getTopicId() + ", e=", e1);
                if (this.retryQueue.isClosed()) {
                    this.failureQueue.put(this.batch);
                }
            }
        }
    }

    /**
     * Builds a failed {@link Attempt} from an exception. For a {@link LogException} the request
     * id, error code and message come from the exception itself; otherwise the attempt is
     * recorded as {@code BadResponse} with the supplied {@code requestId}.
     */
    private Attempt buildAttempt(Exception e, long nowMs, String requestId) {
        if (e instanceof LogException) {
            LogException logException = (LogException)e;
            return new Attempt(false, logException.GetRequestId(), logException.GetErrorCode(), logException.GetErrorMessage(), nowMs);
        }
        return new Attempt(false, requestId, "BadResponse", e.getMessage(), nowMs);
    }

    /**
     * Decides whether the batch should be failed instead of retried: the error is not
     * retriable, the retry queue is closed, or the batch has used up its configured retries
     * while the failure queue holds at most half of {@code batchCount}.
     */
    private boolean meetFailureCondition(Exception e) {
        if (!this.isRetrievableException(e)) {
            return true;
        }
        if (this.retryQueue.isClosed()) {
            return true;
        }
        return this.batch.getRetries() >= this.producerConfig.getRetries() && this.failureQueue.size() <= this.batchCount.get() / 2;
    }

    /**
     * Returns {@code true} only for a {@link LogException} whose error code is
     * {@code SendFailed}, {@code SpeedQuotaExceed} or {@code BadResponse}.
     */
    private boolean isRetrievableException(Exception e) {
        if (e instanceof LogException) {
            LogException logException = (LogException)e;
            // Constant-first equals so a null error code is treated as non-retriable, not an NPE.
            return "SendFailed".equals(logException.GetErrorCode()) || "SpeedQuotaExceed".equals(logException.GetErrorCode()) || "BadResponse".equals(logException.GetErrorCode());
        }
        return false;
    }

    /**
     * Computes the delay before the next retry as {@code base * 2^retries}, capped at the
     * configured maximum backoff.
     */
    private long calculateRetryBackoffMs() {
        int retry = this.batch.getRetries();
        long retryBackoffMs = this.producerConfig.getBaseRetryBackoffMs() * LongMath.pow(2L, retry);
        // A non-positive product (presumably from long overflow at high retry counts) falls
        // back to the maximum backoff.
        if (retryBackoffMs <= 0L) {
            retryBackoffMs = this.producerConfig.getMaxRetryBackoffMs();
        }
        return Math.min(retryBackoffMs, this.producerConfig.getMaxRetryBackoffMs());
    }
}

