package com.jzt.wotu.etl.core.datasource.hermes;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.jzt.wotu.etl.core.job.JobContext;
import com.jzt.wotu.etl.core.job.LoadData;
import com.jzt.wotu.etl.core.kafkaRetry.config.EtlConsumerConfig;
import com.jzt.wotu.etl.core.kafkaRetry.config.EtlKafkaConsumerFactory;
import com.jzt.wotu.etl.core.kafkaRetry.entity.OracleCols;
import com.jzt.wotu.etl.core.kafkaRetry.entity.OracleMsg;
import com.jzt.wotu.etl.core.kafkaRetry.entity.OracleProperties;
import com.jzt.wotu.etl.core.schema.extract.AbstractExtract;
import java.io.PrintStream;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:com/jzt/wotu/etl/core/datasource/hermes/HermesExtract.class */
/**
 * Extract step that consumes messages from the Kafka topic described by a
 * {@link HermesExtractDsl}, logs the table name, operation type and columns found under each
 * message's {@code data} field, commits the consumed offset per partition and hands the last
 * committed offset to the downstream load callback.
 */
public class HermesExtract extends AbstractExtract<HermesExtractDsl> {

    /**
     * Maximum time a single {@code poll} call may block waiting for records. A non-zero value
     * keeps the endless consume loop from spinning a CPU core while the topic is idle.
     */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(1000L);

    // NOTE(review): this field is never read in this class; initKafkaConsumer() builds its own
    // factory instance. Confirm whether the injected instance was meant to be used instead.
    @Autowired
    private EtlKafkaConsumerFactory factory;
    private final HermesExtractDsl dsl;

    /**
     * @param hermesExtractDsl extract configuration (brokers, topic, group id, ...)
     * @param jobContext       context of the job this extract step belongs to
     */
    public HermesExtract(HermesExtractDsl hermesExtractDsl, JobContext<HermesExtractDsl> jobContext) {
        super(hermesExtractDsl, jobContext);
        this.dsl = hermesExtractDsl;
    }

    /**
     * Runs the consume loop until it fails.
     *
     * @param biConsumer receives, once per polled partition, a {@link LoadData} wrapping the last
     *                   consumed offset together with the flag {@code true}
     * @param consumer   notified with the exception that terminated the consume loop; may be
     *                   {@code null}, in which case the stack trace is printed instead
     */
    @Override // com.jzt.wotu.etl.core.schema.extract.AbstractExtract
    public void extract(BiConsumer<LoadData, Boolean> biConsumer, Consumer<Throwable> consumer) {
        doExtract(biConsumer, consumer);
    }

    /**
     * Runs the consume loop without an error callback; a terminating exception is printed to
     * standard error.
     *
     * @param biConsumer downstream callback, see {@link #extract(BiConsumer, Consumer)}
     */
    void doExtract(BiConsumer<LoadData, Boolean> biConsumer) {
        doExtract(biConsumer, null);
    }

