package com.jzt.wotu.etl.core.datasource.kafka;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.jzt.wotu.JsonWapper;
import com.jzt.wotu.YvanUtil;
import com.jzt.wotu.etl.core.ZookeeperService;
import com.jzt.wotu.etl.core.datasource.Thread.ThreadPoolManager;
import com.jzt.wotu.etl.core.datasource.jdbc.ExtractType;
import com.jzt.wotu.etl.core.datasource.jdbc.SaveType;
import com.jzt.wotu.etl.core.datasource.jdbc.dialects.IDialect;
import com.jzt.wotu.etl.core.job.JobContext;
import com.jzt.wotu.etl.core.job.LoadData;
import com.jzt.wotu.etl.core.kafkaRetry.config.EtlConsumerConfig;
import com.jzt.wotu.etl.core.kafkaRetry.config.EtlKafkaConsumerFactory;
import com.jzt.wotu.etl.core.kafkaRetry.config.KafkaRetryServiceImpl;
import com.jzt.wotu.etl.core.kafkaRetry.entity.OracleProperties;
import com.jzt.wotu.etl.core.model.Extract;
import com.jzt.wotu.etl.core.schema.extract.AbstractExtract;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCluster;

/* loaded from: input_file:com/jzt/wotu/etl/core/datasource/kafka/KafkaExtract.class */
public class KafkaExtract extends AbstractExtract<KafkaExtractDsl> {
    /** SLF4J logger shared by all instances of this class. */
    private static final Logger log = LoggerFactory.getLogger(KafkaExtract.class);
    /** The extract DSL passed to the constructor. */
    private KafkaExtractDsl dsl;
    /** Kafka broker addresses resolved by initKafkaConsumer; also handed to the retry service in saveToDb. */
    private List<String> brokerList;
    /** Name of the extraction thread, resolved from the job context in extract(). */
    private String threadName;
    /** Row key whose value is parsed as OracleProperties to read the operation type. */
    private static final String PROPERTIES = "Properties";
    /** Message key holding the list of rows. */
    private static final String DATA = "data";
    /** Message key holding the row count (read as an Integer in spitRecords). */
    private static final String SIZE = "size";
    /** Row key holding the list of column entries. */
    private static final String COLS = "cols";
    /** Row key name for the operation type. */
    private static final String TYPE = "type";
    /** The value 1: used both as the "single record" size and as a loop increment. */
    private static final int SINGLE_SIZE = 1;
    /** Key under which the source table name is stored in each output row map. */
    private static final String TABLENAME = "tableName";
    /** Key under which the operation type is stored in each output row map. */
    private static final String OPERATETYPE = "operateType";
    /** Redis cluster client; not referenced by any method visible in this file. */
    private static JedisCluster jedis;

    /**
     * Creates an extractor for the given DSL and job context.
     *
     * <p>The broker list starts out empty; it is filled in when a Kafka consumer is initialised.
     *
     * @param kafkaExtractDsl the extract configuration, also kept in {@code dsl}
     * @param jobContext      the job context forwarded to the superclass
     */
    public KafkaExtract(KafkaExtractDsl kafkaExtractDsl, JobContext<KafkaExtractDsl> jobContext) {
        super(kafkaExtractDsl, jobContext);
        this.dsl = kafkaExtractDsl;
        this.brokerList = new ArrayList<>();
    }

    /**
     * Starts the Kafka extraction on a dedicated named thread and returns immediately.
     *
     * <p>The thread name is taken from the job context, falling back to {@code "NotThreadName"}
     * when blank. The thread is registered in {@code ZookeeperService.THREAD_MANAGER} under that
     * name before it is started.
     *
     * @param biConsumer receives every parsed batch together with its context map
     * @param consumer   failure callback (presumably the framework's error handler — confirm against
     *                   {@code AbstractExtract}); it is notified when {@code doExtract} throws.
     *                   May be {@code null}.
     */
    @Override // com.jzt.wotu.etl.core.schema.extract.AbstractExtract
    public void extract(BiConsumer<LoadData, Map<String, Object>> biConsumer, Consumer<Throwable> consumer) {
        String resolvedName = this.context.getThreadName();
        if (StringUtils.isBlank(resolvedName)) {
            resolvedName = "NotThreadName";
        }
        this.threadName = resolvedName;
        System.out.println("threadName start " + this.threadName);

        final String workerName = resolvedName;
        Thread thread = new Thread(() -> {
            try {
                doExtract(biConsumer, workerName);
            } catch (Exception e) {
                // Without this handler a start-up failure would only reach the thread's
                // uncaught-exception handler and the caller would never learn about it.
                log.error("Kafka extract thread " + workerName + " failed", e);
                if (consumer != null) {
                    consumer.accept(e);
                }
            }
        }, workerName);
        ZookeeperService.THREAD_MANAGER.put(workerName, thread);
        thread.start();
    }

