package com.jzt.wotu.etl.core.datasource.kafka;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.jzt.wotu.JsonWapper;
import com.jzt.wotu.YvanUtil;
import com.jzt.wotu.etl.core.MetersManager;
import com.jzt.wotu.etl.core.ZookeeperService;
import com.jzt.wotu.etl.core.datasource.jdbc.ExtractType;
import com.jzt.wotu.etl.core.datasource.jdbc.SaveType;
import com.jzt.wotu.etl.core.job.JobContext;
import com.jzt.wotu.etl.core.job.LoadData;
import com.jzt.wotu.etl.core.kafkaRetry.config.EtlConsumerConfig;
import com.jzt.wotu.etl.core.kafkaRetry.config.EtlKafkaConsumerFactory;
import com.jzt.wotu.etl.core.kafkaRetry.config.KafkaProperties;
import com.jzt.wotu.etl.core.kafkaRetry.config.KafkaRetryServiceImpl;
import com.jzt.wotu.etl.core.kafkaRetry.entity.OracleProperties;
import com.jzt.wotu.etl.core.schema.extract.AbstractExtract;
import java.lang.Thread;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jzt/wotu/etl/core/datasource/kafka/KafkaExtract.class */
public class KafkaExtract extends AbstractExtract<KafkaExtractDsl> {
    private static final Logger log = LoggerFactory.getLogger(KafkaExtract.class);
    private KafkaExtractDsl dsl;

    public KafkaExtract(KafkaExtractDsl kafkaExtractDsl, JobContext<KafkaExtractDsl> jobContext) {
        super(kafkaExtractDsl, jobContext);
        this.dsl = kafkaExtractDsl;
    }

    @Override // com.jzt.wotu.etl.core.schema.extract.AbstractExtract
    public void extract(BiConsumer<LoadData, String> biConsumer, Consumer<Throwable> consumer) {
        String threadName = this.context.getThreadName();
        Thread thread = new Thread(() -> {
            doExtract(biConsumer);
        }, threadName);
        ZookeeperService.THREAD_MANAGER.put(threadName, thread);
        thread.start();
    }

    void doExtract(BiConsumer<LoadData, String> biConsumer) {
        KafkaExtractDsl extractConfig = getExtractConfig();
        boolean isNeedBackQuery = extractConfig.isNeedBackQuery();
        String name = isNeedBackQuery ? ExtractType.kafka_Transform.name() : ExtractType.kafka_noTransform.name();
        KafkaConsumer initKafkaConsumer = initKafkaConsumer(extractConfig);
        while (true) {
            try {
                try {
                    Thread currentThread = Thread.currentThread();
                    Thread.State state = currentThread.getState();
                    String name2 = currentThread.getName();
                    if (!Thread.State.RUNNABLE.equals(state)) {
                        System.out.println("currentThread state " + name2 + "===name==" + state.name());
                        ZookeeperService.THREAD_MANAGER.remove(name2, currentThread);
                    }
                    ConsumerRecords poll = initKafkaConsumer.poll(Duration.ofMillis(10L));
                    if (poll.partitions().size() == 0) {
                        Thread.sleep(100000L);
                    } else {
                        List<Map<String, Object>> arrayList = new ArrayList();
                        Iterator it = poll.partitions().iterator();
                        while (it.hasNext()) {
                            for (ConsumerRecord<String, String> consumerRecord : poll.records((TopicPartition) it.next())) {
                                try {
                                    arrayList = getTransferMap((String) consumerRecord.value());
                                    log.debug("=========================Pre Step 1=============================================");
                                } catch (Exception e) {
                                    log.info("===================消费数据时异常=========================================" + e.getMessage());
                                    log.info("==消费数据时异常==============kafkaRetryService.consumerLater======printRecord====" + ((String) consumerRecord.value()));
                                    new KafkaRetryServiceImpl().consumerLater(consumerRecord, initProprety(extractConfig));
                                }
                            }
                        }
                        if (CollectionUtils.isNotEmpty(arrayList)) {
                            LoadData loadData = new LoadData(arrayList);
                            log.debug("========loadData==========>" + JSON.toJSONString(loadData) + "==========isNeedBackQuery  is " + isNeedBackQuery);
                            biConsumer.accept(loadData, name);
                        }
                    }
                } catch (Throwable th) {
                    log.debug("=========================Pre Step ?=============================================");
                    initKafkaConsumer.close();
                    throw th;
                }
            } catch (Exception e2) {
                e2.printStackTrace();
                log.debug("=========================Pre Step ?=============================================");
                initKafkaConsumer.close();
                return;
            }
        }
    }

    private KafkaProperties initProprety(KafkaExtractDsl kafkaExtractDsl) {
        KafkaProperties kafkaProperties = new KafkaProperties();
        kafkaProperties.setBrokers(kafkaExtractDsl.brokers);
        kafkaProperties.setRetries(kafkaExtractDsl.producerRetryTimes);
        kafkaProperties.setRetryTopic(kafkaExtractDsl.retryTopic);
        kafkaProperties.setDltTopic(kafkaExtractDsl.dtlTopic);
        kafkaProperties.setLingerMs(kafkaExtractDsl.lingerMs);
        kafkaProperties.setBatchSize(kafkaExtractDsl.batchSize);
        kafkaProperties.setBufferMemory(kafkaExtractDsl.bufferMemory);
        kafkaProperties.setAck(kafkaExtractDsl.ack);
        kafkaProperties.setRetryKey(kafkaExtractDsl.retryKey);
        return kafkaProperties;
    }

