package com.jzt.wotu.etl.core.datasource.jdbc;

import com.jzt.wotu.etl.core.DatasourceManager;
import com.jzt.wotu.etl.core.datasource.jdbc.BatchDataReaderCallback;
import com.jzt.wotu.etl.core.job.JobContext;
import com.jzt.wotu.etl.core.job.LoadData;
import com.jzt.wotu.etl.core.job.SynchronizeState;
import com.jzt.wotu.etl.core.model.SqlConfig;
import com.jzt.wotu.etl.core.schema.extract.AbstractExtract;
import com.jzt.wotu.etl.core.utils.SqlUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.logging.log4j.util.Strings;

/* loaded from: input_file:com/jzt/wotu/etl/core/datasource/jdbc/JdbcExtract.class */
/**
 * Extract step that reads rows from a JDBC data source.
 *
 * <p>The SQL to run is rendered by {@link #getSql()} from the step's {@link JdbcExtractDsl}
 * configuration. Depending on {@code isCursorMode()} the rows are either streamed through a
 * {@link BatchDataReaderCallback} or loaded with a single {@code queryForList} call.
 */
public class JdbcExtract extends AbstractExtract<JdbcExtractDsl> {
    /** Fetch size used in cursor mode when the configured fetch size is not positive. */
    public final int Per_Extract_Count = 200;

    /** Data source resolved once, at construction time, from the extract configuration. */
    private final JdbcDataSource jdbcDataSource;

    /**
     * Creates the extract step and resolves its data source through {@link DatasourceManager}.
     *
     * @param jdbcExtractDsl extract configuration
     * @param jobContext     context of the job this step belongs to
     */
    public JdbcExtract(JdbcExtractDsl jdbcExtractDsl, JobContext<JdbcExtractDsl> jobContext) {
        super(jdbcExtractDsl, jobContext);
        // Per_Extract_Count is initialised at its declaration; a final field must not be assigned twice.
        this.jdbcDataSource = DatasourceManager.INSTANCE.getDatasource(getExtractConfig().getDataSource());
    }

    /**
     * Runs the extraction and reports the outcome to {@code consumer}.
     *
     * <p>On normal completion {@code consumer} receives {@code null}. An {@link Exception} is
     * handed to {@code consumer} and not rethrown. Any other {@link Throwable} is handed to
     * {@code consumer} and then rethrown, so the failure is never reported as a success.
     *
     * @param biConsumer receives every extracted {@link LoadData} batch
     * @param consumer   completion callback; {@code null} argument means no failure occurred
     */
    @Override
    public void extract(BiConsumer<LoadData, Boolean> biConsumer, Consumer<Throwable> consumer) {
        try {
            doExtract(biConsumer);
            consumer.accept(null);
        } catch (Exception e) {
            consumer.accept(e);
        } catch (Throwable th) {
            // Pass the actual failure rather than null: null is what the success path sends.
            consumer.accept(th);
            throw th;
        }
    }

