package com.alibaba.nacos.naming.push;

import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.remote.PushCallBack;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.naming.constants.Constants;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.v1.ClientInfo;
import com.alibaba.nacos.naming.push.v1.NamingSubscriberServiceV1Impl;
import com.alibaba.nacos.naming.push.v1.PushClient;
import com.alibaba.nacos.naming.push.v1.ServiceChangeEvent;
import com.alibaba.nacos.naming.remote.udp.AckEntry;
import com.alibaba.nacos.naming.remote.udp.AckPacket;
import com.alibaba.nacos.naming.remote.udp.UdpConnector;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.collections.MapUtils;
import org.codehaus.jackson.util.VersionUtil;
import org.javatuples.Pair;
import org.slf4j.Logger;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:com/alibaba/nacos/naming/push/UdpPushService.class */
public class UdpPushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private NamingSubscriberServiceV1Impl subscriberServiceV1;
    private ApplicationContext applicationContext;
    private static DatagramSocket udpSocket;
    private final UdpConnector udpConnector;
    private static volatile ConcurrentMap<String, AckEntry> ackMap = new ConcurrentHashMap();
    private static volatile ConcurrentMap<String, Long> udpSendTimeMap = new ConcurrentHashMap();
    private static ConcurrentMap<String, Future> futureMap = new ConcurrentHashMap();

    /* loaded from: input_file:com/alibaba/nacos/naming/push/UdpPushService$Receiver.class */
    public static class Receiver implements Runnable {
        @Override // java.lang.Runnable
        public void run() {
            String trim;
            String hostAddress;
            int port;
            String ackKey;
            while (true) {
                byte[] bArr = new byte[65536];
                DatagramPacket datagramPacket = new DatagramPacket(bArr, bArr.length);
                try {
                    UdpPushService.udpSocket.receive(datagramPacket);
                    trim = new String(datagramPacket.getData(), 0, datagramPacket.getLength(), StandardCharsets.UTF_8).trim();
                    AckPacket ackPacket = (AckPacket) JacksonUtils.toObj(trim, AckPacket.class);
                    InetSocketAddress inetSocketAddress = (InetSocketAddress) datagramPacket.getSocketAddress();
                    hostAddress = inetSocketAddress.getAddress().getHostAddress();
                    port = inetSocketAddress.getPort();
                    if (System.nanoTime() - ackPacket.lastRefTime > Constants.ACK_TIMEOUT_NANOS) {
                        Loggers.PUSH.warn("ack takes too long from {} ack json: {}", datagramPacket.getSocketAddress(), trim);
                    }
                    ackKey = AckEntry.getAckKey(hostAddress, port, ackPacket.lastRefTime);
                } catch (Throwable th) {
                    Loggers.PUSH.error("[NACOS-PUSH] error while receiving ack data", th);
                }
                if (((AckEntry) UdpPushService.ackMap.remove(ackKey)) == null) {
                    throw new IllegalStateException("unable to find ackEntry for key: " + ackKey + ", ack json: " + trim);
                }
                long currentTimeMillis = System.currentTimeMillis() - ((Long) UdpPushService.udpSendTimeMap.get(ackKey)).longValue();
                Loggers.PUSH.info("received ack: {} from: {}:{}, cost: {} ms, unacked: {}, total push: {}", new Object[]{trim, hostAddress, Integer.valueOf(port), Long.valueOf(currentTimeMillis), Integer.valueOf(UdpPushService.ackMap.size()), Integer.valueOf(MetricsMonitor.getTotalPushMonitor().get())});
                MetricsMonitor.incrementPushCost(currentTimeMillis);
                UdpPushService.udpSendTimeMap.remove(ackKey);
            }
        }
    }

    /* loaded from: input_file:com/alibaba/nacos/naming/push/UdpPushService$Retransmitter.class */
    public static class Retransmitter implements Runnable {
        AckEntry ackEntry;

        public Retransmitter(AckEntry ackEntry) {
            this.ackEntry = ackEntry;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (UdpPushService.ackMap.containsKey(this.ackEntry.getKey())) {
                Loggers.PUSH.info("retry to push data, key: " + this.ackEntry.getKey());
                UdpPushService.udpPush(this.ackEntry);
            }
        }
    }

    public UdpPushService(UdpConnector udpConnector) {
        this.udpConnector = udpConnector;
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    public void onApplicationEvent(ServiceChangeEvent serviceChangeEvent) {
        if (((UpgradeJudgement) ApplicationUtils.getBean(UpgradeJudgement.class)).isUseGrpcFeatures()) {
            return;
        }
        Service service = serviceChangeEvent.getService();
        String name = service.getName();
        String namespaceId = service.getNamespaceId();
        if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(namespaceId, name))) {
            return;
        }
        futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, name), GlobalExecutor.scheduleUdpSender(() -> {
            AckEntry prepareAckEntry;
            try {
                try {
                    Loggers.PUSH.info(name + " is changed, add it to push queue.");
                    ConcurrentMap<String, PushClient> concurrentMap = this.subscriberServiceV1.getClientMap().get(UtilsAndCommons.assembleFullServiceName(namespaceId, name));
                    if (MapUtils.isEmpty(concurrentMap)) {
                        futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, name));
                        return;
                    }
                    HashMap hashMap = new HashMap(16);
                    long nanoTime = System.nanoTime();
                    for (PushClient pushClient : concurrentMap.values()) {
                        if (pushClient.zombie()) {
                            Loggers.PUSH.debug("client is zombie: " + pushClient);
                            concurrentMap.remove(pushClient.toString());
                            Loggers.PUSH.debug("client is zombie: " + pushClient);
                        } else {
                            Loggers.PUSH.debug("push serviceName: {} to client: {}", name, pushClient);
                            String pushCacheKey = getPushCacheKey(name, pushClient.getIp(), pushClient.getAgent());
                            byte[] bArr = null;
                            Map map = null;
                            if (this.switchDomain.getDefaultPushCacheMillis() >= 20000 && hashMap.containsKey(pushCacheKey)) {
                                Pair pair = (Pair) hashMap.get(pushCacheKey);
                                bArr = (byte[]) pair.getValue0();
                                map = (Map) pair.getValue1();
                                Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", name, pushClient.getAddrStr());
                            }
                            if (bArr != null) {
                                prepareAckEntry = prepareAckEntry(pushClient, bArr, (Map<String, Object>) map, nanoTime);
                            } else {
                                prepareAckEntry = prepareAckEntry(pushClient, prepareHostsData(pushClient), nanoTime);
                                if (prepareAckEntry != null) {
                                    hashMap.put(pushCacheKey, new Pair(prepareAckEntry.getOrigin().getData(), prepareAckEntry.getData()));
                                }
                            }
                            Logger logger = Loggers.PUSH;
                            Object[] objArr = new Object[4];
                            objArr[0] = pushClient.getServiceName();
                            objArr[1] = pushClient.getAddrStr();
                            objArr[2] = pushClient.getAgent();
                            objArr[3] = prepareAckEntry == null ? null : prepareAckEntry.getKey();
                            logger.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}", objArr);
                            udpPush(prepareAckEntry);
                        }
                    }
                    futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, name));
                } catch (Exception e) {
                    Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", name, e);
                    futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, name));
                }
            } catch (Throwable th) {
                futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, name));
                throw th;
            }
        }, 1000L, TimeUnit.MILLISECONDS));
    }

    public void pushDataWithoutCallback(Subscriber subscriber, ServiceInfo serviceInfo) {
        String serviceName = subscriber.getServiceName();
        try {
            Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
            AckEntry prepareAckEntry = prepareAckEntry(subscriber, serviceInfo);
            Logger logger = Loggers.PUSH;
            Object[] objArr = new Object[4];
            objArr[0] = serviceInfo;
            objArr[1] = subscriber.getAddrStr();
            objArr[2] = subscriber.getAgent();
            objArr[3] = prepareAckEntry == null ? null : prepareAckEntry.getKey();
            logger.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}", objArr);
            this.udpConnector.sendData(prepareAckEntry);
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
        }
    }

    public void pushDataWithCallback(Subscriber subscriber, ServiceInfo serviceInfo, PushCallBack pushCallBack) {
        String serviceName = subscriber.getServiceName();
        try {
            Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
            AckEntry prepareAckEntry = prepareAckEntry(subscriber, serviceInfo);
            Logger logger = Loggers.PUSH;
            Object[] objArr = new Object[4];
            objArr[0] = serviceInfo;
            objArr[1] = subscriber.getAddrStr();
            objArr[2] = subscriber.getAgent();
            objArr[3] = prepareAckEntry == null ? null : prepareAckEntry.getKey();
            logger.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}", objArr);
            this.udpConnector.sendDataWithCallback(prepareAckEntry, pushCallBack);
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
        }
    }

    private AckEntry prepareAckEntry(Subscriber subscriber, ServiceInfo serviceInfo) {
        return prepareAckEntry(new InetSocketAddress(subscriber.getIp(), subscriber.getPort()), prepareHostsData(JacksonUtils.toJson(serviceInfo)), System.nanoTime());
    }

    private static AckEntry prepareAckEntry(PushClient pushClient, Map<String, Object> map, long j) {
        return prepareAckEntry(pushClient.getSocketAddr(), map, j);
    }

    private static AckEntry prepareAckEntry(InetSocketAddress inetSocketAddress, Map<String, Object> map, long j) {
        if (MapUtils.isEmpty(map)) {
            Loggers.PUSH.error("[NACOS-PUSH] pushing empty data for client is not allowed: {}", inetSocketAddress);
            return null;
        }
        map.put("lastRefTime", Long.valueOf(j));
        try {
            return prepareAckEntry(inetSocketAddress, compressIfNecessary(JacksonUtils.toJson(map).getBytes(StandardCharsets.UTF_8)), map, j);
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to compress data: {} to client: {}, error: {}", new Object[]{map, inetSocketAddress, e});
            return null;
        }
    }

    private static AckEntry prepareAckEntry(PushClient pushClient, byte[] bArr, Map<String, Object> map, long j) {
        return prepareAckEntry(pushClient.getSocketAddr(), bArr, map, j);
    }

    private static AckEntry prepareAckEntry(InetSocketAddress inetSocketAddress, byte[] bArr, Map<String, Object> map, long j) {
        try {
            AckEntry ackEntry = new AckEntry(AckEntry.getAckKey(inetSocketAddress.getAddress().getHostAddress(), inetSocketAddress.getPort(), j), new DatagramPacket(bArr, bArr.length, inetSocketAddress));
            ackEntry.setData(map);
            return ackEntry;
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to prepare data: {} to client: {}, error: {}", new Object[]{map, inetSocketAddress, e});
            return null;
        }
    }

    public static String getPushCacheKey(String str, String str2, String str3) {
        return str + UtilsAndCommons.CACHE_KEY_SPLITTER + str3;
    }

    public void serviceChanged(Service service) {
        this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
    }

    public boolean canEnablePush(String str) {
        if (!this.switchDomain.isPushEnabled()) {
            return false;
        }
        ClientInfo clientInfo = new ClientInfo(str);
        if (ClientInfo.ClientType.JAVA == clientInfo.type && clientInfo.version.compareTo(VersionUtil.parseVersion(this.switchDomain.getPushJavaVersion())) >= 0) {
            return true;
        }
        if (ClientInfo.ClientType.DNS == clientInfo.type && clientInfo.version.compareTo(VersionUtil.parseVersion(this.switchDomain.getPushPythonVersion())) >= 0) {
            return true;
        }
        if (ClientInfo.ClientType.C == clientInfo.type && clientInfo.version.compareTo(VersionUtil.parseVersion(this.switchDomain.getPushCVersion())) >= 0) {
            return true;
        }
        if (ClientInfo.ClientType.GO != clientInfo.type || clientInfo.version.compareTo(VersionUtil.parseVersion(this.switchDomain.getPushGoVersion())) < 0) {
            return ClientInfo.ClientType.CSHARP == clientInfo.type && clientInfo.version.compareTo(VersionUtil.parseVersion(this.switchDomain.getPushCSharpVersion())) >= 0;
        }
        return true;
    }

    public static List<AckEntry> getFailedPushes() {
        return new ArrayList(ackMap.values());
    }

    public static void resetPushState() {
        ackMap.clear();
    }

    private static byte[] compressIfNecessary(byte[] bArr) throws IOException {
        if (bArr.length < 1024) {
            return bArr;
        }
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        GZIPOutputStream gZIPOutputStream = new GZIPOutputStream(byteArrayOutputStream);
        gZIPOutputStream.write(bArr);
        gZIPOutputStream.close();
        return byteArrayOutputStream.toByteArray();
    }

    private static Map<String, Object> prepareHostsData(PushClient pushClient) throws Exception {
        return prepareHostsData(pushClient.getDataSource().getData(pushClient));
    }

    private static Map<String, Object> prepareHostsData(String str) {
        HashMap hashMap = new HashMap(2);
        hashMap.put("type", "dom");
        hashMap.put("data", str);
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static AckEntry udpPush(AckEntry ackEntry) {
        if (ackEntry == null) {
            Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
            return null;
        }
        if (ackEntry.getRetryTimes() > 1) {
            Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", Integer.valueOf(ackEntry.getRetryTimes()), ackEntry.getKey());
            ackMap.remove(ackEntry.getKey());
            udpSendTimeMap.remove(ackEntry.getKey());
            MetricsMonitor.incrementFailPush();
            return ackEntry;
        }
        try {
            if (!ackMap.containsKey(ackEntry.getKey())) {
                MetricsMonitor.incrementPush();
            }
            ackMap.put(ackEntry.getKey(), ackEntry);
            udpSendTimeMap.put(ackEntry.getKey(), Long.valueOf(System.currentTimeMillis()));
            Loggers.PUSH.info("send udp packet: " + ackEntry.getKey());
            udpSocket.send(ackEntry.getOrigin());
            ackEntry.increaseRetryTime();
            GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry), TimeUnit.NANOSECONDS.toMillis(Constants.ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
            return ackEntry;
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", new Object[]{ackEntry.getData(), ackEntry.getOrigin().getAddress().getHostAddress(), e});
            ackMap.remove(ackEntry.getKey());
            udpSendTimeMap.remove(ackEntry.getKey());
            MetricsMonitor.incrementFailPush();
            return null;
        }
    }

    static {
        try {
            udpSocket = new DatagramSocket();
            Thread thread = new Thread(new Receiver());
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.naming.push.receiver");
            thread.start();
        } catch (SocketException e) {
            Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");
        }
    }
}
