package com.jzt.wotu.etl.core.datasource.jdbc;

import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.jzt.wotu.YvanUtil;
import com.jzt.wotu.etl.core.DatasourceManager;
import com.jzt.wotu.etl.core.ZookeeperService;
import com.jzt.wotu.etl.core.datasource.cron.CronExpressionUtil;
import com.jzt.wotu.etl.core.datasource.jdbc.BatchDataReaderCallback;
import com.jzt.wotu.etl.core.job.JobContext;
import com.jzt.wotu.etl.core.job.LoadData;
import com.jzt.wotu.etl.core.job.SynchronizeState;
import com.jzt.wotu.etl.core.model.SqlConfig;
import com.jzt.wotu.etl.core.schema.extract.AbstractExtract;
import com.jzt.wotu.etl.core.utils.SqlUtils;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.logging.log4j.util.Strings;

/* loaded from: input_file:com/jzt/wotu/etl/core/datasource/jdbc/JdbcExtract.class */
public class JdbcExtract extends AbstractExtract<JdbcExtractDsl> {
    public final int Per_Extract_Count = 200;
    public static final ScheduledExecutorService Daemon_Executor = Executors.newSingleThreadScheduledExecutor();
    private volatile boolean lock;
    Throwable exception;
    private Long lastRunTime;
    private Long nextRunTime;
    private ScheduledFuture<?> scheduledFuture;
    private final JdbcDataSource jdbcDataSource;

    public JdbcExtract(JdbcExtractDsl jdbcExtractDsl, JobContext<JdbcExtractDsl> jobContext) {
        super(jdbcExtractDsl, jobContext);
        this.Per_Extract_Count = 200;
        this.lock = false;
        this.exception = null;
        this.jdbcDataSource = DatasourceManager.INSTANCE.getDatasource(getExtractConfig().getDataSource());
    }

    @Override // com.jzt.wotu.etl.core.schema.extract.AbstractExtract
    public void extract(BiConsumer<LoadData, String> biConsumer, Consumer<Throwable> consumer) {
        try {
            JdbcExtractDsl extractConfig = getExtractConfig();
            if (extractConfig.getEnableCron() == null || !extractConfig.getEnableCron().booleanValue()) {
                doExtract(biConsumer);
            } else {
                if (!CronExpressionUtil.isValidExpression(extractConfig.getCron())) {
                    throw new RuntimeException("方案： " + extractConfig.getKey() + " 表达式不正确：" + extractConfig.getCron());
                }
                doTimeTask(biConsumer);
            }
            consumer.accept(null);
        } catch (Exception e) {
            consumer.accept(e);
        } catch (Throwable th) {
            consumer.accept(null);
            throw th;
        }
    }

    void doTimeTask(BiConsumer<LoadData, String> biConsumer) {
        this.scheduledFuture = Daemon_Executor.scheduleWithFixedDelay(() -> {
            if (this.lock) {
                return;
            }
            this.lock = true;
            try {
                try {
                    if (getNextRunTime() == null) {
                        Date nextTime = CronExpressionUtil.getNextTime(getExtractConfig().getCron());
                        setNextRunTime(Long.valueOf(nextTime.getTime()));
                        getContext().getJobLogger().info("[CronExtract] 已启动. cron:{}. 执行时间:{}", getExtractConfig().getCron(), YvanUtil.getDateString(nextTime));
                        this.lock = false;
                        return;
                    }
                    long currentTimeMillis = System.currentTimeMillis();
                    if (getNextRunTime() != null) {
                        try {
                            if (currentTimeMillis >= getNextRunTime().longValue()) {
                                try {
                                    getContext().getJobLogger().info("[CronExtract] 开始执行");
                                    if (getExtractConfig().disEnableExtract == null || !getExtractConfig().disEnableExtract.booleanValue()) {
                                        getContext().getJobLogger().info("[CronExtract] 执行定时全量数据抽取的操作");
                                        doExtract(biConsumer);
                                    } else {
                                        getContext().getJobLogger().info("[CronExtract] 执行定时非数据抽取的操作");
                                        biConsumer.accept(new LoadData((List<?>) Arrays.asList("softDelte")), ExtractType.cronDeleteUseSelfDefineSql.name());
                                    }
                                    Date nextTime2 = CronExpressionUtil.getNextTime(getExtractConfig().getCron());
                                    setNextRunTime(Long.valueOf(nextTime2.getTime()));
                                    getContext().getJobLogger().info("[CronExtract] 下次执行:" + YvanUtil.getDateString(nextTime2));
                                } catch (Exception e) {
                                    this.exception = e;
                                    getContext().getJobLogger().error("[CronExtract] 调度时发生异常", (Throwable) e);
                                    Date nextTime3 = CronExpressionUtil.getNextTime(getExtractConfig().getCron());
                                    setNextRunTime(Long.valueOf(nextTime3.getTime()));
                                    getContext().getJobLogger().info("[CronExtract] 下次执行:" + YvanUtil.getDateString(nextTime3));
                                }
                            }
                        } catch (Throwable th) {
                            Date nextTime4 = CronExpressionUtil.getNextTime(getExtractConfig().getCron());
                            setNextRunTime(Long.valueOf(nextTime4.getTime()));
                            getContext().getJobLogger().info("[CronExtract] 下次执行:" + YvanUtil.getDateString(nextTime4));
                            throw th;
                        }
                    }
                    this.lock = false;
                } catch (Throwable th2) {
                    this.lock = false;
                    throw th2;
                }
            } catch (Exception e2) {
                this.exception = e2;
                this.lock = false;
            }
        }, 0L, 1000L, TimeUnit.MILLISECONDS);
    }

