package org.apache.shenyu.sync.data.http;

import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import okhttp3.Headers;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.apache.commons.lang3.StringUtils;
import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
import org.apache.shenyu.common.dto.ConfigData;
import org.apache.shenyu.common.enums.ConfigGroupEnum;
import org.apache.shenyu.common.exception.ShenyuException;
import org.apache.shenyu.common.utils.GsonUtils;
import org.apache.shenyu.common.utils.ThreadUtils;
import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
import org.apache.shenyu.sync.data.api.DiscoveryUpstreamDataSubscriber;
import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
import org.apache.shenyu.sync.data.api.SyncDataService;
import org.apache.shenyu.sync.data.http.config.HttpConfig;
import org.apache.shenyu.sync.data.http.refresh.DataRefreshFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.web.util.UriComponentsBuilder;

/* loaded from: input_file:org/apache/shenyu/sync/data/http/HttpSyncDataService.class */
/**
 * {@link SyncDataService} implementation that keeps the local configuration cache in sync
 * with one or more admin servers over HTTP.
 *
 * <p>On construction the service fetches every {@link ConfigGroupEnum} group once, then starts
 * one long-polling task per configured server. Each task posts the md5 / last-modify-time of the
 * locally cached groups to {@code /configs/listener} and, when the server reports changed groups,
 * re-fetches them from {@code /configs/fetch}.
 *
 * <p>The {@code RUNNING} flag is static, so only one instance per class loader actually polls;
 * constructing a second instance while the first is running only logs a message.
 */
public class HttpSyncDataService implements SyncDataService {
    private static final Logger LOG = LoggerFactory.getLogger(HttpSyncDataService.class);

    /** Process-wide guard that prevents long polling from being started more than once. */
    private static final AtomicBoolean RUNNING = new AtomicBoolean(false);

    /** Number of consecutive polling failures after which the long back-off is applied. */
    private static final int MAX_POLL_FAILURES = 10;

    /** Name of the HTTP header carrying the access token. */
    private static final String ACCESS_TOKEN_HEADER = "X-Access-Token";

    private ExecutorService executor;
    private final List<String> serverList;
    private final DataRefreshFactory factory;
    private final AccessTokenManager accessTokenManager;
    private final OkHttpClient okHttpClient;

    /**
     * Long-polling loop bound to a single server. Runs until {@code RUNNING} is cleared.
     */
    class HttpLongPollingTask implements Runnable {
        private final String server;

        HttpLongPollingTask(String str) {
            this.server = str;
        }