    /**
     * Performs the extraction in cursor mode or direct mode and finally marks the job's
     * synchronize state as {@link SynchronizeState#End}.
     *
     * <p>When the rendered SQL is empty nothing is read, but the state is still set to
     * {@code End}.
     *
     * @param biConsumer receives every extracted batch; the second argument is always
     *                   {@code false} here
     */
    void doExtract(BiConsumer<LoadData, Boolean> biConsumer) {
        JdbcExtractDsl extractConfig = getExtractConfig();
        boolean cursorMode = extractConfig.isCursorMode();
        this.jobLogger.info("[JdbcExtract] CursorMode=[{}] -> 开始提取数据", cursorMode);
        String sql = getSql();
        this.jobLogger.info("[JdbcExtract] CursorMode=[{}] Sql=[{}]", cursorMode, sql);
        if (Strings.isEmpty(sql)) {
            this.jobLogger.info("[JdbcExtract] CursorMode=[{}] -> Sql为空取消提取数据", cursorMode);
        } else if (cursorMode) {
            int configuredFetchSize = extractConfig.getFetchSize();
            // Fall back to the default when the configuration gives no usable fetch size.
            int fetchSize = configuredFetchSize <= 0 ? this.Per_Extract_Count : configuredFetchSize;
            this.jobLogger.info("[JdbcExtract] CursorMode=[{}] CursorPerCount=[{}] -> 开始使用游标读取数据", cursorMode, fetchSize);
            useCursorExtract(sql, fetchSize, batchData -> {
                this.jobLogger.addBatchNo();
                LoadData loadData = new LoadData(batchData.getRowDataList());
                getContext().getJobStateInfo().addExtractDataCount(loadData.getSize());
                this.jobLogger.info("[JdbcExtract] CursorMode=[{}] CursorPerCount=[{}] -> 当前批次读取数据量: {} | 当前游标位置: {}", cursorMode, fetchSize, loadData.getSize(), batchData.getRowCount());
                biConsumer.accept(loadData, false);
            });
        } else {
            LoadData loadData = directExtract(sql);
            getContext().getJobStateInfo().addExtractDataCount(loadData.getSize());
            this.jobLogger.info("[JdbcExtract] CursorMode=[{}] DataSize=[{}]", cursorMode, loadData.getSize());
            biConsumer.accept(loadData, false);
        }
        getContext().getJobStateInfo().setSynchronizeState(SynchronizeState.End);
    }

    /**
     * Streams the result of {@code str} row by row into a {@link BatchDataReaderCallback}.
     *
     * <p>The query timeout is set to {@code 0} and the fetch size to {@code i} before the query
     * runs. {@code isInterrupted()} is checked before each row; once it returns {@code true}
     * reading stops immediately and {@code processEnd()} is not called.
     *
     * @param str      SQL to execute
     * @param i        JDBC fetch size, also passed to the {@link BatchDataReaderCallback}
     * @param consumer receives the batches emitted by the {@link BatchDataReaderCallback}
     */
    protected void useCursorExtract(String str, int i, Consumer<BatchDataReaderCallback.BatchData> consumer) {
        this.jdbcDataSource.execute(jdbcTemplate -> {
            jdbcTemplate.setQueryTimeout(0);
            jdbcTemplate.setFetchSize(i);
            BatchDataReaderCallback batchReader = new BatchDataReaderCallback(i, consumer);
            jdbcTemplate.query(str, resultSet -> {
                while (resultSet.next()) {
                    if (isInterrupted()) {
                        this.jobLogger.info("[JdbcExtract] 中断成功!");
                        return null;
                    }
                    batchReader.processRow(resultSet);
                }
                batchReader.processEnd();
                return null;
            });
        });
    }

    /**
     * Loads the complete result of {@code str} with one {@code queryForList} call.
     *
     * @param str SQL to execute
     * @return the query result wrapped in a {@link LoadData}
     */
    protected LoadData directExtract(String str) {
        return new LoadData(this.jdbcDataSource.queryForList(str));
    }

    /**
     * Renders the SQL for this step from the configured query, its parameters and the data
     * source dialect.
     *
     * @return the SQL produced by {@link SqlUtils#getSql}
     */
    protected String getSql() {
        JdbcExtractDsl extractConfig = getExtractConfig();
        SqlConfig sqlConfig = new SqlConfig();
        sqlConfig.setSql(extractConfig.getQuery());
        sqlConfig.setQueryParam(extractConfig.getQueryParams());
        sqlConfig.setType(SqlConfig.TYPE_READ);
        sqlConfig.setDialect(this.jdbcDataSource.getDbType().getDb());
        Map<String, Object> queryParams = extractConfig.getQueryParams();
        if (queryParams == null) {
            // SqlUtils.getSql is always given a non-null parameter map.
            queryParams = new HashMap<>();
        }
        return SqlUtils.getSql(sqlConfig, queryParams);
    }

    /**
     * @return the data source this step reads from
     */
    public JdbcDataSource getJdbcDataSource() {
        return this.jdbcDataSource;
    }
}
