package com.openblocks.domain.query.service;

import com.openblocks.domain.datasource.model.Datasource;
import com.openblocks.domain.datasource.model.DatasourceConnectionHolder;
import com.openblocks.domain.datasource.service.DatasourceConnectionPool;
import com.openblocks.domain.plugin.client.DatasourcePluginClient;
import com.openblocks.domain.plugin.service.DatasourceMetaInfoService;
import com.openblocks.domain.query.util.QueryTimeoutUtils;
import com.openblocks.sdk.exception.BizError;
import com.openblocks.sdk.exception.BizException;
import com.openblocks.sdk.exception.PluginCommonError;
import com.openblocks.sdk.exception.PluginException;
import com.openblocks.sdk.models.DatasourceConnectionConfig;
import com.openblocks.sdk.models.QueryExecutionResult;
import com.openblocks.sdk.plugin.common.QueryExecutor;
import com.openblocks.sdk.query.QueryExecutionContext;
import com.openblocks.sdk.query.QueryVisitorContext;
import com.openblocks.sdk.util.ExceptionUtils;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

@Service
/* loaded from: input_file:com/openblocks/domain/query/service/QueryExecutionService.class */
public class QueryExecutionService {
    private static final Logger log = LoggerFactory.getLogger(QueryExecutionService.class);

    @Autowired
    private DatasourceConnectionPool datasourceConnectionPool;

    @Autowired
    private DatasourceMetaInfoService datasourceMetaInfoService;

    @Autowired
    private DatasourcePluginClient datasourcePluginClient;

    public Mono<QueryExecutionResult> executeQuery(Datasource datasource, Map<String, Object> map, Map<String, Object> map2, String str, QueryVisitorContext queryVisitorContext) {
        int parseQueryTimeoutMs = QueryTimeoutUtils.parseQueryTimeoutMs(str, map2);
        return Mono.defer(() -> {
            return this.datasourceMetaInfoService.isJsDatasourcePlugin(datasource.getType()) ? executeByNodeJs(datasource, map, map2) : executeLocally(datasource, map, map2, queryVisitorContext);
        }).timeout(Duration.ofMillis(parseQueryTimeoutMs)).onErrorMap(TimeoutException.class, timeoutException -> {
            return new PluginException(PluginCommonError.QUERY_EXECUTION_TIMEOUT, "PLUGIN_EXECUTION_TIMEOUT", new Object[]{Integer.valueOf(parseQueryTimeoutMs)});
        }).onErrorResume(PluginException.class, pluginException -> {
            return Mono.just(QueryExecutionResult.error(pluginException));
        }).onErrorMap(th -> {
            if (th instanceof BizException) {
                return th;
            }
            log.error("query exception", th);
            return ExceptionUtils.ofException(BizError.QUERY_EXECUTION_ERROR, "QUERY_EXECUTION_ERROR", new Object[]{th.getMessage()});
        });
    }

    private Mono<QueryExecutionResult> executeLocally(Datasource datasource, Map<String, Object> map, Map<String, Object> map2, QueryVisitorContext queryVisitorContext) {
        QueryExecutor<? extends DatasourceConnectionConfig, Object, ? extends QueryExecutionContext> queryExecutor = this.datasourceMetaInfoService.getQueryExecutor(datasource.getType());
        return queryExecutor.buildQueryExecutionContextMono(datasource.getDetailConfig(), map, map2, queryVisitorContext).zipWhen(queryExecutionContext -> {
            return this.datasourceConnectionPool.getOrCreateConnection(datasource);
        }).flatMap(tuple2 -> {
            QueryExecutionContext queryExecutionContext2 = (QueryExecutionContext) tuple2.getT1();
            DatasourceConnectionHolder datasourceConnectionHolder = (DatasourceConnectionHolder) tuple2.getT2();
            Mono doExecuteQuery = queryExecutor.doExecuteQuery(datasourceConnectionHolder.connection(), queryExecutionContext2);
            Objects.requireNonNull(datasourceConnectionHolder);
            return doExecuteQuery.doOnError(datasourceConnectionHolder::onQueryError);
        });
    }

    private Mono<QueryExecutionResult> executeByNodeJs(Datasource datasource, Map<String, Object> map, Map<String, Object> map2) {
        return this.datasourcePluginClient.executeQuery(datasource.getType(), map, (List) map2.entrySet().stream().map(entry -> {
            return Map.of("key", entry.getKey(), "value", entry.getValue());
        }).collect(Collectors.toList()), datasource.getDetailConfig());
    }
}
