package com.alibaba.nacos.naming.push;

import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.ClientInfo;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.codehaus.jackson.util.VersionUtil;
import org.javatuples.Pair;
import org.slf4j.Logger;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:com/alibaba/nacos/naming/push/PushService.class */
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {

    @Autowired
    private SwitchDomain switchDomain;
    private ApplicationContext applicationContext;
    private static final int MAX_RETRY_TIMES = 1;
    private static DatagramSocket udpSocket;
    private static final long ACK_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10);
    private static volatile ConcurrentMap<String, Receiver.AckEntry> ackMap = new ConcurrentHashMap();
    private static ConcurrentMap<String, ConcurrentMap<String, PushClient>> clientMap = new ConcurrentHashMap();
    private static volatile ConcurrentHashMap<String, Long> udpSendTimeMap = new ConcurrentHashMap<>();
    public static volatile ConcurrentHashMap<String, Long> pushCostMap = new ConcurrentHashMap<>();
    private static int totalPush = 0;
    private static int failedPush = 0;
    private static ConcurrentHashMap<String, Long> lastPushMillisMap = new ConcurrentHashMap<>();
    private static Map<String, Future> futureMap = new ConcurrentHashMap();
    private static ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { // from class: com.alibaba.nacos.naming.push.PushService.1
        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.naming.push.retransmitter");
            return thread;
        }
    });
    private static ScheduledExecutorService udpSender = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { // from class: com.alibaba.nacos.naming.push.PushService.2
        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.naming.push.udpSender");
            return thread;
        }
    });

    /* loaded from: input_file:com/alibaba/nacos/naming/push/PushService$PushClient.class */
    public class PushClient {
        private String namespaceId;
        private String serviceName;
        private String clusters;
        private String agent;
        private String tenant;
        private String app;
        private InetSocketAddress socketAddr;
        private DataSource dataSource;
        private Map<String, String[]> params;
        public long lastRefTime = System.currentTimeMillis();

        public Map<String, String[]> getParams() {
            return this.params;
        }

        public void setParams(Map<String, String[]> map) {
            this.params = map;
        }

        public PushClient(String str, String str2, String str3, String str4, InetSocketAddress inetSocketAddress, DataSource dataSource, String str5, String str6) {
            this.namespaceId = str;
            this.serviceName = str2;
            this.clusters = str3;
            this.agent = str4;
            this.socketAddr = inetSocketAddress;
            this.dataSource = dataSource;
            this.tenant = str5;
            this.app = str6;
        }

        public DataSource getDataSource() {
            return this.dataSource;
        }

        public boolean zombie() {
            return System.currentTimeMillis() - this.lastRefTime > PushService.this.switchDomain.getPushCacheMillis(this.serviceName);
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("serviceName: ").append(this.serviceName).append(", clusters: ").append(this.clusters).append(", address: ").append(this.socketAddr).append(", agent: ").append(this.agent);
            return sb.toString();
        }

        public String getAgent() {
            return this.agent;
        }

        public String getAddrStr() {
            return this.socketAddr.getAddress().getHostAddress() + UtilsAndCommons.IP_PORT_SPLITER + this.socketAddr.getPort();
        }

        public String getIp() {
            return this.socketAddr.getAddress().getHostAddress();
        }

        public int hashCode() {
            return Objects.hash(this.serviceName, this.clusters, this.socketAddr);
        }

        public boolean equals(Object obj) {
            if (!(obj instanceof PushClient)) {
                return false;
            }
            PushClient pushClient = (PushClient) obj;
            return this.serviceName.equals(pushClient.serviceName) && this.clusters.equals(pushClient.clusters) && this.socketAddr.equals(pushClient.socketAddr);
        }

        public String getClusters() {
            return this.clusters;
        }

        public void setClusters(String str) {
            this.clusters = str;
        }

        public String getNamespaceId() {
            return this.namespaceId;
        }

        public void setNamespaceId(String str) {
            this.namespaceId = str;
        }

        public String getServiceName() {
            return this.serviceName;
        }

        public void setServiceName(String str) {
            this.serviceName = str;
        }

        public String getTenant() {
            return this.tenant;
        }

        public void setTenant(String str) {
            this.tenant = str;
        }

        public String getApp() {
            return this.app;
        }

        public void setApp(String str) {
            this.app = str;
        }

        public InetSocketAddress getSocketAddr() {
            return this.socketAddr;
        }

        public void refresh() {
            this.lastRefTime = System.currentTimeMillis();
        }
    }

    /* loaded from: input_file:com/alibaba/nacos/naming/push/PushService$Receiver.class */
    public static class Receiver implements Runnable {

        /* loaded from: input_file:com/alibaba/nacos/naming/push/PushService$Receiver$AckEntry.class */
        public static class AckEntry {
            public String key;
            public DatagramPacket origin;
            private AtomicInteger retryTimes = new AtomicInteger(0);
            public Map<String, Object> data;

            public AckEntry(String str, DatagramPacket datagramPacket) {
                this.key = str;
                this.origin = datagramPacket;
            }

            public void increaseRetryTime() {
                this.retryTimes.incrementAndGet();
            }

            public int getRetryTimes() {
                return this.retryTimes.get();
            }
        }

        /* loaded from: input_file:com/alibaba/nacos/naming/push/PushService$Receiver$AckPacket.class */
        public static class AckPacket {
            public String type;
            public long lastRefTime;
            public String data;
        }

        @Override // java.lang.Runnable
        public void run() {
            String trim;
            String hostAddress;
            int port;
            String aCKKey;
            while (true) {
                byte[] bArr = new byte[65536];
                DatagramPacket datagramPacket = new DatagramPacket(bArr, bArr.length);
                try {
                    PushService.udpSocket.receive(datagramPacket);
                    trim = new String(datagramPacket.getData(), 0, datagramPacket.getLength(), StandardCharsets.UTF_8).trim();
                    AckPacket ackPacket = (AckPacket) JacksonUtils.toObj(trim, AckPacket.class);
                    InetSocketAddress inetSocketAddress = (InetSocketAddress) datagramPacket.getSocketAddress();
                    hostAddress = inetSocketAddress.getAddress().getHostAddress();
                    port = inetSocketAddress.getPort();
                    if (System.nanoTime() - ackPacket.lastRefTime > PushService.ACK_TIMEOUT_NANOS) {
                        Loggers.PUSH.warn("ack takes too long from {} ack json: {}", datagramPacket.getSocketAddress(), trim);
                    }
                    aCKKey = PushService.getACKKey(hostAddress, port, ackPacket.lastRefTime);
                } catch (Throwable th) {
                    Loggers.PUSH.error("[NACOS-PUSH] error while receiving ack data", th);
                }
                if (((AckEntry) PushService.ackMap.remove(aCKKey)) == null) {
                    throw new IllegalStateException("unable to find ackEntry for key: " + aCKKey + ", ack json: " + trim);
                }
                long currentTimeMillis = System.currentTimeMillis() - ((Long) PushService.udpSendTimeMap.get(aCKKey)).longValue();
                Loggers.PUSH.info("received ack: {} from: {}:{}, cost: {} ms, unacked: {}, total push: {}", new Object[]{trim, hostAddress, Integer.valueOf(port), Long.valueOf(currentTimeMillis), Integer.valueOf(PushService.ackMap.size()), Integer.valueOf(PushService.totalPush)});
                PushService.pushCostMap.put(aCKKey, Long.valueOf(currentTimeMillis));
                PushService.udpSendTimeMap.remove(aCKKey);
            }
        }
    }

    /* loaded from: input_file:com/alibaba/nacos/naming/push/PushService$Retransmitter.class */
    public static class Retransmitter implements Runnable {
        Receiver.AckEntry ackEntry;

        public Retransmitter(Receiver.AckEntry ackEntry) {
            this.ackEntry = ackEntry;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (PushService.ackMap.containsKey(this.ackEntry.key)) {
                Loggers.PUSH.info("retry to push data, key: " + this.ackEntry.key);
                PushService.udpPush(this.ackEntry);
            }
        }
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    public void onApplicationEvent(ServiceChangeEvent serviceChangeEvent) {
        Service service = serviceChangeEvent.getService();
        final String name = service.getName();
        final String namespaceId = service.getNamespaceId();
        futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, name), udpSender.schedule(new Runnable() { // from class: com.alibaba.nacos.naming.push.PushService.4
            @Override // java.lang.Runnable
            public void run() {
                Receiver.AckEntry prepareAckEntry;
                try {
                    try {
                        Loggers.PUSH.info(name + " is changed, add it to push queue.");
                        ConcurrentMap concurrentMap = (ConcurrentMap) PushService.clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, name));
                        if (MapUtils.isEmpty(concurrentMap)) {
                            PushService.futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, name));
                            return;
                        }
                        HashMap hashMap = new HashMap(16);
                        long nanoTime = System.nanoTime();
                        for (PushClient pushClient : concurrentMap.values()) {
                            if (pushClient.zombie()) {
                                Loggers.PUSH.debug("client is zombie: " + pushClient.toString());
                                concurrentMap.remove(pushClient.toString());
                                Loggers.PUSH.debug("client is zombie: " + pushClient.toString());
                            } else {
                                Loggers.PUSH.debug("push serviceName: {} to client: {}", name, pushClient.toString());
                                String pushCacheKey = PushService.getPushCacheKey(name, pushClient.getIp(), pushClient.getAgent());
                                byte[] bArr = null;
                                Map map = null;
                                if (PushService.this.switchDomain.getDefaultPushCacheMillis() >= 20000 && hashMap.containsKey(pushCacheKey)) {
                                    Pair pair = (Pair) hashMap.get(pushCacheKey);
                                    bArr = (byte[]) pair.getValue0();
                                    map = (Map) pair.getValue1();
                                    Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", name, pushClient.getAddrStr());
                                }
                                if (bArr != null) {
                                    prepareAckEntry = PushService.prepareAckEntry(pushClient, bArr, map, nanoTime);
                                } else {
                                    prepareAckEntry = PushService.prepareAckEntry(pushClient, PushService.prepareHostsData(pushClient), nanoTime);
                                    if (prepareAckEntry != null) {
                                        hashMap.put(pushCacheKey, new Pair(prepareAckEntry.origin.getData(), prepareAckEntry.data));
                                    }
                                }
                                Logger logger = Loggers.PUSH;
                                Object[] objArr = new Object[4];
                                objArr[0] = pushClient.getServiceName();
                                objArr[PushService.MAX_RETRY_TIMES] = pushClient.getAddrStr();
                                objArr[2] = pushClient.getAgent();
                                objArr[3] = prepareAckEntry == null ? null : prepareAckEntry.key;
                                logger.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}", objArr);
                                PushService.udpPush(prepareAckEntry);
                            }
                        }
                        PushService.futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, name));
                    } catch (Exception e) {
                        Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", name, e);
                        PushService.futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, name));
                    }
                } catch (Throwable th) {
                    PushService.futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, name));
                    throw th;
                }
            }
        }, 1000L, TimeUnit.MILLISECONDS));
    }

    public int getTotalPush() {
        return totalPush;
    }

    public void setTotalPush(int i) {
        totalPush = i;
    }

    public void addClient(String str, String str2, String str3, String str4, InetSocketAddress inetSocketAddress, DataSource dataSource, String str5, String str6) {
        addClient(new PushClient(str, str2, str3, str4, inetSocketAddress, dataSource, str5, str6));
    }

    public void addClient(PushClient pushClient) {
        String assembleFullServiceName = UtilsAndCommons.assembleFullServiceName(pushClient.getNamespaceId(), pushClient.getServiceName());
        ConcurrentMap<String, PushClient> concurrentMap = clientMap.get(assembleFullServiceName);
        if (concurrentMap == null) {
            clientMap.putIfAbsent(assembleFullServiceName, new ConcurrentHashMap(1024));
            concurrentMap = clientMap.get(assembleFullServiceName);
        }
        PushClient pushClient2 = concurrentMap.get(pushClient.toString());
        if (pushClient2 != null) {
            pushClient2.refresh();
            return;
        }
        PushClient putIfAbsent = concurrentMap.putIfAbsent(pushClient.toString(), pushClient);
        if (putIfAbsent != null) {
            Loggers.PUSH.warn("client: {} already associated with key {}", putIfAbsent.getAddrStr(), putIfAbsent.toString());
        }
        Loggers.PUSH.debug("client: {} added for serviceName: {}", pushClient.getAddrStr(), pushClient.getServiceName());
    }

    public List<Subscriber> getClients(String str, String str2) {
        ConcurrentMap<String, PushClient> concurrentMap = clientMap.get(UtilsAndCommons.assembleFullServiceName(str2, str));
        if (Objects.isNull(concurrentMap)) {
            return null;
        }
        ArrayList arrayList = new ArrayList();
        concurrentMap.forEach((str3, pushClient) -> {
            arrayList.add(new Subscriber(pushClient.getAddrStr(), pushClient.getAgent(), pushClient.getApp(), pushClient.getIp(), str2, str));
        });
        return arrayList;
    }

    public List<Subscriber> getClientsFuzzy(String str, String str2) {
        ArrayList arrayList = new ArrayList();
        clientMap.forEach((str3, concurrentMap) -> {
            String str3 = str3.split("##")[MAX_RETRY_TIMES];
            String groupName = NamingUtils.getGroupName(str3);
            String serviceName = NamingUtils.getServiceName(str3);
            if (!str3.startsWith(str2) || serviceName.indexOf(NamingUtils.getServiceName(str)) < 0 || groupName.indexOf(NamingUtils.getGroupName(str)) < 0) {
                return;
            }
            concurrentMap.forEach((str4, pushClient) -> {
                arrayList.add(new Subscriber(pushClient.getAddrStr(), pushClient.getAgent(), pushClient.getApp(), pushClient.getIp(), str2, str3));
            });
        });
        return arrayList;
    }

    public static void removeClientIfZombie() {
        int i = 0;
        Iterator<Map.Entry<String, ConcurrentMap<String, PushClient>>> it = clientMap.entrySet().iterator();
        while (it.hasNext()) {
            ConcurrentMap<String, PushClient> value = it.next().getValue();
            for (Map.Entry<String, PushClient> entry : value.entrySet()) {
                if (entry.getValue().zombie()) {
                    value.remove(entry.getKey());
                }
            }
            i += value.size();
        }
        if (Loggers.PUSH.isDebugEnabled()) {
            Loggers.PUSH.debug("[NACOS-PUSH] clientMap size: {}", Integer.valueOf(i));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Receiver.AckEntry prepareAckEntry(PushClient pushClient, byte[] bArr, Map<String, Object> map, long j) {
        try {
            Receiver.AckEntry ackEntry = new Receiver.AckEntry(getACKKey(pushClient.getSocketAddr().getAddress().getHostAddress(), pushClient.getSocketAddr().getPort(), j), new DatagramPacket(bArr, bArr.length, pushClient.socketAddr));
            ackEntry.data = map;
            return ackEntry;
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to prepare data: {} to client: {}, error: {}", new Object[]{map, pushClient.getSocketAddr(), e});
            return null;
        }
    }

    public static String getPushCacheKey(String str, String str2, String str3) {
        return str + UtilsAndCommons.CACHE_KEY_SPLITER + str3;
    }

    public void serviceChanged(Service service) {
        if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
            return;
        }
        this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
    }

    public boolean canEnablePush(String str) {
        if (!this.switchDomain.isPushEnabled()) {
            return false;
        }
        ClientInfo clientInfo = new ClientInfo(str);
        if (ClientInfo.ClientType.JAVA == clientInfo.type && clientInfo.version.compareTo(VersionUtil.parseVersion(this.switchDomain.getPushJavaVersion())) >= 0) {
            return true;
        }
        if (ClientInfo.ClientType.DNS == clientInfo.type && clientInfo.version.compareTo(VersionUtil.parseVersion(this.switchDomain.getPushPythonVersion())) >= 0) {
            return true;
        }
        if (ClientInfo.ClientType.C != clientInfo.type || clientInfo.version.compareTo(VersionUtil.parseVersion(this.switchDomain.getPushCVersion())) < 0) {
            return ClientInfo.ClientType.GO == clientInfo.type && clientInfo.version.compareTo(VersionUtil.parseVersion(this.switchDomain.getPushGoVersion())) >= 0;
        }
        return true;
    }

    public static List<Receiver.AckEntry> getFailedPushes() {
        return new ArrayList(ackMap.values());
    }

    public int getFailedPushCount() {
        return ackMap.size() + failedPush;
    }

    public void setFailedPush(int i) {
        failedPush = i;
    }

    public static void resetPushState() {
        ackMap.clear();
    }

    private static byte[] compressIfNecessary(byte[] bArr) throws IOException {
        if (bArr.length < 1024) {
            return bArr;
        }
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        GZIPOutputStream gZIPOutputStream = new GZIPOutputStream(byteArrayOutputStream);
        gZIPOutputStream.write(bArr);
        gZIPOutputStream.close();
        return byteArrayOutputStream.toByteArray();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Map<String, Object> prepareHostsData(PushClient pushClient) throws Exception {
        HashMap hashMap = new HashMap(2);
        hashMap.put("type", "dom");
        hashMap.put("data", pushClient.getDataSource().getData(pushClient));
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Receiver.AckEntry prepareAckEntry(PushClient pushClient, Map<String, Object> map, long j) {
        if (MapUtils.isEmpty(map)) {
            Loggers.PUSH.error("[NACOS-PUSH] pushing empty data for client is not allowed: {}", pushClient);
            return null;
        }
        map.put("lastRefTime", Long.valueOf(j));
        String aCKKey = getACKKey(pushClient.getSocketAddr().getAddress().getHostAddress(), pushClient.getSocketAddr().getPort(), j);
        try {
            byte[] compressIfNecessary = compressIfNecessary(JacksonUtils.toJson(map).getBytes(StandardCharsets.UTF_8));
            Receiver.AckEntry ackEntry = new Receiver.AckEntry(aCKKey, new DatagramPacket(compressIfNecessary, compressIfNecessary.length, pushClient.socketAddr));
            ackEntry.data = map;
            return ackEntry;
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to prepare data: {} to client: {}, error: {}", new Object[]{map, pushClient.getSocketAddr(), e});
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
        if (ackEntry == null) {
            Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
            return null;
        }
        if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
            Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            failedPush += MAX_RETRY_TIMES;
            return ackEntry;
        }
        try {
            if (!ackMap.containsKey(ackEntry.key)) {
                totalPush += MAX_RETRY_TIMES;
            }
            ackMap.put(ackEntry.key, ackEntry);
            udpSendTimeMap.put(ackEntry.key, Long.valueOf(System.currentTimeMillis()));
            Loggers.PUSH.info("send udp packet: " + ackEntry.key);
            udpSocket.send(ackEntry.origin);
            ackEntry.increaseRetryTime();
            executorService.schedule(new Retransmitter(ackEntry), TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
            return ackEntry;
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", new Object[]{ackEntry.data, ackEntry.origin.getAddress().getHostAddress(), e});
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            failedPush += MAX_RETRY_TIMES;
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static String getACKKey(String str, int i, long j) {
        return StringUtils.strip(str) + "," + i + "," + j;
    }

    static {
        try {
            udpSocket = new DatagramSocket();
            Thread thread = new Thread(new Receiver());
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.naming.push.receiver");
            thread.start();
            executorService.scheduleWithFixedDelay(new Runnable() { // from class: com.alibaba.nacos.naming.push.PushService.3
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        PushService.removeClientIfZombie();
                    } catch (Throwable th) {
                        Loggers.PUSH.warn("[NACOS-PUSH] failed to remove client zombie");
                    }
                }
            }, 0L, 20L, TimeUnit.SECONDS);
        } catch (SocketException e) {
            Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");
        }
    }
}
