package com.openblocks.plugin.es;

import com.google.common.base.Joiner;
import com.openblocks.plugin.es.model.EsConnection;
import com.openblocks.plugin.es.model.EsDatasourceConfig;
import com.openblocks.sdk.config.CommonConfig;
import com.openblocks.sdk.config.dynamic.Conf;
import com.openblocks.sdk.config.dynamic.ConfigCenter;
import com.openblocks.sdk.exception.BizError;
import com.openblocks.sdk.exception.BizException;
import com.openblocks.sdk.exception.PluginCommonError;
import com.openblocks.sdk.models.DatasourceConnectionConfig;
import com.openblocks.sdk.models.DatasourceTestResult;
import com.openblocks.sdk.plugin.common.DatasourceConnector;
import com.openblocks.sdk.plugin.common.QueryExecutionUtils;
import com.openblocks.sdk.util.ExceptionUtils;
import com.openblocks.sdk.util.JsonUtils;
import com.openblocks.sdk.util.Preconditions;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.pf4j.Extension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpMethod;
import reactor.core.publisher.Mono;

@Extension
/* loaded from: input_file:com/openblocks/plugin/es/EsConnector.class */
public class EsConnector implements DatasourceConnector<EsConnection, EsDatasourceConfig> {
    private static final Logger log = LoggerFactory.getLogger(EsConnector.class);
    private static final Joiner JOINER = Joiner.on("/");
    private final Conf<Duration> datasourceValidateTimeout;
    private final CommonConfig commonConfig;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/openblocks/plugin/es/EsConnector$ConnectionStringParseResult.class */
    public static class ConnectionStringParseResult {
        private String host;
        private String schema = HttpHost.DEFAULT_SCHEME_NAME;
        private int port = -1;
        private String prefix = "";

        private ConnectionStringParseResult() {
        }

        public String getSchema() {
            return this.schema;
        }

        public void setSchema(String str) {
            this.schema = str;
        }

        public String getHost() {
            return this.host;
        }

        public void setHost(String str) {
            this.host = str;
        }

        public int getPort() {
            return this.port;
        }

        public void setPort(int i) {
            this.port = i;
        }

        public String getPrefix() {
            return this.prefix;
        }

        public void setPrefix(String str) {
            this.prefix = str;
        }
    }

    public EsConnector(ConfigCenter configCenter, CommonConfig commonConfig) {
        this.datasourceValidateTimeout = configCenter.mongoPlugin().ofInteger("datasourceValidateTimeoutMillis", 6000).then((v0) -> {
            return Duration.ofMillis(v0);
        });
        this.commonConfig = commonConfig;
    }

    @Nonnull
    public EsDatasourceConfig resolveConfig(Map<String, Object> map) {
        EsDatasourceConfig esDatasourceConfig = (EsDatasourceConfig) JsonUtils.fromJson(JsonUtils.toJson(map), EsDatasourceConfig.class);
        if (esDatasourceConfig == null) {
            throw ExceptionUtils.ofPluginException(PluginCommonError.DATASOURCE_ARGUMENT_ERROR, "INVALID_ES_CONFIG", new Object[0]);
        }
        return esDatasourceConfig;
    }

    public Set<String> validateConfig(EsDatasourceConfig esDatasourceConfig) {
        HashSet hashSet = new HashSet();
        if (StringUtils.isBlank(esDatasourceConfig.getConnectionString())) {
            hashSet.add("CONNECTION_STRING_EMPTY");
        }
        return hashSet;
    }

    public Mono<EsConnection> createConnection(EsDatasourceConfig esDatasourceConfig) {
        return Mono.fromSupplier(() -> {
            return buildRestClient(esDatasourceConfig);
        }).map(ReactorRestClientAdaptor::new).map(EsConnection::new).subscribeOn(QueryExecutionUtils.querySharedScheduler());
    }

