package com.jzt.wotu.etl.core.datasource.cron;

import com.jzt.wotu.YvanUtil;
import com.jzt.wotu.etl.core.datasource.jdbc.ExtractType;
import com.jzt.wotu.etl.core.job.JobContext;
import com.jzt.wotu.etl.core.job.LoadData;
import com.jzt.wotu.etl.core.schema.extract.AbstractExtract;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/* loaded from: input_file:com/jzt/wotu/etl/core/datasource/cron/CronExtract.class */
public class CronExtract extends AbstractExtract<CronExtractDsl> {
    public static final ScheduledExecutorService Daemon_Executor = Executors.newSingleThreadScheduledExecutor();
    private volatile boolean lock;
    Throwable exception;
    private Long lastRunTime;
    private Long nextRunTime;
    private ScheduledFuture<?> scheduledFuture;

    public CronExtract(CronExtractDsl cronExtractDsl, JobContext<CronExtractDsl> jobContext) {
        super(cronExtractDsl, jobContext);
        this.lock = false;
        this.exception = null;
        if (((CronExtractDsl) this.extractConfig).onTriggerFunc == null) {
            throw new RuntimeException("没有为 CronExtract.onTriggerFunc 不能为空");
        }
        if (!CronExpressionUtil.isValidExpression(cronExtractDsl.getCron())) {
            throw new RuntimeException("非法的 cron 表达式:" + cronExtractDsl.getCron());
        }
    }

    public synchronized void stop() {
        if (this.scheduledFuture == null) {
            return;
        }
        this.scheduledFuture.cancel(true);
        this.scheduledFuture = null;
    }

    @Override // com.jzt.wotu.etl.core.schema.extract.AbstractExtract
    public void extract(BiConsumer<LoadData, String> biConsumer, Consumer<Throwable> consumer) {
        try {
            try {
                doExtract(biConsumer);
                consumer.accept(this.exception);
            } catch (Exception e) {
                this.exception = e;
                consumer.accept(this.exception);
            }
        } catch (Throwable th) {
            consumer.accept(this.exception);
            throw th;
        }
    }

    void doExtract(BiConsumer<LoadData, String> biConsumer) {
        this.scheduledFuture = Daemon_Executor.scheduleWithFixedDelay(() -> {
            if (this.lock) {
                return;
            }
            this.lock = true;
            try {
                try {
                    if (getNextRunTime() == null) {
                        Date nextTime = CronExpressionUtil.getNextTime(getExtractConfig().getCron());
                        setNextRunTime(Long.valueOf(nextTime.getTime()));
                        getContext().getJobLogger().info("[CronExtract] 已启动. cron:{}. 执行时间:{}", getExtractConfig().getCron(), YvanUtil.getDateString(nextTime));
                        this.lock = false;
                        return;
                    }
                    long currentTimeMillis = System.currentTimeMillis();
                    if (getNextRunTime() != null && currentTimeMillis >= getNextRunTime().longValue()) {
                        try {
                            try {
                                getContext().getJobLogger().info("[CronExtract] 开始执行");
                                List<Map<String, Object>> apply = ((CronExtractDsl) this.extractConfig).onTriggerFunc.apply(getContext());
                                if (apply != null) {
                                    biConsumer.accept(new LoadData((List<?>) apply.stream().filter(map -> {
                                        return map != null;
                                    }).collect(Collectors.toList())), ExtractType.full.name());
                                }
                                Date nextTime2 = CronExpressionUtil.getNextTime(getExtractConfig().getCron());
                                setNextRunTime(Long.valueOf(nextTime2.getTime()));
                                getContext().getJobLogger().info("[CronExtract] 下次执行:" + YvanUtil.getDateString(nextTime2));
                            } catch (Exception e) {
                                this.exception = e;
                                getContext().getJobLogger().error("[CronExtract] 调度时发生异常", (Throwable) e);
                                Date nextTime3 = CronExpressionUtil.getNextTime(getExtractConfig().getCron());
                                setNextRunTime(Long.valueOf(nextTime3.getTime()));
                                getContext().getJobLogger().info("[CronExtract] 下次执行:" + YvanUtil.getDateString(nextTime3));
                            }
                        } catch (Throwable th) {
                            Date nextTime4 = CronExpressionUtil.getNextTime(getExtractConfig().getCron());
                            setNextRunTime(Long.valueOf(nextTime4.getTime()));
                            getContext().getJobLogger().info("[CronExtract] 下次执行:" + YvanUtil.getDateString(nextTime4));
                            throw th;
                        }
                    }
                    this.lock = false;
                } catch (Exception e2) {
                    this.exception = e2;
                    this.lock = false;
                }
            } catch (Throwable th2) {
                this.lock = false;
                throw th2;
            }
        }, 0L, 1000L, TimeUnit.MILLISECONDS);
    }

    public Long getLastRunTime() {
        return this.lastRunTime;
    }

    public void setLastRunTime(Long l) {
        this.lastRunTime = l;
    }

    public Long getNextRunTime() {
        return this.nextRunTime;
    }

    public void setNextRunTime(Long l) {
        this.nextRunTime = l;
    }
}
