package com.jzt.wotu.etl.core.job;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.jzt.wotu.SnowFlake;
import com.jzt.wotu.YvanUtil;
import com.jzt.wotu.etl.core.EtlInstance;
import com.jzt.wotu.etl.core.ZookeeperService;
import com.jzt.wotu.etl.core.datasource.ExtractFactory;
import com.jzt.wotu.etl.core.datasource.LoadFactory;
import com.jzt.wotu.etl.core.datasource.TransformFactory;
import com.jzt.wotu.etl.core.datasource.jdbc.ExtractType;
import com.jzt.wotu.etl.core.datasource.jdbc.SaveType;
import com.jzt.wotu.etl.core.datasource.jdbc.TransformType;
import com.jzt.wotu.etl.core.log.IsEnableLogs;
import com.jzt.wotu.etl.core.log.JobLogger;
import com.jzt.wotu.etl.core.model.Extract;
import com.jzt.wotu.etl.core.model.Load;
import com.jzt.wotu.etl.core.model.SchemaDsl;
import com.jzt.wotu.etl.core.model.Transform;
import com.jzt.wotu.etl.core.schema.extract.AbstractExtract;
import com.jzt.wotu.etl.core.schema.load.AbstractLoad;
import com.jzt.wotu.etl.core.schema.transform.AbstractTransform;
import com.jzt.wotu.etl.core.utils.ExtractException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;

/* loaded from: input_file:com/jzt/wotu/etl/core/job/AbstractJob.class */
public class AbstractJob<EC extends Extract> {
    private final SchemaDsl schema;
    private final String extractKey;
    public final ZookeeperService zookeeperService;
    public String[] schemaExtractKey;
    private volatile boolean lock = false;
    protected final IsEnableLogs isEnableLogs = () -> {
        return false;
    };
    protected final IsEnableLogs isPersistence = () -> {
        return true;
    };
    private AbstractExtract<EC> extract;
    protected final JobContext<EC> context;
    private List<AbstractTransform<?>> transformJobList;
    private List<AbstractLoad<?>> loadJobList;

    protected AbstractExtract<EC> getExtractJob() {
        return this.extract;
    }

    protected List<AbstractTransform<?>> getTransformJob() {
        return this.transformJobList;
    }

    protected void init() {
    }

    protected void onCompleted() {
    }

    protected void onSuccess() {
    }

    protected void onFail(Throwable th) {
    }

    protected void onInterrupted() {
    }

    public AbstractJob(Class<ZookeeperService> cls) throws Exception {
        try {
            this.zookeeperService = ZookeeperService.getInstance("com.jzt.wotu.etl.worker.ZkService");
            String[] mountWorker = this.zookeeperService.mountWorker();
            String str = mountWorker[4];
            System.out.println("ZK返回shcema信息=============" + JSON.toJSONString(mountWorker));
            if (mountWorker == null || mountWorker.length == 0) {
                this.zookeeperService.unMountWorker(str);
                throw new ExtractException("schemaExtractKey is null" + mountWorker);
            }
            String str2 = mountWorker[1];
            String str3 = mountWorker[2];
            SchemaDsl findSchema = EtlInstance.findSchema(str2);
            if (findSchema == null) {
                this.zookeeperService.unMountWorker(str);
                throw new ExtractException("方案" + str2 + "不存在");
            }
            System.out.println("test抽取名" + str2);
            this.schema = findSchema;
            this.extractKey = str3;
            String format = String.format("%s-%d", String.format("%s@%s", findSchema.getKey(), this.extractKey), Long.valueOf(SnowFlake.SNOW_FLAKE.nextId()));
            this.context = new JobContext<>(new JobLogger(format, findSchema.getKey(), true, this.isEnableLogs, this.isPersistence), new JobStateInfo(findSchema.getKey(), format), this.isEnableLogs, this.extractKey, str);
            this.extract = ExtractFactory.getExtract((Extract) YvanUtil.find(findSchema.getExtracts(), extract -> {
                return this.extractKey.equals(extract.getKey());
            }), this.context);
            this.transformJobList = Lists.newArrayList();
            if (findSchema.getTransforms() != null && !findSchema.getTransforms().isEmpty()) {
                Iterator<Transform> it = findSchema.getTransforms().iterator();
                while (it.hasNext()) {
                    this.transformJobList.add(TransformFactory.createTransform(it.next(), this.context));
                }
            }
            this.loadJobList = Lists.newArrayList();
            if (findSchema.getLoads() == null || findSchema.getLoads().isEmpty()) {
                return;
            }
            Iterator<Load> it2 = findSchema.getLoads().iterator();
            while (it2.hasNext()) {
                this.loadJobList.add(LoadFactory.createLoad(it2.next(), this.context));
            }
        } catch (Exception e) {
            throw new ExtractException(" zkService.newInstance has exception");
        }
    }