        /**
         * Polls the server repeatedly. A failure is followed by a 5 second pause; after
         * {@code MAX_POLL_FAILURES} consecutive failures the pause is 5 minutes and the
         * failure counter starts over. A successful poll resets the counter, so the
         * "tried N times" log always reflects consecutive failures only.
         */
        @Override // java.lang.Runnable
        public void run() {
            int failures = 0;
            while (RUNNING.get()) {
                try {
                    doLongPolling(this.server);
                    failures = 0;
                } catch (Exception e) {
                    if (!RUNNING.get()) {
                        // The service was closed while the request was in flight: exit quietly
                        // instead of logging a failure and backing off.
                        break;
                    }
                    failures++;
                    if (failures < MAX_POLL_FAILURES) {
                        LOG.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}",
                                failures, MAX_POLL_FAILURES - failures, e.getMessage());
                        ThreadUtils.sleep(TimeUnit.SECONDS, 5);
                    } else {
                        LOG.error("Long polling failed, try again after 5 minutes!", e);
                        ThreadUtils.sleep(TimeUnit.MINUTES, 5);
                        failures = 0;
                    }
                }
            }
            LOG.warn("Stop http long polling.");
        }
    }

    /**
     * Creates the service and immediately starts synchronisation.
     *
     * @param httpConfig           supplies the comma-separated list of server base URLs
     * @param pluginDataSubscriber subscriber handed to the {@link DataRefreshFactory}
     * @param okHttpClient         client used for all HTTP calls
     * @param list                 meta data subscribers handed to the refresh factory
     * @param list2                auth data subscribers handed to the refresh factory
     * @param list3                proxy selector subscribers handed to the refresh factory
     * @param list4                discovery upstream subscribers handed to the refresh factory
     * @param accessTokenManager   source of the access token sent with every request
     * @throws ShenyuException if the initial configuration fetch fails on every server
     */
    public HttpSyncDataService(HttpConfig httpConfig, PluginDataSubscriber pluginDataSubscriber, OkHttpClient okHttpClient, List<MetaDataSubscriber> list, List<AuthDataSubscriber> list2, List<ProxySelectorDataSubscriber> list3, List<DiscoveryUpstreamDataSubscriber> list4, AccessTokenManager accessTokenManager) {
        this.accessTokenManager = accessTokenManager;
        this.factory = new DataRefreshFactory(pluginDataSubscriber, list, list2, list3, list4);
        this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));
        this.okHttpClient = okHttpClient;
        start();
    }

    /**
     * Performs the initial full fetch and launches one polling task per server.
     * Does nothing (besides logging) when polling is already running.
     */
    private void start() {
        if (!RUNNING.compareAndSet(false, true)) {
            LOG.info("shenyu http long polling was started, executor=[{}]", this.executor);
            return;
        }
        try {
            fetchGroupConfig(ConfigGroupEnum.values());
        } catch (RuntimeException e) {
            // Nothing was started; release the flag so that a later attempt is not
            // rejected as "already started" while no polling thread exists.
            RUNNING.set(false);
            throw e;
        }
        int poolSize = this.serverList.size();
        this.executor = new ThreadPoolExecutor(poolSize, poolSize, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), ShenyuThreadFactory.create("http-long-polling", true));
        for (String server : this.serverList) {
            this.executor.execute(new HttpLongPollingTask(server));
        }
    }

    /**
     * Fetches the given groups from the first server that answers successfully.
     *
     * @param configGroupEnumArr groups to fetch
     * @throws ShenyuException the failure of the last server when all servers failed
     */
    private void fetchGroupConfig(ConfigGroupEnum... configGroupEnumArr) throws ShenyuException {
        int lastIndex = this.serverList.size() - 1;
        for (int i = 0; i <= lastIndex; i++) {
            try {
                doFetchGroupConfig(this.serverList.get(i), configGroupEnumArr);
                return;
            } catch (ShenyuException e) {
                if (i >= lastIndex) {
                    throw e;
                }
                LOG.warn("fetch config fail, try another one: {}", this.serverList.get(i + 1));
            }
        }
    }

    /**
     * Fetches the given groups from one server and applies them to the local cache.
     * When the cache reports no update, the calling thread pauses for 5 seconds.
     *
     * @param str                server base URL
     * @param configGroupEnumArr groups to fetch
     * @throws ShenyuException on a non-successful HTTP status or an I/O error
     */
    private void doFetchGroupConfig(String str, ConfigGroupEnum... configGroupEnumArr) {
        StringBuilder query = new StringBuilder();
        for (ConfigGroupEnum group : configGroupEnumArr) {
            query.append("groupKeys").append("=").append(group.name()).append("&");
        }
        String url = str + "/configs/fetch?" + StringUtils.removeEnd(query.toString(), "&");
        LOG.info("request configs: [{}]", url);

        Request request = new Request.Builder()
                .url(url)
                .addHeader(ACCESS_TOKEN_HEADER, this.accessTokenManager.getAccessToken())
                .get()
                .build();

        String json;
        // try-with-resources guarantees the response is released on every path,
        // including the non-2xx and unreadable-body cases.
        try (Response response = this.okHttpClient.newCall(request).execute()) {
            if (!response.isSuccessful()) {
                String message = String.format("fetch config fail from server[%s], http status code[%s]", url, response.code());
                LOG.warn(message);
                throw new ShenyuException(message);
            }
            ResponseBody body = response.body();
            Assert.notNull(body, "Resolve response responseBody failed.");
            json = body.string();
        } catch (IOException e) {
            String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());
            LOG.warn(message);
            throw new ShenyuException(message, e);
        }

        if (updateCacheWithJson(json)) {
            LOG.debug("get latest configs: [{}]", json);
            return;
        }
        LOG.info("The config of the server[{}] has not been updated or is out of date. Wait for listening for changes again.", str);
        ThreadUtils.sleep(TimeUnit.SECONDS, 5);
    }

    /**
     * Parses the response and hands its {@code data} object to the refresh factory.
     *
     * @param str JSON response body
     * @return the value returned by {@link DataRefreshFactory#executor}
     */
    private boolean updateCacheWithJson(String str) {
        JsonObject root = GsonUtils.getGson().fromJson(str, JsonObject.class);
        return this.factory.executor(root.getAsJsonObject("data"));
    }

    /**
     * Executes one long-polling round against the given server: posts the md5 and
     * last-modify-time of every cached group, and re-fetches the groups the server
     * reports in the {@code data} array of its response.
     *
     * @param str server base URL
     * @throws ShenyuException on a non-successful HTTP status or an I/O error
     */
    public void doLongPolling(String str) {
        LinkedMultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
        for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
            ConfigData<?> cached = this.factory.cacheConfigData(group);
            if (cached != null) {
                String fingerprint = String.join(",", cached.getMd5(), String.valueOf(cached.getLastModifyTime()));
                params.put(group.name(), Lists.newArrayList(fingerprint));
            }
        }
        LOG.debug("listener params: [{}]", params);

        String listenerUrl = UriComponentsBuilder.fromHttpUrl(str + "/configs/listener")
                .queryParams(params)
                .build(true)
                .toUriString();
        Headers headers = new Headers.Builder()
                .add(ACCESS_TOKEN_HEADER, this.accessTokenManager.getAccessToken())
                .add("Content-Type", "application/x-www-form-urlencoded")
                .build();
        Request request = new Request.Builder()
                .url(listenerUrl)
                .headers(headers)
                .post(RequestBody.create("", (MediaType) null))
                .build();

        JsonArray changedGroups;
        // The response is closed before the follow-up fetch is issued, and on every error path.
        try (Response response = this.okHttpClient.newCall(request).execute()) {
            if (!response.isSuccessful()) {
                throw new ShenyuException(String.format("listener configs fail, server:[%s], http status code[%s]", str, response.code()));
            }
            ResponseBody body = response.body();
            Assert.notNull(body, "Resolve response body failed.");
            String json = body.string();
            LOG.info("listener result: [{}]", json);
            changedGroups = GsonUtils.getGson().fromJson(json, JsonObject.class).getAsJsonArray("data");
        } catch (IOException e) {
            throw new ShenyuException(String.format("listener configs fail, server:[%s], %s", str, e.getMessage()), e);
        }

        if (Objects.isNull(changedGroups) || changedGroups.isEmpty()) {
            return;
        }
        ConfigGroupEnum[] groups = GsonUtils.getGson().fromJson(changedGroups, ConfigGroupEnum[].class);
        LOG.info("Group config changed: {}", Arrays.toString(groups));
        doFetchGroupConfig(str, groups);
    }

    /**
     * Stops synchronisation: clears the running flag and shuts the polling executor down.
     */
    public void close() {
        RUNNING.set(false);
        if (Objects.nonNull(this.executor)) {
            this.executor.shutdownNow();
            this.executor = null;
        }
    }
}