    private RestClient buildRestClient(EsDatasourceConfig esDatasourceConfig) {
        ConnectionStringParseResult parseConnectionString = parseConnectionString(esDatasourceConfig.getConnectionString());
        if (this.commonConfig.getDisallowedHosts().contains(parseConnectionString.getHost())) {
            throw new BizException(BizError.INVALID_DATASOURCE_CONFIG_TYPE, "INVALID_CONNECTION_STRING", new Object[0]);
        }
        RestClientBuilder builder = RestClient.builder(new HttpHost(parseConnectionString.getHost(), parseConnectionString.getPort(), parseConnectionString.getSchema()));
        if (StringUtils.isNotBlank(esDatasourceConfig.getUsername()) && StringUtils.isNotBlank(esDatasourceConfig.getPassword())) {
            UsernamePasswordCredentials usernamePasswordCredentials = new UsernamePasswordCredentials(esDatasourceConfig.getUsername(), esDatasourceConfig.getPassword());
            BasicCredentialsProvider basicCredentialsProvider = new BasicCredentialsProvider();
            basicCredentialsProvider.setCredentials(AuthScope.ANY, usernamePasswordCredentials);
            builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
                return httpAsyncClientBuilder.setDefaultCredentialsProvider(basicCredentialsProvider);
            });
        }
        if (parseConnectionString.getPort() == -1) {
            builder.setDefaultHeaders(new Header[]{new BasicHeader("Host", parseConnectionString.getHost())});
        }
        if (StringUtils.isNotBlank(parseConnectionString.getPrefix())) {
            builder.setPathPrefix("/" + parseConnectionString.getPrefix());
        }
        return builder.build();
    }

    public Mono<Void> destroyConnection(EsConnection esConnection) {
        return Mono.fromRunnable(() -> {
            Optional.ofNullable(esConnection).ifPresent(esConnection2 -> {
                try {
                    esConnection2.close();
                } catch (IOException e) {
                    throw ExceptionUtils.ofException(BizError.DATASOURCE_CLOSE_FAILED, "DATASOURCE_CLOSE_FAILED", new Object[]{e.getMessage()});
                }
            });
        }).subscribeOn(QueryExecutionUtils.querySharedScheduler());
    }

    public Mono<DatasourceTestResult> testConnection(EsDatasourceConfig esDatasourceConfig) {
        return Mono.from(doCreateConnection(esDatasourceConfig)).zipWhen(esConnection -> {
            return esConnection.reactorRestClientAdaptor().request(new Request(HttpMethod.HEAD.name(), ""));
        }).timeout((Duration) this.datasourceValidateTimeout.get()).map(tuple2 -> {
            try {
                ((EsConnection) tuple2.getT1()).close();
            } catch (IOException e) {
                log.error("close rest client error.", e);
            }
            Response response = (Response) tuple2.getT2();
            if (response.getStatusLine().getStatusCode() == 200) {
                return DatasourceTestResult.testSuccess();
            }
            log.error("test es fail.{},{}", JsonUtils.toJson(esDatasourceConfig), response);
            return DatasourceTestResult.testFail(response.getStatusLine().getReasonPhrase());
        }).onErrorResume(th -> {
            log.error("test es error.{}", JsonUtils.toJson(esDatasourceConfig), th);
            return Mono.just(DatasourceTestResult.testFail(th));
        }).subscribeOn(QueryExecutionUtils.querySharedScheduler());
    }

    private ConnectionStringParseResult parseConnectionString(String str) {
        ConnectionStringParseResult connectionStringParseResult = new ConnectionStringParseResult();
        String[] split = str.split("://");
        Preconditions.check(split.length == 1 || split.length == 2, BizError.INVALID_DATASOURCE_CONFIG_TYPE, "INVALID_CONNECTION_STRING", new Object[0]);
        if (split.length == 2) {
            connectionStringParseResult.setSchema(split[0].trim());
        }
        String[] split2 = split[split.length - 1].split("/");
        String[] split3 = split2[0].split(":");
        Preconditions.check(split3.length == 1 || split3.length == 2, BizError.INVALID_DATASOURCE_CONFIG_TYPE, "INVALID_CONNECTION_STRING", new Object[0]);
        connectionStringParseResult.setHost(split3[0].trim());
        if (split3.length == 2) {
            try {
                connectionStringParseResult.setPort(Integer.parseInt(split3[1].trim()));
            } catch (NumberFormatException e) {
                log.error("port parse error.{}", str, e);
                throw ExceptionUtils.ofException(BizError.INVALID_DATASOURCE_CONFIG_TYPE, "INVALID_CONNECTION_STRING", new Object[0]);
            }
        }
        if (split2.length > 1) {
            connectionStringParseResult.setPrefix(JOINER.join(Arrays.stream(split2).map((v0) -> {
                return v0.trim();
            }).toList().subList(1, split2.length)));
        }
        return connectionStringParseResult;
    }

    @Nonnull
    /* renamed from: resolveConfig, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ DatasourceConnectionConfig m1resolveConfig(Map map) {
        return resolveConfig((Map<String, Object>) map);
    }
}