    private Map<String, Object> getTransferMap2(String str) {
        String string;
        HashMap hashMap = new HashMap();
        JSONObject parseObject = JSON.parseObject(JSON.toJSONString(((List) JSON.parseObject(JSON.toJSONString(YvanUtil.jsonToMap(str).get("data")), List.class)).get(0)));
        Object obj = parseObject.get("cols");
        if (parseObject.containsKey("Properties")) {
            string = ((OracleProperties) JSON.parseObject(JSON.toJSONString(parseObject.get("Properties")), OracleProperties.class)).getGG_OPTYPE();
        } else {
            if (!parseObject.containsKey(MetersManager.Tag_Thread_Pool_Type)) {
                throw new RuntimeException("数据库操作类型异常");
            }
            string = parseObject.getString(MetersManager.Tag_Thread_Pool_Type);
        }
        hashMap.put("tableName", parseObject.get("table"));
        hashMap.put("operateType", string);
        List list = (List) JSON.parseObject(JSON.toJSONString(obj), List.class);
        String str2 = string;
        list.stream().forEach(obj2 -> {
            Object obj2;
            JSONObject parseObject2 = JSONObject.parseObject(JSON.toJSONString(obj2));
            String string2 = parseObject2.getString("name");
            if (SaveType.delete.name().equalsIgnoreCase(str2)) {
                if (getExtractConfig().isDeleteNeedBackQuerySwitch) {
                    hashMap.put("operateType", SaveType.delete2ReplaceInto.name());
                }
                obj2 = parseObject2.get("before");
            } else {
                obj2 = SaveType.insert.name().equalsIgnoreCase(str2) ? parseObject2.get("after") : parseObject2.get("after");
            }
            hashMap.put(string2, obj2);
        });
        return hashMap;
    }

    private List<Map<String, Object>> getTransferMap(String str) {
        ArrayList arrayList = new ArrayList();
        ((List) JSON.parseObject(JSON.toJSONString(YvanUtil.jsonToMap(str).get("data")), List.class)).stream().forEach(obj -> {
            String string;
            HashMap hashMap = new HashMap();
            JSONObject parseObject = JSON.parseObject(JSON.toJSONString(obj));
            Object obj = parseObject.get("cols");
            if (parseObject.containsKey("Properties")) {
                string = ((OracleProperties) JSON.parseObject(JSON.toJSONString(parseObject.get("Properties")), OracleProperties.class)).getGG_OPTYPE();
            } else {
                if (!parseObject.containsKey(MetersManager.Tag_Thread_Pool_Type)) {
                    throw new RuntimeException("数据库操作类型异常");
                }
                string = parseObject.getString(MetersManager.Tag_Thread_Pool_Type);
            }
            hashMap.put("tableName", parseObject.get("table"));
            hashMap.put("operateType", string);
            List list = (List) JSON.parseObject(JSON.toJSONString(obj), List.class);
            String str2 = string;
            list.stream().forEach(obj2 -> {
                Object obj2;
                JSONObject parseObject2 = JSONObject.parseObject(JSON.toJSONString(obj2));
                String string2 = parseObject2.getString("name");
                if (SaveType.delete.name().equalsIgnoreCase(str2)) {
                    if (getExtractConfig().isDeleteNeedBackQuerySwitch) {
                        hashMap.put("operateType", SaveType.delete2ReplaceInto.name());
                    }
                    obj2 = parseObject2.get("before");
                } else {
                    obj2 = SaveType.insert.name().equalsIgnoreCase(str2) ? parseObject2.get("after") : parseObject2.get("after");
                }
                hashMap.put(string2, obj2);
            });
            arrayList.add(hashMap);
        });
        return arrayList;
    }

    private KafkaConsumer initKafkaConsumer(KafkaExtractDsl kafkaExtractDsl) {
        EtlConsumerConfig etlConsumerConfig = new EtlConsumerConfig();
        try {
            String brokerByHermesTag = ZookeeperService.getInstance("com.jzt.wotu.etl.worker.ZkService").getBrokerByHermesTag(kafkaExtractDsl.getPreTopic());
            if (StringUtils.isBlank(brokerByHermesTag)) {
                throw new RuntimeException("通过preTopic:" + kafkaExtractDsl.getPreTopic() + "  获取brokers is null");
            }
            etlConsumerConfig.setBrokers((List) new JsonWapper(brokerByHermesTag).getInnerMap().get("kafkaBroker"));
            etlConsumerConfig.setAutoOffsetReset(kafkaExtractDsl.getAutoOffsetReset());
            etlConsumerConfig.setClientId(kafkaExtractDsl.getClientId());
            etlConsumerConfig.setGroupId(kafkaExtractDsl.getGroupId());
            etlConsumerConfig.setMaxPollRecords(kafkaExtractDsl.getMaxPollRecords());
            etlConsumerConfig.setTopics(Arrays.asList(kafkaExtractDsl.getTopic()));
            KafkaConsumer buildKafkaConsumer = new EtlKafkaConsumerFactory().buildKafkaConsumer(etlConsumerConfig);
            buildKafkaConsumer.subscribe(etlConsumerConfig.getTopics());
            return buildKafkaConsumer;
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("初始化Kafka 消费者异常：" + e.getMessage());
        }
    }
}
