package org.apache.shenyu.admin.listener.http;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.shenyu.admin.config.properties.HttpSyncProperties;
import org.apache.shenyu.admin.listener.AbstractDataChangedListener;
import org.apache.shenyu.admin.listener.ConfigDataCache;
import org.apache.shenyu.admin.model.result.ShenyuAdminResult;
import org.apache.shenyu.admin.utils.ShenyuResultMessage;
import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
import org.apache.shenyu.common.constant.HttpConstants;
import org.apache.shenyu.common.dto.AppAuthData;
import org.apache.shenyu.common.dto.DiscoverySyncData;
import org.apache.shenyu.common.dto.MetaData;
import org.apache.shenyu.common.dto.PluginData;
import org.apache.shenyu.common.dto.ProxySelectorData;
import org.apache.shenyu.common.dto.RuleData;
import org.apache.shenyu.common.dto.SelectorData;
import org.apache.shenyu.common.enums.ConfigGroupEnum;
import org.apache.shenyu.common.enums.DataEventTypeEnum;
import org.apache.shenyu.common.exception.ShenyuException;
import org.apache.shenyu.common.utils.GsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/shenyu/admin/listener/http/HttpLongPollingDataChangedListener.class */
/**
 * Data-changed listener that serves configuration updates over HTTP long polling.
 *
 * <p>A request whose group fingerprints already differ from the server cache is answered
 * immediately. Otherwise the request is switched to async mode and parked in {@link #clients}
 * until either a group changes (see {@link DataChangeTask}) or the hold timeout elapses
 * (see {@link LongPollingClient}).
 *
 * <p>Parking, timeout handling and change notification are all submitted to the same
 * one-thread {@link #scheduler}.
 */