    /**
     * Polls Kafka with a single consumer and hands every non-empty batch to the shared
     * {@link ThreadPoolManager}; offsets are committed right after the hand-off.
     *
     * <p>The loop only polls while the pool reports spare capacity (queue below its limit or
     * active threads below the maximum); otherwise it sleeps 100 ms. It runs until an exception
     * or an interrupt ends it, and the consumer is always closed on exit.
     *
     * <p>NOTE(review): offsets are committed as soon as the task is submitted, i.e. before
     * {@code saveToDb} has finished for that batch — confirm this delivery guarantee is intended.
     *
     * @param biConsumer receives every parsed batch together with its context map
     * @param str        thread name used as a prefix in log output
     */
    void threadExtract(BiConsumer<LoadData, Map<String, Object>> biConsumer, String str) {
        SimpleDateFormat loopFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        KafkaExtractDsl extractConfig = getExtractConfig();
        final String extractType = extractConfig.isNeedBackQuery() ? ExtractType.kafka_Transform.name() : ExtractType.kafka_noTransform.name();
        KafkaConsumer kafkaConsumer = initKafkaConsumer(extractConfig);
        try {
            log.debug("1Used:" + ((ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed() / 1024) / 1024) + "MB");
            while (true) {
                int queueSize = ThreadPoolManager.getInstance().getQueueSize();
                int activeCount = ThreadPoolManager.getInstance().getActiveCount();
                boolean poolHasRoom = queueSize < ThreadPoolManager.staticGetQueenSize()
                        || activeCount < ThreadPoolManager.staticGetMaximumPoolSize();
                if (!poolHasRoom) {
                    Thread.sleep(100L);
                    continue;
                }
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100L));
                if (records == null || records.partitions().isEmpty()) {
                    Thread.sleep(100L);
                    continue;
                }

                System.out.println("currentQueenSize:" + queueSize + "=activeThread:" + activeCount);
                // Timestamp plus a random suffix so that two batches polled within the same
                // millisecond still get distinguishable ids in the logs.
                final String batchId = System.currentTimeMillis() + "_" + (Math.random() * 1000.0d);
                System.out.println(str + "==>" + batchId + "=>任务start开始提交：" + loopFormat.format(new Date(System.currentTimeMillis())) + "=>recordsSize is " + records.count());
                try {
                    ThreadPoolManager.getInstance().commitTask(() -> {
                        List<String> offsets = saveToDb(records, biConsumer, extractType, str);
                        // SimpleDateFormat is not thread-safe: the pool task formats with its own instance.
                        String finishedAt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(System.currentTimeMillis()));
                        System.out.print(str + "==>" + batchId + "=>Mid完成任务提交：" + finishedAt + "==>offSetList:" + JSON.toJSONString(offsets));
                        return null;
                    });
                    try {
                        kafkaConsumer.commitAsync();
                    } catch (CommitFailedException e) {
                        System.out.println("====================offSet 提交失败========================");
                        // NOTE(review): the consumer is closed after the synchronous commit, so the
                        // next poll() is expected to fail and end this loop via the outer handler.
                        try {
                            kafkaConsumer.commitSync();
                        } finally {
                            kafkaConsumer.close();
                        }
                    }
                } catch (Exception e) {
                    log.error("Failed to submit batch " + batchId + " or commit its offsets", e);
                    System.out.println("提交任务冲突" + e.getMessage());
                }
                System.out.println(batchId + "=>end完成offSet提交：" + loopFormat.format(new Date(System.currentTimeMillis())));
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt so whoever owns this thread can observe it.
            Thread.currentThread().interrupt();
            log.warn("Kafka polling loop " + str + " interrupted", e);
        } catch (Exception e) {
            log.error("Kafka polling loop " + str + " terminated", e);
        } finally {
            log.debug("=========================Pre Step ?=============================================");
            kafkaConsumer.close();
        }
    }

    /**
     * Creates {@code workLimit} Kafka consumers and starts one polling thread per consumer.
     *
     * <p>All consumers are created first; if creating one of them fails, the ones already
     * created are closed before the exception is rethrown. Each worker thread is named
     * {@code str}. This method returns once the threads are started.
     *
     * @param biConsumer receives every parsed batch together with its context map
     * @param str        thread name, used for the worker threads and as a log prefix
     * @throws IllegalArgumentException if the configured {@code workLimit} is not positive
     */
    void doExtract(BiConsumer<LoadData, Map<String, Object>> biConsumer, String str) {
        KafkaExtractDsl extractConfig = getExtractConfig();
        final String extractType = extractConfig.isNeedBackQuery() ? ExtractType.kafka_Transform.name() : ExtractType.kafka_noTransform.name();
        int workerCount = extractConfig.workLimit.intValue();
        if (workerCount <= 0) {
            throw new IllegalArgumentException("workLimit must be positive but was " + workerCount);
        }

        List<KafkaConsumer<String, String>> consumers = new ArrayList<>(workerCount);
        try {
            for (int n = 0; n < workerCount; n++) {
                consumers.add(initKafkaConsumer(extractConfig));
            }
        } catch (RuntimeException e) {
            // Do not leak the consumers that were created before the failure.
            for (KafkaConsumer<String, String> created : consumers) {
                created.close();
            }
            throw e;
        }

        System.out.println("threadName is " + str + "====消费者个数：" + consumers.size());
        for (KafkaConsumer<String, String> kafkaConsumer : consumers) {
            new Thread(() -> runConsumerLoop(kafkaConsumer, biConsumer, extractType, str), str).start();
        }
    }

    /**
     * Worker body: polls one consumer forever, stores each non-empty batch through
     * {@code saveToDb} and then commits offsets asynchronously. The consumer is closed when the
     * loop ends because of an exception or an interrupt.
     */
    private void runConsumerLoop(KafkaConsumer<String, String> kafkaConsumer, BiConsumer<LoadData, Map<String, Object>> biConsumer, String extractType, String threadLabel) {
        // SimpleDateFormat is not thread-safe, so every worker owns its own instance.
        SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("当前消费者为：" + kafkaConsumer.toString());
        try {
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100L));
                if (records == null || records.partitions().isEmpty()) {
                    Thread.sleep(100L);
                    continue;
                }
                try {
                    // Timestamp plus a random suffix keeps batch ids distinct within one millisecond.
                    String batchId = System.currentTimeMillis() + "_" + (Math.random() * 1000.0d);
                    System.out.println(threadLabel + "==>" + batchId + "=>任务start开始提交：" + timeFormat.format(new Date(System.currentTimeMillis())) + "=>recordsSize is " + records.count());
                    List<String> offsets = saveToDb(records, biConsumer, extractType, threadLabel);
                    System.out.print(threadLabel + "==>" + batchId + "=>Mid完成任务提交：" + timeFormat.format(new Date(System.currentTimeMillis())) + "==>offSetList:" + JSON.toJSONString(offsets));
                    kafkaConsumer.commitAsync();
                } catch (CommitFailedException e) {
                    System.out.println("====================offSet 提交失败========================");
                    // NOTE(review): the consumer is closed after the synchronous commit, so the
                    // next poll() is expected to fail and end this worker via the handler below.
                    try {
                        kafkaConsumer.commitSync();
                    } finally {
                        kafkaConsumer.close();
                    }
                }
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt so whoever owns this thread can observe it.
            Thread.currentThread().interrupt();
            log.warn("Kafka worker " + threadLabel + " interrupted", e);
        } catch (Exception e) {
            log.error("Kafka worker " + threadLabel + " terminated", e);
        } finally {
            log.debug("=========================Pre Step ?=============================================");
            kafkaConsumer.close();
        }
    }

    /**
     * Processes every record of a polled batch, partition by partition.
     *
     * <p>Each record value is converted with {@code getTransferMap}; non-empty results are passed
     * to {@code doTask}. A record that fails is logged and handed to
     * {@code KafkaRetryServiceImpl.consumerLater} together with the broker list, except when the
     * failure message is {@code "taskReject"} (case-insensitive), which aborts the whole batch.
     *
     * @param consumerRecords the polled batch
     * @param biConsumer      receives every parsed record together with its context map
     * @param str             extract type name forwarded to {@code doTask}
     * @param str2            thread name used as a log prefix
     * @return one {@code "<partition>_<offset>"} entry per record that was picked up
     * @throws RuntimeException if a task was rejected ({@code "taskReject"})
     */
    public List<String> saveToDb(ConsumerRecords<String, String> consumerRecords, BiConsumer<LoadData, Map<String, Object>> biConsumer, String str, String str2) {
        List<String> handledOffsets = Collections.synchronizedList(new ArrayList<>());
        SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        log.debug(((KafkaExtractDsl) this.extractConfig).getKey() + "=》开始消费数据=时间：" + timeFormat.format(new Date(System.currentTimeMillis())) + "||分区数:" + consumerRecords.partitions().size());
        for (TopicPartition topicPartition : consumerRecords.partitions()) {
            consumerRecords.records(topicPartition).forEach(consumerRecord -> {
                String receivedAt = timeFormat.format(new Date(System.currentTimeMillis()));
                // Timestamp plus a random suffix keeps record ids distinct within one millisecond.
                String recordId = System.currentTimeMillis() + "_" + (Math.random() * 1000.0d);
                try {
                    long offset = consumerRecord.offset();
                    int partition = topicPartition.partition();
                    String key = ((KafkaExtractDsl) this.extractConfig).getKey();
                    // Every label is followed by the value it names (topic, partition, time, offset, count).
                    log.info(str2 + "=offSet=" + offset + "==>" + key + ":" + recordId + ":获取到=topic=>" + consumerRecord.topic() + "partitionIs" + partition + "=>time is" + receivedAt + "===offset is " + offset + "=>records.countIs=>" + consumerRecords.count());
                    handledOffsets.add(partition + "_" + offset);
                    String payload = consumerRecord.value();
                    List<Map<String, Object>> rows = getTransferMap(payload);
                    System.out.println();
                    log.debug(str2 + "==>" + recordId + ":==获取到数据=============" + payload);
                    if (CollectionUtils.isEmpty(rows)) {
                        return;
                    }
                    doTask(biConsumer, rows, str, recordId, str2, offset, partition);
                } catch (Exception e) {
                    // Constant-first comparison: exceptions such as NullPointerException may carry
                    // no message, and an NPE thrown here would skip the retry hand-off below.
                    if ("taskReject".equalsIgnoreCase(e.getMessage())) {
                        throw new RuntimeException("任务被拒绝", e);
                    }
                    log.error("===================消费数据时异常======" + e.getMessage(), e);
                    log.info("==异常数据====" + consumerRecord.value());
                    KafkaRetryServiceImpl.getInstance().consumerLater(consumerRecord, this.brokerList, JSON.toJSONString(e));
                }
            });
        }
        return handledOffsets;
    }

    /**
     * Wraps the rows in a {@link LoadData} and passes them to the consumer together with a
     * context map containing {@code "ExtractType"} and {@code "offSet"}; logs before and after.
     *
     * @param biConsumer receives the load data and its context map
     * @param list       parsed rows of one Kafka record
     * @param str        extract type name stored under {@code "ExtractType"}
     * @param str2       record id used in log output
     * @param str3       thread name used as a log prefix
     * @param j          Kafka offset of the record, stored under {@code "offSet"}
     * @param i          Kafka partition of the record (logged only)
     */
    public void doTask(BiConsumer<LoadData, Map<String, Object>> biConsumer, List<Map<String, Object>> list, String str, String str2, String str3, long j, int i) {
        String prefix = str3 + "==>" + str2;
        // Serialising the whole batch is only worth it when debug output is actually enabled.
        if (log.isDebugEnabled()) {
            log.debug(prefix + ":开始消费数据=============rowListSize" + list.size() + "==>" + JSONArray.toJSONString(list));
        }
        log.info(prefix + ":开始消费数据============offset is " + j + "===》partition:" + i);

        LoadData loadData = new LoadData(list);
        Map<String, Object> taskContext = new HashMap<>();
        taskContext.put("ExtractType", str);
        taskContext.put("offSet", Long.valueOf(j));
        biConsumer.accept(loadData, taskContext);

        String key = ((KafkaExtractDsl) this.extractConfig).getKey();
        if (log.isDebugEnabled()) {
            log.debug(prefix + ":" + key + ":结束消费数据======rowListSize" + list.size() + "==>" + JSONArray.toJSONString(list));
        }
        log.info(prefix + ":" + key + ":结束消费数据=====offset is " + j + "===》partition:" + i);
    }

    /**
     * Test entry point: runs one {@code doTask} cycle with the rows parsed from the
     * {@code test.json} fixture instead of data polled from Kafka (offset and partition are 0).
     *
     * @param biConsumer receives the parsed fixture rows together with their context map
     * @param str        thread name used as a log prefix
     */
    void doExtractTest(BiConsumer<LoadData, Map<String, Object>> biConsumer, String str) {
        KafkaExtractDsl extractConfig = getExtractConfig();
        String extractType = extractConfig.isNeedBackQuery() ? ExtractType.kafka_Transform.name() : ExtractType.kafka_noTransform.name();
        // The consumer is still initialised (this also resolves the broker list), but it is
        // never polled here, so close it right away instead of leaking it.
        initKafkaConsumer(extractConfig).close();

        String testData = getTestData();
        if (testData == null) {
            // readJsonFile returns null when the fixture cannot be read.
            log.warn(str + "==>: test data could not be read, nothing to process");
            return;
        }
        List<Map<String, Object>> rows = getTransferMap(testData);
        System.out.println();
        log.debug(str + "==>:==获取到数据=============" + testData);
        if (CollectionUtils.isEmpty(rows)) {
            return;
        }
        doTask(biConsumer, rows, extractType, "", str, 0L, 0);
    }

    /**
     * Loads the content of the {@code test.json} classpath resource.
     *
     * @return the file content as returned by {@code readJsonFile}
     */
    private String getTestData() {
        ClassLoader loader = KafkaExtract.class.getClassLoader();
        String fixturePath = loader.getResource("test.json").getPath();
        return readJsonFile(fixturePath);
    }

    /**
     * Reads a whole file as UTF-8 text.
     *
     * @param str path of the file to read
     * @return the file content, or {@code null} if the file cannot be read
     */
    private String readJsonFile(String str) {
        File file = new File(str);
        // try-with-resources closes both streams even when reading fails part-way.
        try (FileInputStream input = new FileInputStream(file);
             InputStreamReader reader = new InputStreamReader(input, "utf-8")) {
            StringBuilder content = new StringBuilder();
            char[] buffer = new char[4096];
            int read;
            while ((read = reader.read(buffer)) != -1) {
                content.append(buffer, 0, read);
            }
            return content.toString();
        } catch (IOException e) {
            log.error("Failed to read JSON file " + str, e);
            return null;
        }
    }

    /**
     * Splits a multi-row message into one JSON message per row.
     *
     * <p>A message whose {@code size} is 1, or that has no {@code data} list, is returned
     * unchanged as the only element. Otherwise each element of {@code data} yields a copy of
     * the message with {@code data} reduced to that element and {@code size} set to 1, in the
     * original row order.
     *
     * @param str the JSON message
     * @return the single-row messages
     */
    private List<String> spitRecords(String str) {
        List<String> singleRowMessages = new ArrayList<>();
        Map jsonToMap = YvanUtil.jsonToMap(str);
        // equals() on the constant tolerates a missing "size" entry.
        if (Integer.valueOf(SINGLE_SIZE).equals(jsonToMap.get(SIZE))) {
            singleRowMessages.add(str);
            return singleRowMessages;
        }
        List<?> rows = JSON.parseObject(JSON.toJSONString(jsonToMap.get(DATA)), List.class);
        if (rows == null) {
            // Nothing to split: pass the message through untouched.
            singleRowMessages.add(str);
            return singleRowMessages;
        }
        // Sequential on purpose: the envelope map is reused and mutated for every row, and the
        // result list is a plain ArrayList, so neither may be touched from several threads.
        for (Object row : rows) {
            List<Object> singleRow = new ArrayList<>();
            singleRow.add(row);
            jsonToMap.put(DATA, singleRow);
            jsonToMap.put(SIZE, Integer.valueOf(SINGLE_SIZE));
            singleRowMessages.add(JSON.toJSONString(jsonToMap));
        }
        return singleRowMessages;
    }

    /**
     * Converts a change message into one flat map per row of its {@code data} list.
     *
     * <p>Each result map contains {@code tableName}, {@code operateType} and one entry per
     * column (column name to value). The operation type is read from the row's
     * {@code Properties} (as {@code OracleProperties.getGG_OPTYPE()}) when present, otherwise
     * from its {@code type} entry. Column values are taken from {@code before} for deletes,
     * from {@code after} for inserts, and from {@code after} falling back to {@code before}
     * for anything else. For deletes with {@code isDeleteNeedBackQuerySwitch} set, the
     * operation type is replaced by {@code delete2ReplaceInto}.
     *
     * @param str the JSON message
     * @return the converted rows, in message order
     * @throws RuntimeException if a row has neither {@code Properties} nor {@code type}
     */
    private List<Map<String, Object>> getTransferMap(String str) {
        List<Map<String, Object>> result = new ArrayList<>();
        List<?> dataRows = JSON.parseObject(JSON.toJSONString(YvanUtil.jsonToMap(str).get(DATA)), List.class);
        for (Object rawRow : dataRows) {
            JSONObject row = JSON.parseObject(JSON.toJSONString(rawRow));

            String operateType;
            if (row.containsKey(PROPERTIES)) {
                operateType = JSON.parseObject(JSON.toJSONString(row.get(PROPERTIES)), OracleProperties.class).getGG_OPTYPE();
            } else if (row.containsKey(TYPE)) {
                operateType = row.getString(TYPE);
            } else {
                throw new RuntimeException("数据库操作类型异常");
            }

            Map<String, Object> converted = new HashMap<>();
            converted.put(TABLENAME, row.get("table"));
            converted.put(OPERATETYPE, operateType);

            boolean isDelete = SaveType.delete.name().equalsIgnoreCase(operateType);
            boolean isInsert = SaveType.insert.name().equalsIgnoreCase(operateType);
            List<?> columns = JSON.parseObject(JSON.toJSONString(row.get(COLS)), List.class);
            String primaryKey = null;
            for (Object rawColumn : columns) {
                JSONObject column = JSONObject.parseObject(JSON.toJSONString(rawColumn));
                String columnName = column.getString("name");
                Object value;
                if (isDelete) {
                    if (getExtractConfig().isDeleteNeedBackQuerySwitch) {
                        converted.put(OPERATETYPE, SaveType.delete2ReplaceInto.name());
                    }
                    value = column.get("before");
                } else if (isInsert) {
                    value = column.get("after");
                } else {
                    value = column.get("after");
                    if (value == null) {
                        value = column.get("before");
                    }
                }
                if ("PK".equalsIgnoreCase(columnName)) {
                    // Only used for the diagnostic print below; a numeric key must not fail the row.
                    primaryKey = value == null ? null : String.valueOf(value);
                }
                converted.put(columnName, value);
            }
            System.out.print("table is " + row.get("table") + "<=>operateType is " + operateType + "===>pkIs" + primaryKey + "||");
            result.add(converted);
        }
        return result;
    }

    /**
     * Builds a Kafka consumer for the given DSL and subscribes it to the resolved topic.
     *
     * <p>Brokers come from the {@code kafkaBrokers} environment entry when it holds a
     * comma-separated list, otherwise from ZooKeeper via {@code getBrokerByHermesTag}; a
     * non-null {@code debugBrokers} in the DSL overrides both. The resolved list replaces
     * {@code brokerList} on every call. The group id is {@code preTopic.topic} plus the optional
     * identifier and debug key; the topic is {@code prefix.topic} unless {@code debugTopic} is set.
     *
     * @param kafkaExtractDsl the extract configuration
     * @return a consumer already subscribed to the topic
     * @throws RuntimeException if the brokers cannot be resolved from ZooKeeper
     */
    private KafkaConsumer initKafkaConsumer(KafkaExtractDsl kafkaExtractDsl) {
        EtlConsumerConfig etlConsumerConfig = new EtlConsumerConfig();
        String topicPrefix = ZookeeperService.ENV_MAP.get("prefix");
        if (Extract.TEST.equalsIgnoreCase(topicPrefix)) {
            topicPrefix = kafkaExtractDsl.getPreTopic() + "-" + topicPrefix;
        }

        // Resolve into a fresh list: this method runs once per consumer, and appending to the
        // shared field would duplicate the brokers on every call.
        List<String> brokers = new ArrayList<>();
        String configuredBrokers = ZookeeperService.ENV_MAP.get("kafkaBrokers");
        if (StringUtils.isNotBlank(configuredBrokers) && configuredBrokers.contains(IDialect.COMMA)) {
            Collections.addAll(brokers, configuredBrokers.split(IDialect.COMMA));
        } else {
            try {
                String brokerJson = ZookeeperService.getInstance("com.jzt.wotu.etl.worker.ZkService").getBrokerByHermesTag(topicPrefix);
                if (StringUtils.isBlank(brokerJson)) {
                    throw new RuntimeException("通过环境变量:" + topicPrefix + "  获取brokers is null");
                }
                brokers = (List<String>) new JsonWapper(brokerJson).getInnerMap().get("kafkaBroker");
                System.out.println("从zk获取到kafka brokers" + brokers);
            } catch (Exception e) {
                // Keep the original exception as the cause so the stack trace is not lost.
                throw new RuntimeException("初始化Kafka 消费者异常：" + e.getMessage(), e);
            }
        }
        if (kafkaExtractDsl.getDebugBrokers() != null) {
            brokers = new ArrayList<>();
            Collections.addAll(brokers, kafkaExtractDsl.getDebugBrokers().split(IDialect.COMMA));
        }
        this.brokerList = brokers;

        etlConsumerConfig.setBrokers(brokers);
        etlConsumerConfig.setAutoOffsetReset(kafkaExtractDsl.getAutoOffsetReset());
        etlConsumerConfig.setClientId(kafkaExtractDsl.getClientId() + ((int) (Math.random() * 100.0d)));

        String groupId = kafkaExtractDsl.getPreTopic() + "." + kafkaExtractDsl.getTopic();
        if (!StringUtils.isBlank(kafkaExtractDsl.getIdentifier())) {
            groupId = groupId + "-" + kafkaExtractDsl.getIdentifier();
        }
        if (!StringUtils.isBlank(kafkaExtractDsl.getDebugKey())) {
            groupId = groupId + "-" + kafkaExtractDsl.getDebugKey();
        }
        etlConsumerConfig.setGroupId(groupId);
        etlConsumerConfig.setMaxPollRecords(kafkaExtractDsl.getMaxPollRecords());

        String topic = topicPrefix + "." + kafkaExtractDsl.getTopic();
        if (kafkaExtractDsl.getDebugTopic() != null) {
            topic = kafkaExtractDsl.getDebugTopic();
        }
        System.out.print(" topic = " + topic);
        System.out.println(" groupId = " + groupId);
        etlConsumerConfig.setTopics(Arrays.asList(topic));

        KafkaConsumer kafkaConsumer = new EtlKafkaConsumerFactory().buildKafkaConsumer(etlConsumerConfig);
        try {
            kafkaConsumer.subscribe(etlConsumerConfig.getTopics());
        } catch (RuntimeException e) {
            // Do not leak the freshly built consumer when the subscription fails.
            kafkaConsumer.close();
            throw e;
        }
        return kafkaConsumer;
    }
}