    void doExtract(BiConsumer<LoadData, String> biConsumer) {
        JdbcExtractDsl extractConfig = getExtractConfig();
        String sql = getSql();
        if (Strings.isEmpty(sql)) {
            this.jobLogger.info("[JdbcExtract] CursorMode=[{}] -> Sql为空取消提取数据", Boolean.valueOf(extractConfig.isCursorMode()));
        } else if (extractConfig.isCursorMode()) {
            int fetchSize = extractConfig.getFetchSize() <= 0 ? 200 : extractConfig.getFetchSize();
            this.jobLogger.info("[JdbcExtract] CursorMode=[{}] CursorPerCount=[{}] -> 开始使用游标读取数据", Boolean.valueOf(extractConfig.isCursorMode()), Integer.valueOf(fetchSize));
            useCursorExtract(sql, fetchSize, batchData -> {
                this.jobLogger.addBatchNo();
                LoadData loadData = new LoadData(batchData.getRowDataList());
                getContext().getJobStateInfo().addExtractDataCount(loadData.getSize());
                this.jobLogger.info("[JdbcExtract] CursorMode=[{}] CursorPerCount=[{}] -> 当前批次读取数据量: {} | 当前游标位置: {}", Boolean.valueOf(extractConfig.isCursorMode()), Integer.valueOf(fetchSize), Integer.valueOf(loadData.getSize()), Integer.valueOf(batchData.getRowCount()));
                biConsumer.accept(loadData, ExtractType.full.name());
            });
        } else {
            LoadData directExtract = directExtract(sql);
            getContext().getJobStateInfo().addExtractDataCount(directExtract.getSize());
            this.jobLogger.info("[JdbcExtract] CursorMode=[{}] DataSize=[{}]", Boolean.valueOf(extractConfig.isCursorMode()), Integer.valueOf(directExtract.getSize()));
            biConsumer.accept(directExtract, ExtractType.full.name());
        }
        getContext().getJobStateInfo().setSynchronizeState(SynchronizeState.End);
    }

    protected void useCursorExtract(String str, int i, Consumer<BatchDataReaderCallback.BatchData> consumer) {
        this.jdbcDataSource.execute(jdbcTemplate -> {
            jdbcTemplate.setQueryTimeout(0);
            jdbcTemplate.setFetchSize(i);
            BatchDataReaderCallback batchDataReaderCallback = new BatchDataReaderCallback(i, consumer);
            jdbcTemplate.query(str, resultSet -> {
                while (resultSet.next()) {
                    if (isInterrupted()) {
                        this.jobLogger.info("[JdbcExtract] 中断成功!");
                        return null;
                    }
                    batchDataReaderCallback.processRow(resultSet);
                }
                batchDataReaderCallback.processEnd();
                return null;
            });
        });
    }

    protected LoadData directExtract(String str) {
        return new LoadData(this.jdbcDataSource.queryForList(str));
    }

    protected String getSql() {
        JdbcExtractDsl extractConfig = getExtractConfig();
        SqlConfig sqlConfig = new SqlConfig();
        sqlConfig.setSql(fixSql(extractConfig.getQuery()));
        sqlConfig.setQueryParam(extractConfig.getQueryParams());
        sqlConfig.setType(SqlConfig.TYPE_READ);
        sqlConfig.setDialect(this.jdbcDataSource.getDbType().getDb());
        Map<String, Object> queryParams = extractConfig.getQueryParams();
        if (queryParams == null) {
            queryParams = new HashMap();
        }
        return SqlUtils.getSql(sqlConfig, queryParams);
    }

    private String fixSql(String str) {
        String str2 = ZookeeperService.ENV_MAP.get("fixSql");
        if (StringUtils.isBlank(str2)) {
            System.out.println("全量执行sql : " + str);
            return str;
        }
        if (!str2.contains("全量")) {
            System.out.println("全量执行sql : " + str2);
            return str2;
        }
        String replace = str2.replace("全量", "").replace("#Table", str.split("from")[1].split("where")[0].split(" ")[0]);
        System.out.println("全量执行sql : " + replace);
        return replace;
    }

    public static void main(String[] strArr) {
        System.out.println("TB_GOS_FINANCE_PAYACCOUNTSUM a".split(" ")[0]);
        System.out.println("TB_GOS_FINANCE_PAYACCOUNTSUM".split(" ")[0]);
    }

    public Long getLastRunTime() {
        return this.lastRunTime;
    }

    public void setLastRunTime(Long l) {
        this.lastRunTime = l;
    }

    public Long getNextRunTime() {
        return this.nextRunTime;
    }

    public void setNextRunTime(Long l) {
        this.nextRunTime = l;
    }

    public JdbcDataSource getJdbcDataSource() {
        return this.jdbcDataSource;
    }
}