    public AbstractJob(SchemaDsl schemaDsl, String str, Class<ZookeeperService> cls) throws IllegalAccessException, InstantiationException {
        this.schema = schemaDsl;
        this.extractKey = str;
        this.zookeeperService = cls.newInstance();
        String str2 = "";
        try {
            String[] mountWorker = this.zookeeperService.mountWorker();
            System.out.println("========schemaExtractKey=====" + JSON.toJSONString(mountWorker));
            str2 = mountWorker[4];
        } catch (Exception e) {
            e.printStackTrace();
        }
        String format = String.format("%s-%d", String.format("%s@%s", schemaDsl.getKey(), str), Long.valueOf(SnowFlake.SNOW_FLAKE.nextId()));
        this.context = new JobContext<>(new JobLogger(format, schemaDsl.getKey(), true, this.isEnableLogs, this.isPersistence), new JobStateInfo(schemaDsl.getKey(), format), this.isEnableLogs, str, str2);
        this.extract = ExtractFactory.getExtract((Extract) YvanUtil.find(schemaDsl.getExtracts(), extract -> {
            return str.equals(extract.getKey());
        }), this.context);
        this.transformJobList = Lists.newArrayList();
        if (schemaDsl.getTransforms() != null && !schemaDsl.getTransforms().isEmpty()) {
            Iterator<Transform> it = schemaDsl.getTransforms().iterator();
            while (it.hasNext()) {
                this.transformJobList.add(TransformFactory.createTransform(it.next(), this.context));
            }
        }
        this.loadJobList = Lists.newArrayList();
        if (schemaDsl.getLoads() == null || schemaDsl.getLoads().isEmpty()) {
            return;
        }
        Iterator<Load> it2 = schemaDsl.getLoads().iterator();
        while (it2.hasNext()) {
            this.loadJobList.add(LoadFactory.createLoad(it2.next(), this.context));
        }
    }