    /**
     * Polls the subscribed topic forever. For every partition in a polled batch each record is
     * logged, the offset after the last record is committed synchronously and the last offset is
     * passed downstream. Any exception escaping the loop is reported and ends the loop; the
     * consumer is always closed on the way out.
     *
     * @param biConsumer downstream callback
     * @param onError    failure callback, may be {@code null}
     */
    private void doExtract(BiConsumer<LoadData, Boolean> biConsumer, Consumer<Throwable> onError) {
        KafkaConsumer<?, ?> kafkaConsumer = initKafkaConsumer();
        try {
            while (true) {
                ConsumerRecords<?, ?> batch = kafkaConsumer.poll(POLL_TIMEOUT);
                for (TopicPartition partition : batch.partitions()) {
                    List<? extends ConsumerRecord<?, ?>> records = batch.records(partition);
                    for (ConsumerRecord<?, ?> kafkaRecord : records) {
                        try {
                            logRecord(kafkaRecord);
                        } catch (Exception e) {
                            // Best effort: a record that cannot be parsed must not stop the
                            // partition; it is reported and skipped.
                            System.out.println("================kafkaRetryService.consumerLater====================");
                            System.err.println("Failed to handle Kafka record at offset "
                                    + kafkaRecord.offset() + " of " + partition + ": " + e);
                        }
                    }
                    long lastOffset = records.get(records.size() - 1).offset();
                    // The committed position is the offset of the NEXT record to read.
                    // NOTE(review): the offset is committed before the downstream callback runs,
                    // so a failing callback will not cause this batch to be re-read.
                    kafkaConsumer.commitSync(
                            Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                    biConsumer.accept(new LoadData((List<?>) Arrays.asList(Long.valueOf(lastOffset))), true);
                }
            }
        } catch (Exception e) {
            reportFailure(onError, e);
        } finally {
            kafkaConsumer.close();
        }
    }

    /**
     * Prints the raw record, then parses the object stored under its {@code data} key into an
     * {@link OracleMsg} and prints the table name, operation type and columns.
     *
     * @param kafkaRecord record whose value is expected to be a JSON string
     * @throws IllegalArgumentException if the value is empty or carries no usable {@code data}
     */
    private void logRecord(ConsumerRecord<?, ?> kafkaRecord) {
        Object rawValue = kafkaRecord.value();
        System.out.println("======================Kafka======extract=================="
                + kafkaRecord.offset() + ": " + rawValue);

        JSONObject envelope = JSON.parseObject((String) rawValue);
        if (envelope == null) {
            throw new IllegalArgumentException("record value is empty");
        }
        OracleMsg oracleMsg = JSON.parseObject(JSON.toJSONString(envelope.get("data")), OracleMsg.class);
        if (oracleMsg == null || oracleMsg.getProperties() == null) {
            throw new IllegalArgumentException("record carries no 'data' object with properties");
        }

        OracleProperties properties = oracleMsg.getProperties();
        List<OracleCols> cols = oracleMsg.getCols();
        System.out.println("======================Kafka======ProducerMsg=========tableName====is ====="
                + properties.getGG_TABLE()
                + "===========================operateType====is ====="
                + properties.getGG_OPTYPE()
                + "===========================cols====is ====="
                + JSONArray.toJSONString(cols));
    }

    /**
     * Hands a terminating failure to the callback when one was supplied, otherwise prints it.
     */
    private static void reportFailure(Consumer<Throwable> onError, Throwable failure) {
        if (onError != null) {
            onError.accept(failure);
        } else {
            failure.printStackTrace();
        }
    }

    /** Placeholder that is not called anywhere in this class; always returns {@code null}. */
    private String dealData(ConsumerRecord<String, String> consumerRecord) {
        return null;
    }

    /**
     * Builds a Kafka consumer from the extract configuration and subscribes it to the
     * configured topic(s).
     *
     * @return a subscribed consumer; the caller is responsible for closing it
     */
    private KafkaConsumer<?, ?> initKafkaConsumer() {
        HermesExtractDsl extractConfig = getExtractConfig();
        EtlConsumerConfig consumerConfig = new EtlConsumerConfig();
        consumerConfig.setBrokers(extractConfig.getBrokers());
        consumerConfig.setAutoOffsetReset(extractConfig.getAutoOffsetReset());
        consumerConfig.setClientId(extractConfig.getClientId());
        consumerConfig.setGroupId(extractConfig.getGroupId());
        consumerConfig.setTopics(Arrays.asList(extractConfig.getTopic()));
        // Unbounded wildcards: the key/value types produced by the factory are not visible here.
        KafkaConsumer<?, ?> kafkaConsumer = new EtlKafkaConsumerFactory().buildKafkaConsumer(consumerConfig);
        kafkaConsumer.subscribe(consumerConfig.getTopics());
        return kafkaConsumer;
    }
}
