package com.tencent.polaris.ratelimit.client.flow;

import com.tencent.polaris.api.plugin.ratelimiter.RemoteQuotaInfo;
import com.tencent.polaris.api.utils.CollectionUtils;
import com.tencent.polaris.logging.LoggerFactory;
import com.tencent.polaris.ratelimit.client.flow.RateLimitWindow;
import com.tencent.polaris.ratelimit.client.pb.RateLimitGRPCV2Grpc;
import com.tencent.polaris.ratelimit.client.pb.RatelimitV2;
import com.tencent.polaris.ratelimit.client.utils.RateLimitConstants;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;

/* loaded from: input_file:com/tencent/polaris/ratelimit/client/flow/StreamResource.class */
public class StreamResource implements StreamObserver<RatelimitV2.RateLimitResponse> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamResource.class);
    private final HostIdentifier hostIdentifier;
    private final ManagedChannel channel;
    private final StreamObserver<RatelimitV2.RateLimitRequest> streamClient;
    private final RateLimitGRPCV2Grpc.RateLimitGRPCV2BlockingStub client;
    private int clientKey;
    private final AtomicBoolean endStream = new AtomicBoolean(false);
    private final AtomicLong lastConnectFailTimeMilli = new AtomicLong(0);
    private final Map<ServiceIdentifier, InitializeRecord> initRecord = new ConcurrentHashMap();
    private final Map<Integer, DurationBaseCallback> counters = new ConcurrentHashMap();
    private final AtomicLong lastRecvTime = new AtomicLong(0);
    private final AtomicLong timeDiffMilli = new AtomicLong();
    private final AtomicLong lastSyncTimeMilli = new AtomicLong();
    private final AtomicLong syncInterval = new AtomicLong(RateLimitConstants.TIME_ADJUST_INTERVAL_MS);

    public StreamResource(HostIdentifier hostIdentifier) {
        this.channel = createConnection(hostIdentifier);
        this.hostIdentifier = hostIdentifier;
        this.streamClient = RateLimitGRPCV2Grpc.newStub(this.channel).service(this);
        this.client = RateLimitGRPCV2Grpc.newBlockingStub(this.channel);
    }

    private ManagedChannel createConnection(HostIdentifier hostIdentifier) {
        return ManagedChannelBuilder.forAddress(hostIdentifier.getHost(), hostIdentifier.getPort()).usePlaintext().build();
    }

    public HostIdentifier getHostIdentifier() {
        return this.hostIdentifier;
    }

    public void closeStream(boolean z) {
        if (this.endStream.compareAndSet(false, true)) {
            if (z && null != this.streamClient) {
                LOG.info("[ServerConnector]connection {} start to closeSend", this.hostIdentifier);
                this.streamClient.onCompleted();
            }
            if (null != this.channel) {
                this.channel.shutdown();
            }
        }
    }

    public void onNext(RatelimitV2.RateLimitResponse rateLimitResponse) {
        LOG.debug("ratelimit response receive is {}", rateLimitResponse);
        this.lastRecvTime.set(System.currentTimeMillis());
        if (RatelimitV2.RateLimitCmd.INIT.equals(rateLimitResponse.getCmd())) {
            handleRateLimitInitResponse(rateLimitResponse.getRateLimitInitResponse());
        } else if (RatelimitV2.RateLimitCmd.ACQUIRE.equals(rateLimitResponse.getCmd())) {
            handleRateLimitReportResponse(rateLimitResponse.getRateLimitReportResponse());
        }
    }

    public void onError(Throwable th) {
        LOG.error("received error from server {}", this.hostIdentifier, th);
        this.lastConnectFailTimeMilli.set(System.currentTimeMillis());
        closeStream(false);
    }

    public void onCompleted() {
        LOG.error("received EOF from server {}", this.hostIdentifier);
        closeStream(true);
    }

    public InitializeRecord addInitRecord(ServiceIdentifier serviceIdentifier, RateLimitWindow rateLimitWindow) {
        if (!this.initRecord.containsKey(serviceIdentifier)) {
            LOG.info("[RateLimit] add init record for {}, stream is {}", serviceIdentifier, this.hostIdentifier);
            this.initRecord.putIfAbsent(serviceIdentifier, new InitializeRecord(rateLimitWindow));
        }
        return this.initRecord.get(serviceIdentifier);
    }

    public void deleteInitRecord(ServiceIdentifier serviceIdentifier) {
        LOG.info("[RateLimit] delete init record for {}, stream is {}", serviceIdentifier, this.hostIdentifier);
        this.initRecord.remove(serviceIdentifier);
    }

    private void handleRateLimitInitResponse(RatelimitV2.RateLimitInitResponse rateLimitInitResponse) {
        LOG.debug("[handleRateLimitInitResponse] response:{}", rateLimitInitResponse);
        if (rateLimitInitResponse.getCode() != 200000) {
            LOG.error("[handleRateLimitInitResponse] failed. code is {}", Integer.valueOf(rateLimitInitResponse.getCode()));
            return;
        }
        RatelimitV2.LimitTarget target = rateLimitInitResponse.getTarget();
        ServiceIdentifier serviceIdentifier = new ServiceIdentifier(target.getService(), target.getNamespace(), target.getLabels());
        InitializeRecord initializeRecord = this.initRecord.get(serviceIdentifier);
        if (initializeRecord == null) {
            LOG.error("[handleRateLimitInitResponse] can not find init record:{}", serviceIdentifier);
            return;
        }
        setClientKey(rateLimitInitResponse.getClientKey());
        List countersList = rateLimitInitResponse.getCountersList();
        if (CollectionUtils.isEmpty(countersList)) {
            LOG.error("[handleRateLimitInitResponse] countersList is empty.");
            return;
        }
        initializeRecord.getDurationRecord().clear();
        long localTimeMilli = getLocalTimeMilli(rateLimitInitResponse.getTimestamp());
        RateLimitWindow rateLimitWindow = initializeRecord.getRateLimitWindow();
        countersList.forEach(quotaCounter -> {
            initializeRecord.getDurationRecord().putIfAbsent(Integer.valueOf(quotaCounter.getDuration()), Integer.valueOf(quotaCounter.getCounterKey()));
            this.counters.putIfAbsent(Integer.valueOf(quotaCounter.getCounterKey()), new DurationBaseCallback(quotaCounter.getDuration(), rateLimitWindow));
            rateLimitWindow.getAllocatingBucket().onRemoteUpdate(new RemoteQuotaInfo(quotaCounter.getLeft(), quotaCounter.getClientCount(), localTimeMilli, quotaCounter.getDuration() * 1000));
        });
        LOG.info("[RateLimit] window {} has turn to initialized", rateLimitWindow.getUniqueKey());
        rateLimitWindow.setStatus(RateLimitWindow.WindowStatus.INITIALIZED.ordinal());
    }

    void handleRateLimitReportResponse(RatelimitV2.RateLimitReportResponse rateLimitReportResponse) {
        LOG.debug("[handleRateLimitReportRequest] response:{}", rateLimitReportResponse);
        if (rateLimitReportResponse.getCode() != 200000) {
            LOG.error("[handleRateLimitReportRequest] failed. code is {}", Integer.valueOf(rateLimitReportResponse.getCode()));
            return;
        }
        List quotaLeftsList = rateLimitReportResponse.getQuotaLeftsList();
        if (CollectionUtils.isEmpty(quotaLeftsList)) {
            LOG.error("[handleRateLimitReportRequest] quotaLefts is empty.");
        } else {
            long localTimeMilli = getLocalTimeMilli(rateLimitReportResponse.getTimestamp());
            quotaLeftsList.forEach(quotaLeft -> {
                this.counters.get(Integer.valueOf(quotaLeft.getCounterKey())).getRateLimitWindow().getAllocatingBucket().onRemoteUpdate(new RemoteQuotaInfo(quotaLeft.getLeft(), quotaLeft.getClientCount(), localTimeMilli, r0.getDuration() * 1000));
            });
        }
    }

    public int getClientKey() {
        return this.clientKey;
    }

    public void setClientKey(int i) {
        this.clientKey = i;
    }

    private long getLocalTimeMilli(long j) {
        return j - this.timeDiffMilli.get();
    }

    public long getRemoteTimeMilli(long j) {
        return j + this.timeDiffMilli.get();
    }

    public void adjustTime() {
        long j = this.lastSyncTimeMilli.get();
        long currentTimeMillis = System.currentTimeMillis();
        if (j > 0 && currentTimeMillis - j < this.syncInterval.get()) {
            LOG.debug("[RateLimit] adjustTime need wait.lastSyncTimeMilli:{},sendTimeMilli:{}", this.lastSyncTimeMilli, Long.valueOf(currentTimeMillis));
            return;
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        try {
            RatelimitV2.TimeAdjustResponse timeAdjust = this.client.timeAdjust(RatelimitV2.TimeAdjustRequest.newBuilder().build());
            long currentTimeMillis3 = System.currentTimeMillis();
            this.lastSyncTimeMilli.set(currentTimeMillis3);
            long serverTimestamp = timeAdjust.getServerTimestamp();
            long j2 = currentTimeMillis3 - currentTimeMillis2;
            long j3 = (serverTimestamp + (j2 / 3)) - currentTimeMillis3;
            if (this.timeDiffMilli.get() == j3 && this.syncInterval.get() < RateLimitConstants.MAX_TIME_ADJUST_INTERVAL_MS) {
                this.syncInterval.set(this.syncInterval.get() + RateLimitConstants.TIME_ADJUST_INTERVAL_MS);
            }
            this.timeDiffMilli.set(j3);
            LOG.info("[RateLimit] adjust time to server time is {}, latency is {},diff is {}", new Object[]{Long.valueOf(serverTimestamp), Long.valueOf(j2), Long.valueOf(j3)});
        } catch (Throwable th) {
            LOG.error("[RateLimit] fail to adjust time, err {}", th.getMessage());
            onError(th);
        }
    }

    public boolean isEndStream() {
        return this.endStream.get();
    }

    public boolean sendRateLimitRequest(RatelimitV2.RateLimitRequest rateLimitRequest) {
        LOG.debug("ratelimit request to send is {}", rateLimitRequest);
        try {
            this.streamClient.onNext(rateLimitRequest);
            return true;
        } catch (Throwable th) {
            LOG.error("[RateLimit] fail to send request, err {}", th.getMessage());
            onError(th);
            return false;
        }
    }

    public boolean hasInit(ServiceIdentifier serviceIdentifier) {
        return this.initRecord.containsKey(serviceIdentifier);
    }

    public InitializeRecord getInitRecord(ServiceIdentifier serviceIdentifier) {
        return this.initRecord.get(serviceIdentifier);
    }

    public Integer getCounterKey(ServiceIdentifier serviceIdentifier, Integer num) {
        InitializeRecord initializeRecord = this.initRecord.get(serviceIdentifier);
        if (null == initializeRecord) {
            return null;
        }
        return initializeRecord.getDurationRecord().get(num);
    }
}