public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {
    private static final Logger LOG = LoggerFactory.getLogger(HttpLongPollingDataChangedListener.class);
    private static final String X_REAL_IP = "X-Real-IP";
    private static final String X_FORWARDED_FOR = "X-Forwarded-For";
    private static final String X_FORWARDED_FOR_SPLIT_SYMBOL = ",";

    /** Clients currently held open, waiting for a change; bounded to 1024 entries. */
    private final BlockingQueue<LongPollingClient> clients = new ArrayBlockingQueue<>(1024);

    /** Executor (core pool size 1) used for parking clients, timeouts, notifications and cache refresh. */
    private final ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1, ShenyuThreadFactory.create("long-polling", true));

    private final HttpSyncProperties httpSyncProperties;

    /**
     * Task that answers every parked client with the single group that changed.
     */
    class DataChangeTask implements Runnable {
        private final ConfigGroupEnum groupKey;

        /** Creation time of the task, only used for logging. */
        private final long changeTime = System.currentTimeMillis();

        DataChangeTask(ConfigGroupEnum configGroupEnum) {
            this.groupKey = configGroupEnum;
        }

        /**
         * Notifies the parked clients. When there are more clients than the configured
         * notify batch size, the queue is drained and each batch is submitted to the
         * scheduler as its own task.
         */
        @Override
        public void run() {
            final int batchSize = httpSyncProperties.getNotifyBatchSize();
            if (clients.size() <= batchSize) {
                doRun(clients);
                return;
            }
            List<LongPollingClient> drained = new ArrayList<>(clients.size());
            clients.drainTo(drained);
            for (List<LongPollingClient> view : Lists.partition(drained, batchSize)) {
                // Lists.partition hands out subList views of 'drained'. doRun removes
                // elements while iterating, which would structurally modify the shared
                // backing list and invalidate the sibling views. Give each batch its
                // own independent copy instead.
                List<LongPollingClient> batch = new ArrayList<>(view);
                scheduler.execute(() -> doRun(batch));
            }
        }

        /**
         * Removes each client from {@code targets} and sends it the changed group.
         * A failure for one client is logged and does not stop the remaining ones.
         *
         * @param targets the clients to notify; must support {@link Iterator#remove()}
         */
        private void doRun(Collection<LongPollingClient> targets) {
            Iterator<LongPollingClient> it = targets.iterator();
            while (it.hasNext()) {
                LongPollingClient client = it.next();
                it.remove();
                try {
                    client.sendResponse(Collections.singletonList(this.groupKey));
                    LOG.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, this.groupKey, this.changeTime);
                } catch (RuntimeException e) {
                    // e.g. the async context was already completed; keep notifying the others.
                    LOG.error("send response failed, ip={}, group={}", client.ip, this.groupKey, e);
                }
            }
        }
    }

    /**
     * A single held-open long-polling request. Running it schedules the timeout response
     * and registers the client in the waiting queue.
     *
     * <p>Declared {@code public} in this file; the decompiler notes indicate the access
     * modifier was widened from the original.
     */
    public class LongPollingClient implements Runnable {
        private final Logger log = LoggerFactory.getLogger(LongPollingClient.class);
        private final AsyncContext asyncContext;
        private final String ip;

        /** Maximum time, in milliseconds, the request is held before being answered. */
        private final long timeoutTime;

        /** Handle of the scheduled timeout task; cancelled when a response is sent earlier. */
        private Future<?> asyncTimeoutFuture;

        LongPollingClient(AsyncContext asyncContext, String str, long j) {
            this.asyncContext = asyncContext;
            this.ip = str;
            this.timeoutTime = j;
        }

        /**
         * Schedules the timeout response and then adds this client to the waiting queue.
         * If adding fails (for instance because the bounded queue is full) the error is
         * logged; the already scheduled timeout task is left in place.
         */
        @Override
        public void run() {
            try {
                this.asyncTimeoutFuture = scheduler.schedule(() -> {
                    // Timed out: stop waiting and report whatever changed in the meantime.
                    clients.remove(this);
                    List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) this.asyncContext.getRequest());
                    sendResponse(changedGroups);
                    this.log.debug("LongPollingClient {} ", GsonUtils.getInstance().toJson(changedGroups));
                }, this.timeoutTime, TimeUnit.MILLISECONDS);
                clients.add(this);
            } catch (Exception e) {
                this.log.error("add long polling client error", e);
            }
        }

        /**
         * Cancels the pending timeout task (if any), writes the response and completes
         * the async request.
         *
         * @param list the changed groups to report to the client
         */
        void sendResponse(List<ConfigGroupEnum> list) {
            if (Objects.nonNull(this.asyncTimeoutFuture)) {
                this.asyncTimeoutFuture.cancel(false);
            }
            generateResponse((HttpServletResponse) this.asyncContext.getResponse(), list);
            this.asyncContext.complete();
        }
    }

    /**
     * Creates the listener.
     *
     * @param httpSyncProperties supplies the refresh interval and the notify batch size
     */
    public HttpLongPollingDataChangedListener(HttpSyncProperties httpSyncProperties) {
        this.httpSyncProperties = httpSyncProperties;
    }

    /**
     * Schedules a periodic refresh of the local cache, using the configured refresh
     * interval as both the initial delay and the delay between runs.
     */
    @Override // org.apache.shenyu.admin.listener.AbstractDataChangedListener
    protected void afterInitialize() {
        long millis = this.httpSyncProperties.getRefreshInterval().toMillis();
        this.scheduler.scheduleWithFixedDelay(() -> {
            LOG.info("http sync strategy refresh config start.");
            try {
                super.refreshLocalCache();
                LOG.info("http sync strategy refresh config success.");
            } catch (Exception e) {
                // Caught so that a failed refresh does not cancel the periodic task.
                LOG.error("http sync strategy refresh config error!", e);
            }
        }, millis, millis, TimeUnit.MILLISECONDS);
        LOG.info("http sync strategy refresh interval: {}ms", millis);
    }

    /**
     * Handles one long-polling request. Responds at once when any group differs from the
     * server cache; otherwise parks the request asynchronously until a change or the
     * {@link HttpConstants#SERVER_MAX_HOLD_TIMEOUT} hold time.
     *
     * @param httpServletRequest  the polling request carrying one parameter per config group
     * @param httpServletResponse the response to write to when answering immediately
     * @throws ShenyuException if a group parameter is missing or malformed
     */
    public void doLongPolling(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) {
        List<ConfigGroupEnum> changedGroups = compareChangedGroup(httpServletRequest);
        String remoteIp = getRemoteIp(httpServletRequest);
        if (CollectionUtils.isNotEmpty(changedGroups)) {
            generateResponse(httpServletResponse, changedGroups);
            LOG.info("send response with the changed group, ip={}, group={}", remoteIp, changedGroups);
            return;
        }
        LOG.debug("no changed group, ip={}, waiting for compare cache changed", remoteIp);
        AsyncContext asyncContext = httpServletRequest.startAsync();
        // Timeout 0: the hold time is enforced by the LongPollingClient's own timeout task.
        asyncContext.setTimeout(0L);
        this.scheduler.execute(new LongPollingClient(asyncContext, remoteIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
    }

    /** Notifies parked clients that the {@code APP_AUTH} group changed. */
    @Override // org.apache.shenyu.admin.listener.AbstractDataChangedListener
    protected void afterAppAuthChanged(List<AppAuthData> list, DataEventTypeEnum dataEventTypeEnum) {
        this.scheduler.execute(new DataChangeTask(ConfigGroupEnum.APP_AUTH));
    }

    /** Notifies parked clients that the {@code META_DATA} group changed. */
    @Override // org.apache.shenyu.admin.listener.AbstractDataChangedListener
    protected void afterMetaDataChanged(List<MetaData> list, DataEventTypeEnum dataEventTypeEnum) {
        this.scheduler.execute(new DataChangeTask(ConfigGroupEnum.META_DATA));
    }

    /** Notifies parked clients that the {@code PLUGIN} group changed. */
    @Override // org.apache.shenyu.admin.listener.AbstractDataChangedListener
    protected void afterPluginChanged(List<PluginData> list, DataEventTypeEnum dataEventTypeEnum) {
        this.scheduler.execute(new DataChangeTask(ConfigGroupEnum.PLUGIN));
    }

    /** Notifies parked clients that the {@code RULE} group changed. */
    @Override // org.apache.shenyu.admin.listener.AbstractDataChangedListener
    protected void afterRuleChanged(List<RuleData> list, DataEventTypeEnum dataEventTypeEnum) {
        this.scheduler.execute(new DataChangeTask(ConfigGroupEnum.RULE));
    }

    /** Notifies parked clients that the {@code SELECTOR} group changed. */
    @Override // org.apache.shenyu.admin.listener.AbstractDataChangedListener
    protected void afterSelectorChanged(List<SelectorData> list, DataEventTypeEnum dataEventTypeEnum) {
        this.scheduler.execute(new DataChangeTask(ConfigGroupEnum.SELECTOR));
    }

    /** Notifies parked clients that the {@code PROXY_SELECTOR} group changed. */
    @Override // org.apache.shenyu.admin.listener.AbstractDataChangedListener
    protected void afterProxySelectorChanged(List<ProxySelectorData> list, DataEventTypeEnum dataEventTypeEnum) {
        this.scheduler.execute(new DataChangeTask(ConfigGroupEnum.PROXY_SELECTOR));
    }

    /** Notifies parked clients that the {@code DISCOVER_UPSTREAM} group changed. */
    @Override // org.apache.shenyu.admin.listener.AbstractDataChangedListener
    protected void afterDiscoveryUpstreamDataChanged(List<DiscoverySyncData> list, DataEventTypeEnum dataEventTypeEnum) {
        this.scheduler.execute(new DataChangeTask(ConfigGroupEnum.DISCOVER_UPSTREAM));
    }

    /**
     * Determines which config groups differ between the client and the server cache.
     * For every {@link ConfigGroupEnum} value the request must carry a parameter named
     * after the group, formatted as two comma-separated tokens: the client's md5 and its
     * last-modify time.
     *
     * <p>Declared {@code public} in this file; the decompiler notes indicate the access
     * modifier was widened from private.
     *
     * @param httpServletRequest the polling request
     * @return the groups considered changed; empty when none differ
     * @throws ShenyuException if a group parameter is missing or does not have two tokens
     */
    public List<ConfigGroupEnum> compareChangedGroup(HttpServletRequest httpServletRequest) {
        List<ConfigGroupEnum> changedGroups = new ArrayList<>(ConfigGroupEnum.values().length);
        for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
            String rawParam = httpServletRequest.getParameter(group.name());
            // Expected layout: md5,lastModifyTime
            String[] tokens = StringUtils.split(rawParam, ',');
            if (Objects.isNull(tokens) || tokens.length != 2) {
                throw new ShenyuException("group param invalid:" + rawParam);
            }
            String clientMd5 = tokens[0];
            // NumberUtils.toLong yields 0 when the token is not a valid long.
            long clientModifyTime = NumberUtils.toLong(tokens[1]);
            if (checkCacheDelayAndUpdate(CACHE.get(group.name()), clientMd5, clientModifyTime)) {
                changedGroups.add(group);
            }
        }
        return changedGroups;
    }

    /**
     * Decides whether the client's copy of a group differs from the server's.
     *
     * <p>Equal md5 means unchanged. If the md5 differs and the server entry is at least
     * as new as the client's timestamp, the group is changed. If the server entry is
     * older than the client's timestamp, the cache entry is re-read; when it is still the
     * same instance, the local cache is refreshed under this object's monitor (after one
     * more re-read inside the lock) and the md5 is compared against the latest entry.
     *
     * @param configDataCache the server-side cache entry for the group
     * @param str             the md5 reported by the client
     * @param j               the last-modify time reported by the client
     * @return {@code true} if the client should be told the group changed
     */
    private boolean checkCacheDelayAndUpdate(ConfigDataCache configDataCache, String str, long j) {
        if (StringUtils.equals(str, configDataCache.getMd5())) {
            return false;
        }
        if (configDataCache.getLastModifyTime() >= j) {
            return true;
        }
        // Server entry is older than what the client reports: see whether it was replaced meanwhile.
        ConfigDataCache latest = CACHE.get(configDataCache.getGroup());
        if (latest != configDataCache) {
            return !StringUtils.equals(str, latest.getMd5());
        }
        synchronized (this) {
            // Re-check inside the lock before triggering a refresh.
            latest = CACHE.get(configDataCache.getGroup());
            if (latest != configDataCache) {
                return !StringUtils.equals(str, latest.getMd5());
            }
            super.refreshLocalCache();
            return !StringUtils.equals(str, CACHE.get(configDataCache.getGroup()).getMd5());
        }
    }

    /**
     * Writes a non-cacheable JSON success response containing the changed groups.
     * An {@link IOException} while writing is logged and not propagated.
     *
     * <p>Declared {@code public} in this file; the decompiler notes indicate the access
     * modifier was widened from private.
     *
     * @param httpServletResponse the response to write to
     * @param list                the changed groups to serialize into the body
     */
    public void generateResponse(HttpServletResponse httpServletResponse, List<ConfigGroupEnum> list) {
        try {
            httpServletResponse.setHeader("Pragma", "no-cache");
            httpServletResponse.setDateHeader("Expires", 0L);
            httpServletResponse.setHeader("Cache-Control", "no-cache,no-store");
            httpServletResponse.setContentType("application/json");
            httpServletResponse.setStatus(HttpServletResponse.SC_OK);
            String body = GsonUtils.getInstance().toJson(ShenyuAdminResult.success(ShenyuResultMessage.SUCCESS, list));
            httpServletResponse.getWriter().println(body);
        } catch (IOException e) {
            LOG.error("Sending response failed.", e);
        }
    }

    /**
     * Resolves the client IP used in log messages: the first entry of
     * {@code X-Forwarded-For} when present and non-blank, otherwise {@code X-Real-IP},
     * otherwise the remote address of the connection.
     *
     * @param httpServletRequest the incoming request
     * @return the resolved client address
     */
    private static String getRemoteIp(HttpServletRequest httpServletRequest) {
        String forwardedFor = httpServletRequest.getHeader(X_FORWARDED_FOR);
        if (StringUtils.isNotBlank(forwardedFor)) {
            // String.split drops trailing empty strings, so a header such as "," yields an
            // empty array; guard the index and ignore a blank first entry.
            String[] hops = forwardedFor.split(X_FORWARDED_FOR_SPLIT_SYMBOL);
            if (hops.length > 0 && StringUtils.isNotBlank(hops[0])) {
                return hops[0].trim();
            }
        }
        String realIp = httpServletRequest.getHeader(X_REAL_IP);
        return StringUtils.isBlank(realIp) ? httpServletRequest.getRemoteAddr() : realIp;
    }
}