    public void start() {
        if (this.lock) {
            this.context.jobLogger.info("[SchemeJob] 当前任务正在执行，跳过当前执行");
        }
        try {
            this.lock = true;
            this.context.jobLogger.info("[SchemeJob] 开始执行同步任务...");
            JobStateInfo jobStateInfo = this.context.getJobStateInfo();
            clearInterrupt();
            jobStateInfo.resetState();
            this.context.setStartTime(Long.valueOf(System.currentTimeMillis()));
            this.context.setEndTime(null);
            jobStateInfo.setRunState(RunState.Initializing);
            init();
            jobStateInfo.setRunState(RunState.Initialized);
            AbstractExtract<EC> extractJob = getExtractJob();
            jobStateInfo.setRunState(RunState.Synchronizing);
            List<AbstractTransform<?>> transformJob = getTransformJob();
            jobStateInfo.setSynchronizeState(SynchronizeState.Extracting);
            extractJob.extract((loadData, str) -> {
                HashMap hashMap = new HashMap();
                jobStateInfo.setSynchronizeState(SynchronizeState.Transform);
                if (ExtractType.kafka_Transform.name().equalsIgnoreCase(str)) {
                    ArrayList arrayList = new ArrayList();
                    ArrayList arrayList2 = new ArrayList();
                    ArrayList arrayList3 = new ArrayList();
                    Iterator it = loadData.getData().iterator();
                    while (it.hasNext()) {
                        HashMap hashMap2 = (HashMap) it.next();
                        String str = (String) hashMap2.get("operateType");
                        if (SaveType.delete.name().equalsIgnoreCase(str)) {
                            arrayList3.add(hashMap2);
                        } else if (SaveType.delete2ReplaceInto.name().equalsIgnoreCase(str)) {
                            arrayList2.add(hashMap2);
                        } else {
                            arrayList.add(hashMap2);
                        }
                    }
                    LoadData loadData = new LoadData(arrayList);
                    LoadData loadData2 = new LoadData(arrayList3);
                    LoadData loadData3 = new LoadData(arrayList2);
                    if (arrayList3.size() > 0) {
                        hashMap.put(SaveType.softDelete.name(), loadData2);
                    }
                    if (arrayList.size() > 0 || arrayList2.size() > 0) {
                        if (transformJob.size() <= 0) {
                            throw new RuntimeException("Your groovy needs at least one transform");
                        }
                        LoadData loadData4 = new LoadData(new ArrayList());
                        LoadData loadData5 = new LoadData(new ArrayList());
                        LinkedList linkedList = new LinkedList();
                        LinkedList linkedList2 = new LinkedList();
                        Iterator it2 = transformJob.iterator();
                        while (it2.hasNext()) {
                            AbstractTransform abstractTransform = (AbstractTransform) it2.next();
                            if (TransformType.deleteTransform.name().equalsIgnoreCase(abstractTransform.getTransformConfig().getKey())) {
                                linkedList.add(abstractTransform);
                            } else {
                                linkedList2.add(abstractTransform);
                            }
                        }
                        Iterator it3 = linkedList.iterator();
                        while (it3.hasNext()) {
                            loadData5 = ((AbstractTransform) it3.next()).transform(loadData3, "isDelete");
                        }
                        Iterator it4 = linkedList2.iterator();
                        while (it4.hasNext()) {
                            loadData4 = ((AbstractTransform) it4.next()).transform(loadData, "isDelete");
                        }
                        hashMap.put(SaveType.replaceInto.name(), loadData4);
                        hashMap.put(SaveType.delete2ReplaceInto.name(), loadData5);
                    }
                } else if (ExtractType.full.name().equalsIgnoreCase(str)) {
                    if (transformJob.size() > 0) {
                        Iterator it5 = transformJob.iterator();
                        while (it5.hasNext()) {
                            AbstractTransform abstractTransform2 = (AbstractTransform) it5.next();
                            System.out.println("============执行全量 Transform====================");
                            loadData = abstractTransform2.transform(loadData, "isDelete");
                        }
                    }
                    hashMap.put(SaveType.replaceInto.name(), loadData);
                } else {
                    ArrayList arrayList4 = new ArrayList();
                    ArrayList arrayList5 = new ArrayList();
                    ArrayList arrayList6 = new ArrayList();
                    Iterator it6 = loadData.getData().iterator();
                    while (it6.hasNext()) {
                        HashMap hashMap3 = (HashMap) it6.next();
                        String str2 = (String) hashMap3.get("operateType");
                        if (SaveType.update.name().equalsIgnoreCase(str2)) {
                            arrayList4.add(hashMap3);
                        } else if (SaveType.delete.name().equalsIgnoreCase(str2)) {
                            arrayList5.add(hashMap3);
                        } else if (SaveType.insert.name().equalsIgnoreCase(str2)) {
                            arrayList6.add(hashMap3);
                        } else {
                            this.context.jobLogger.info("THis is a illegality operateType :" + str2);
                        }
                    }
                    LoadData loadData6 = new LoadData(arrayList4);
                    LoadData loadData7 = new LoadData(arrayList5);
                    LoadData loadData8 = new LoadData(arrayList6);
                    hashMap.put(SaveType.update.name(), loadData6);
                    hashMap.put(SaveType.insert.name(), loadData8);
                    hashMap.put(SaveType.delete.name(), loadData7);
                }
                jobStateInfo.setSynchronizeState(SynchronizeState.Loading);
                Iterator<AbstractLoad<?>> it7 = this.loadJobList.iterator();
                while (it7.hasNext()) {
                    try {
                        it7.next().loadDateMapHandler(hashMap);
                    } catch (Exception e) {
                        getContext().getJobStateInfo().addFailCount();
                        exceptionHandle(e);
                    }
                }
                jobStateInfo.setSynchronizeState(SynchronizeState.Extracting);
                jobStateInfo.setSynchronizeState(SynchronizeState.Extracting);
            }, this::onSyncEnd);
        } catch (Exception e) {
            onSyncEnd(e);
        }
    }

    /* JADX WARN: Can't wrap try/catch for region: R(11:1|(2:2|(10:4|5|6|7|8|(3:10|11|12)|16|17|18|19)(10:29|30|31|7|8|(0)|16|17|18|19))|35|36|(3:38|39|40)|44|45|46|47|48|(1:(0))) */
    /* JADX WARN: Code restructure failed: missing block: B:50:0x00fb, code lost:
    
        r8 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:51:0x00fd, code lost:
    
        exceptionHandle(r8);
     */
    /* JADX WARN: Removed duplicated region for block: B:10:0x006e A[DONT_GENERATE] */
    /* JADX WARN: Removed duplicated region for block: B:38:0x00cb A[FINALLY_INSNS] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void onSyncEnd(java.lang.Throwable r5) {
        /*
            Method dump skipped, instructions count: 262
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.jzt.wotu.etl.core.job.AbstractJob.onSyncEnd(java.lang.Throwable):void");
    }

    protected void exceptionHandle(Exception exc) {
        this.context.getJobLogger().warn("[SchemeJob] 数据同步异常", (Throwable) exc);
    }

    public void startOnce() {
        this.context.setOnce(true);
        start();
    }

    public boolean isRunning() {
        return this.context.isRunning();
    }

    public void clearInterrupt() {
        if (Objects.equals(RunState.Stopped, this.context.getJobStateInfo().getRunState())) {
            AbstractExtract<EC> extractJob = getExtractJob();
            if (extractJob != null) {
                extractJob.clearInterrupt();
            }
            Iterator<AbstractLoad<?>> it = this.loadJobList.iterator();
            while (it.hasNext()) {
                it.next().clearInterrupt();
            }
            this.context.jobLogger.info("[SchemeJob] 清除中断!");
        }
    }

    public JobContext<EC> getContext() {
        return this.context;
    }
}
