package com.jzt.wotu.etl.core.datasource.kafka;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.jzt.wotu.JsonWapper;
import com.jzt.wotu.YvanUtil;
import com.jzt.wotu.etl.core.ZookeeperService;
import com.jzt.wotu.etl.core.datasource.jdbc.ExtractType;
import com.jzt.wotu.etl.core.datasource.jdbc.SaveType;
import com.jzt.wotu.etl.core.datasource.jdbc.dialects.IDialect;
import com.jzt.wotu.etl.core.job.JobContext;
import com.jzt.wotu.etl.core.job.LoadData;
import com.jzt.wotu.etl.core.kafkaRetry.config.EtlConsumerConfig;
import com.jzt.wotu.etl.core.kafkaRetry.config.EtlKafkaConsumerFactory;
import com.jzt.wotu.etl.core.kafkaRetry.config.KafkaRetryServiceImpl;
import com.jzt.wotu.etl.core.kafkaRetry.entity.OracleProperties;
import com.jzt.wotu.etl.core.model.Extract;
import com.jzt.wotu.etl.core.schema.extract.AbstractExtract;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCluster;

/* loaded from: input_file:com/jzt/wotu/etl/core/datasource/kafka/KafkaExtract.class */
public class KafkaExtract extends AbstractExtract<KafkaExtractDsl> {
    private static final Logger log = LoggerFactory.getLogger(KafkaExtract.class);
    private KafkaExtractDsl dsl;
    private List<String> brokerList;
    private String threadName;
    private static final String PROPERTIES = "Properties";
    private static final String DATA = "data";
    private static final String SIZE = "size";
    private static final String COLS = "cols";
    private static final String TYPE = "type";
    private static final int SINGLE_SIZE = 1;
    private static final String TABLENAME = "tableName";
    private static final String OPERATETYPE = "operateType";
    private static JedisCluster jedis;

    public KafkaExtract(KafkaExtractDsl kafkaExtractDsl, JobContext<KafkaExtractDsl> jobContext) {
        super(kafkaExtractDsl, jobContext);
        this.brokerList = new ArrayList();
        this.dsl = kafkaExtractDsl;
    }

    @Override // com.jzt.wotu.etl.core.schema.extract.AbstractExtract
    public void extract(BiConsumer<LoadData, Map<String, Object>> biConsumer, Consumer<Throwable> consumer) {
        this.threadName = this.context.getThreadName();
        Thread thread = new Thread(() -> {
            doExtract1(biConsumer, this.threadName);
        }, this.threadName);
        System.out.println("threadName is " + this.threadName);
        ZookeeperService.THREAD_MANAGER.put(this.threadName, thread);
        thread.start();
    }

    void doExtract1(BiConsumer<LoadData, Map<String, Object>> biConsumer, String str) {
        KafkaExtractDsl extractConfig = getExtractConfig();
        String name = extractConfig.isNeedBackQuery() ? ExtractType.kafka_Transform.name() : ExtractType.kafka_noTransform.name();
        KafkaConsumer initKafkaConsumer = initKafkaConsumer(extractConfig);
        try {
            try {
                LinkedList linkedList = new LinkedList();
                long currentTimeMillis = System.currentTimeMillis();
                MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
                while (true) {
                    ConsumerRecords poll = initKafkaConsumer.poll(Duration.ofMillis(200L));
                    if ((poll == null || poll.partitions().size() == 0 || linkedList.size() > 50) ? false : true) {
                        linkedList.add(poll);
                        try {
                            try {
                                initKafkaConsumer.commitAsync();
                                currentTimeMillis = System.currentTimeMillis();
                            } catch (Throwable th) {
                                System.currentTimeMillis();
                                throw th;
                            }
                        } catch (CommitFailedException e) {
                            System.out.println("====================offSet 提交失败========================");
                            try {
                                initKafkaConsumer.commitSync();
                                initKafkaConsumer.close();
                                currentTimeMillis = System.currentTimeMillis();
                            } finally {
                            }
                        }
                    }
                    boolean z = System.currentTimeMillis() - currentTimeMillis > 1000;
                    if (linkedList.size() > 100 || z) {
                        System.out.println("进行业务处理Used:" + ((memoryMXBean.getHeapMemoryUsage().getUsed() / 1024) / 1024) + "MB");
                        String str2 = name;
                        linkedList.forEach(consumerRecords -> {
                            saveToDb(consumerRecords, initKafkaConsumer, biConsumer, str2);
                        });
                        linkedList.clear();
                    }
                    if (poll == null || poll.partitions().size() == 0) {
                        Thread.sleep(100L);
                    }
                }
            } catch (Exception e2) {
                e2.printStackTrace();
                log.debug("=========================Pre Step ?=============================================");
                initKafkaConsumer.close();
            }
        } catch (Throwable th2) {
            log.debug("=========================Pre Step ?=============================================");
            initKafkaConsumer.close();
            throw th2;
        }
    }

    void doExtract(BiConsumer<LoadData, Map<String, Object>> biConsumer, String str) {
        KafkaExtractDsl extractConfig = getExtractConfig();
        String name = extractConfig.isNeedBackQuery() ? ExtractType.kafka_Transform.name() : ExtractType.kafka_noTransform.name();
        KafkaConsumer initKafkaConsumer = initKafkaConsumer(extractConfig);
        try {
            try {
                MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
                System.out.println("1Used:" + ((memoryMXBean.getHeapMemoryUsage().getUsed() / 1024) / 1024) + "MB");
                while (true) {
                    ConsumerRecords poll = initKafkaConsumer.poll(Duration.ofMillis(100L));
                    if (poll == null || poll.partitions().size() == 0) {
                        Thread.sleep(100L);
                    } else {
                        try {
                            initKafkaConsumer.commitAsync();
                        } catch (CommitFailedException e) {
                            System.out.println("====================offSet 提交失败========================");
                            try {
                                initKafkaConsumer.commitSync();
                                initKafkaConsumer.close();
                            } finally {
                            }
                        }
                        String str2 = name;
                        new Thread(() -> {
                            saveToDb(poll, initKafkaConsumer, biConsumer, str2);
                        }).start();
                        System.out.println("Used:" + ((memoryMXBean.getHeapMemoryUsage().getUsed() / 1024) / 1024) + "MB");
                    }
                }
            } catch (Throwable th) {
                log.debug("=========================Pre Step ?=============================================");
                initKafkaConsumer.close();
                throw th;
            }
        } catch (Exception e2) {
            e2.printStackTrace();
            log.debug("=========================Pre Step ?=============================================");
            initKafkaConsumer.close();
        }
    }

    public void resetSaveToDb(ConsumerRecords<String, String> consumerRecords, KafkaConsumer kafkaConsumer, BiConsumer<LoadData, Map<String, Object>> biConsumer, String str) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        log.info(((KafkaExtractDsl) this.extractConfig).getKey() + "   开始消费数据=时间  " + simpleDateFormat.format(new Date(System.currentTimeMillis())) + "|||===records.count  is " + consumerRecords.count());
        for (TopicPartition topicPartition : consumerRecords.partitions()) {
            if (((KafkaExtractDsl) this.extractConfig).getDebugOffset() != null) {
                kafkaConsumer.seek(topicPartition, ((KafkaExtractDsl) this.extractConfig).getDebugOffset().longValue());
            }
            for (ConsumerRecord consumerRecord : consumerRecords.records(topicPartition)) {
                String format = simpleDateFormat.format(new Date(System.currentTimeMillis()));
                Logger logger = log;
                String key = ((KafkaExtractDsl) this.extractConfig).getKey();
                String str2 = consumerRecord.topic();
                int partition = topicPartition.partition();
                long offset = consumerRecord.offset();
                consumerRecords.count();
                logger.info(key + ":开始消费数据=topic=>" + str2 + "partition.partition()" + partition + "=>time is" + format + "===offset is " + offset + "=>records.countIs=>" + logger);
                String str3 = consumerRecord.topic();
                String str4 = (String) consumerRecord.key();
                long offset2 = consumerRecord.offset();
                spitRecords((String) consumerRecord.value()).parallelStream().forEach(str5 -> {
                    List<Map<String, Object>> transferMap = getTransferMap(str5);
                    String str5 = (String) transferMap.get(0).get(OPERATETYPE);
                    System.out.println("执行");
                    if ("DELETE".equalsIgnoreCase(str5)) {
                        try {
                            Thread.sleep(100L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    log.debug("==开始消费数据=============" + str5);
                    log.debug("=========================Pre Step 1=============================================");
                    if (CollectionUtils.isNotEmpty(transferMap)) {
                        try {
                            try {
                                HashMap hashMap = new HashMap();
                                hashMap.put("ExtractType", str);
                                hashMap.put("offSet", String.valueOf(offset2));
                                biConsumer.accept(new LoadData(transferMap), hashMap);
                                Logger logger2 = log;
                                String key2 = ((KafkaExtractDsl) this.extractConfig).getKey();
                                String str6 = consumerRecord.topic();
                                int partition2 = topicPartition.partition();
                                long offset3 = consumerRecord.offset();
                                consumerRecords.count();
                                logger2.info(key2 + ":结束消费数据=topic=>" + str6 + "partition.partition()" + partition2 + "=>time is" + format + "===offset is " + offset3 + "=>records.countIs=>" + logger2);
                            } catch (Exception e2) {
                                log.info("===================消费数据时异常======" + e2.getMessage());
                                KafkaRetryServiceImpl kafkaRetryServiceImpl = new KafkaRetryServiceImpl();
                                System.out.println(JSONArray.toJSONString(transferMap));
                                kafkaRetryServiceImpl.commitSingleMsg(str3, str4, str5, this.brokerList, JSON.toJSONString(e2));
                                Logger logger3 = log;
                                String key3 = ((KafkaExtractDsl) this.extractConfig).getKey();
                                String str7 = consumerRecord.topic();
                                int partition3 = topicPartition.partition();
                                long offset4 = consumerRecord.offset();
                                consumerRecords.count();
                                logger3.info(key3 + ":结束消费数据=topic=>" + str7 + "partition.partition()" + partition3 + "=>time is" + format + "===offset is " + offset4 + "=>records.countIs=>" + logger3);
                            }
                        } catch (Throwable th) {
                            Logger logger4 = log;
                            String key4 = ((KafkaExtractDsl) this.extractConfig).getKey();
                            String str8 = consumerRecord.topic();
                            int partition4 = topicPartition.partition();
                            long offset5 = consumerRecord.offset();
                            consumerRecords.count();
                            logger4.info(key4 + ":结束消费数据=topic=>" + str8 + "partition.partition()" + partition4 + "=>time is" + format + "===offset is " + offset5 + "=>records.countIs=>" + logger4);
                            throw th;
                        }
                    }
                });
            }
        }
    }

    public void saveToDb(ConsumerRecords<String, String> consumerRecords, KafkaConsumer kafkaConsumer, BiConsumer<LoadData, Map<String, Object>> biConsumer, String str) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        log.info(((KafkaExtractDsl) this.extractConfig).getKey() + "   开始消费数据=时间  " + simpleDateFormat.format(new Date(System.currentTimeMillis())) + "|||===records.count  is " + consumerRecords.count());
        List<Map<String, Object>> arrayList = new ArrayList();
        for (TopicPartition topicPartition : consumerRecords.partitions()) {
            if (((KafkaExtractDsl) this.extractConfig).getDebugOffset() != null) {
                kafkaConsumer.seek(topicPartition, ((KafkaExtractDsl) this.extractConfig).getDebugOffset().longValue());
            }
            List<ConsumerRecord<String, String>> records = consumerRecords.records(topicPartition);
            for (ConsumerRecord<String, String> consumerRecord : records) {
                String format = simpleDateFormat.format(new Date(System.currentTimeMillis()));
                try {
                    try {
                        Logger logger = log;
                        String key = ((KafkaExtractDsl) this.extractConfig).getKey();
                        String str2 = consumerRecord.topic();
                        int partition = topicPartition.partition();
                        long offset = consumerRecord.offset();
                        consumerRecords.count();
                        logger.info(key + ":开始消费数据=topic=>" + str2 + "partition.partition()" + partition + "=>time is" + format + "===offset is " + offset + "=>records.countIs=>" + logger);
                        String str3 = (String) consumerRecord.value();
                        System.out.println("size" + str3.toCharArray().length);
                        arrayList = getTransferMap(str3);
                        log.debug("==开始消费数据=============" + str3);
                        long offset2 = ((ConsumerRecord) records.get(records.size() - SINGLE_SIZE)).offset();
                        Logger logger2 = log;
                        topicPartition.topic();
                        logger2.debug("开始消费数据======lastOffset  is " + offset2 + "  Topic is  " + logger2);
                        log.debug("=========================Pre Step 1=============================================");
                        if (CollectionUtils.isNotEmpty(arrayList)) {
                            LoadData loadData = new LoadData(arrayList);
                            HashMap hashMap = new HashMap();
                            hashMap.put("ExtractType", str);
                            biConsumer.accept(loadData, hashMap);
                        }
                        arrayList.clear();
                        Logger logger3 = log;
                        String key2 = ((KafkaExtractDsl) this.extractConfig).getKey();
                        String str4 = consumerRecord.topic();
                        int partition2 = topicPartition.partition();
                        long offset3 = consumerRecord.offset();
                        consumerRecords.count();
                        logger3.info(key2 + ":结束消费数据=topic=>" + str4 + "partition.partition()" + partition2 + "=>time is" + format + "===offset is " + offset3 + "=>records.countIs=>" + logger3);
                    } catch (Exception e) {
                        log.info("===================消费数据时异常======" + e.getMessage());
                        log.info("==异常数据====" + ((String) consumerRecord.value()));
                        new KafkaRetryServiceImpl().consumerLater(consumerRecord, this.brokerList, JSON.toJSONString(e));
                        arrayList.clear();
                        Logger logger4 = log;
                        String key3 = ((KafkaExtractDsl) this.extractConfig).getKey();
                        String str5 = consumerRecord.topic();
                        int partition3 = topicPartition.partition();
                        long offset4 = consumerRecord.offset();
                        consumerRecords.count();
                        logger4.info(key3 + ":结束消费数据=topic=>" + str5 + "partition.partition()" + partition3 + "=>time is" + format + "===offset is " + offset4 + "=>records.countIs=>" + logger4);
                    }
                } catch (Throwable th) {
                    arrayList.clear();
                    Logger logger5 = log;
                    String key4 = ((KafkaExtractDsl) this.extractConfig).getKey();
                    String str6 = consumerRecord.topic();
                    int partition4 = topicPartition.partition();
                    long offset5 = consumerRecord.offset();
                    consumerRecords.count();
                    logger5.info(key4 + ":结束消费数据=topic=>" + str6 + "partition.partition()" + partition4 + "=>time is" + format + "===offset is " + offset5 + "=>records.countIs=>" + logger5);
                    throw th;
                }
            }
        }
    }

    void doExtractTest(BiConsumer<LoadData, String> biConsumer, String str) {
        KafkaExtractDsl extractConfig = getExtractConfig();
        String name = extractConfig.isNeedBackQuery() ? ExtractType.kafka_Transform.name() : ExtractType.kafka_noTransform.name();
        KafkaConsumer initKafkaConsumer = initKafkaConsumer(extractConfig);
        try {
            try {
                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(System.currentTimeMillis()));
                List<String> spitRecords = spitRecords(getTestData());
                System.out.println(JSONArray.toJSONString(spitRecords));
                String str2 = name;
                spitRecords.parallelStream().forEach(str3 -> {
                    List<Map<String, Object>> transferMap = getTransferMap(str3);
                    String str3 = (String) transferMap.get(0).get(OPERATETYPE);
                    if ("DELETE".equalsIgnoreCase(str3)) {
                        System.out.println("删除操作休眠5s");
                        try {
                            Thread.sleep(5000L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("删除休眠结束");
                    }
                    if ("UPDATE".equalsIgnoreCase(str3)) {
                        System.out.println("更新操作休眠2s");
                        try {
                            Thread.sleep(2000L);
                        } catch (InterruptedException e2) {
                            e2.printStackTrace();
                        }
                        System.out.println("更新休眠结束");
                    }
                    if ("INSERT".equalsIgnoreCase(str3)) {
                        System.out.println("即时执行新增");
                    }
                    log.debug("==开始消费数据=============" + str3);
                    log.debug("=========================Pre Step 1=============================================");
                    if (CollectionUtils.isNotEmpty(transferMap)) {
                        try {
                            biConsumer.accept(new LoadData(transferMap), str2);
                        } catch (Exception e3) {
                            log.info("===================消费数据时异常======" + e3.getMessage());
                            KafkaRetryServiceImpl kafkaRetryServiceImpl = new KafkaRetryServiceImpl();
                            System.out.println(JSONArray.toJSONString(transferMap));
                            kafkaRetryServiceImpl.commitSingleMsg("test-original-topic", "0", str3, this.brokerList, JSON.toJSONString(e3));
                        }
                    }
                });
                log.debug("=========================Pre Step ?=============================================");
                initKafkaConsumer.close();
            } catch (Exception e) {
                e.printStackTrace();
                log.debug("=========================Pre Step ?=============================================");
                initKafkaConsumer.close();
            }
        } catch (Throwable th) {
            log.debug("=========================Pre Step ?=============================================");
            initKafkaConsumer.close();
            throw th;
        }
    }

    private String getTestData() {
        return readJsonFile(KafkaExtract.class.getClassLoader().getResource("test.json").getPath());
    }

    private String readJsonFile(String str) {
        try {
            File file = new File(str);
            FileReader fileReader = new FileReader(file);
            InputStreamReader inputStreamReader = new InputStreamReader(new FileInputStream(file), "utf-8");
            StringBuffer stringBuffer = new StringBuffer();
            while (true) {
                int read = inputStreamReader.read();
                if (read == -1) {
                    fileReader.close();
                    inputStreamReader.close();
                    return stringBuffer.toString();
                }
                stringBuffer.append((char) read);
            }
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    private List<String> spitRecords(String str) {
        ArrayList arrayList = new ArrayList();
        Map jsonToMap = YvanUtil.jsonToMap(str);
        Object obj = jsonToMap.get(DATA);
        if (SINGLE_SIZE == ((Integer) jsonToMap.get(SIZE)).intValue()) {
            arrayList.add(str);
            return arrayList;
        }
        ((List) JSON.parseObject(JSON.toJSONString(obj), List.class)).parallelStream().forEach(obj2 -> {
            ArrayList arrayList2 = new ArrayList();
            arrayList2.add(obj2);
            jsonToMap.put(DATA, arrayList2);
            jsonToMap.put(SIZE, Integer.valueOf(SINGLE_SIZE));
            arrayList.add(JSON.toJSONString(jsonToMap));
        });
        return arrayList;
    }

    private List<Map<String, Object>> getTransferMap(String str) {
        ArrayList arrayList = new ArrayList();
        ((List) JSON.parseObject(JSON.toJSONString(YvanUtil.jsonToMap(str).get(DATA)), List.class)).stream().forEach(obj -> {
            String string;
            HashMap hashMap = new HashMap();
            JSONObject parseObject = JSON.parseObject(JSON.toJSONString(obj));
            Object obj = parseObject.get(COLS);
            if (parseObject.containsKey(PROPERTIES)) {
                string = ((OracleProperties) JSON.parseObject(JSON.toJSONString(parseObject.get(PROPERTIES)), OracleProperties.class)).getGG_OPTYPE();
            } else {
                if (!parseObject.containsKey("type")) {
                    throw new RuntimeException("数据库操作类型异常");
                }
                string = parseObject.getString("type");
            }
            hashMap.put(TABLENAME, parseObject.get("table"));
            hashMap.put(OPERATETYPE, string);
            System.out.print("table is " + parseObject.get("table"));
            System.out.print("<=>operateType is " + string);
            List list = (List) JSON.parseObject(JSON.toJSONString(obj), List.class);
            String str2 = string;
            list.stream().forEach(obj2 -> {
                Object obj2;
                JSONObject parseObject2 = JSONObject.parseObject(JSON.toJSONString(obj2));
                String string2 = parseObject2.getString("name");
                if (SaveType.delete.name().equalsIgnoreCase(str2)) {
                    if (getExtractConfig().isDeleteNeedBackQuerySwitch) {
                        hashMap.put(OPERATETYPE, SaveType.delete2ReplaceInto.name());
                    }
                    obj2 = parseObject2.get("before");
                } else if (SaveType.insert.name().equalsIgnoreCase(str2)) {
                    obj2 = parseObject2.get("after");
                } else {
                    obj2 = parseObject2.get("after");
                    if (obj2 == null) {
                        obj2 = parseObject2.get("before");
                    }
                }
                if ("PK".equalsIgnoreCase(string2)) {
                    System.out.print("<=>Pk is " + JSON.toJSONString(obj2) + "||");
                }
                hashMap.put(string2, obj2);
            });
            arrayList.add(hashMap);
        });
        System.out.println();
        return arrayList;
    }

    private KafkaConsumer initKafkaConsumer(KafkaExtractDsl kafkaExtractDsl) {
        EtlConsumerConfig etlConsumerConfig = new EtlConsumerConfig();
        String str = ZookeeperService.ENV_MAP.get("prefix");
        if (Extract.TEST.equalsIgnoreCase(str)) {
            str = kafkaExtractDsl.getPreTopic() + "-" + str;
        }
        String str2 = ZookeeperService.ENV_MAP.get("kafkaBrokers");
        if (StringUtils.isNotBlank(str2) && str2.contains(IDialect.COMMA)) {
            Collections.addAll(this.brokerList, str2.split(IDialect.COMMA));
            System.out.println("从配置文件获取到kafka brokers" + this.brokerList);
        } else {
            try {
                String brokerByHermesTag = ZookeeperService.getInstance("com.jzt.wotu.etl.worker.ZkService").getBrokerByHermesTag(str);
                if (StringUtils.isBlank(brokerByHermesTag)) {
                    throw new RuntimeException("通过环境变量:" + str + "  获取brokers is null");
                }
                this.brokerList = (List) new JsonWapper(brokerByHermesTag).getInnerMap().get("kafkaBroker");
                System.out.println("从zk获取到kafka brokers" + this.brokerList);
            } catch (Exception e) {
                e.printStackTrace();
                throw new RuntimeException("初始化Kafka 消费者异常：" + e.getMessage());
            }
        }
        if (kafkaExtractDsl.getDebugBrokers() != null) {
            this.brokerList = new ArrayList();
            Collections.addAll(this.brokerList, kafkaExtractDsl.getDebugBrokers().split(IDialect.COMMA));
        }
        etlConsumerConfig.setBrokers(this.brokerList);
        etlConsumerConfig.setAutoOffsetReset(kafkaExtractDsl.getAutoOffsetReset());
        etlConsumerConfig.setClientId(kafkaExtractDsl.getClientId());
        String str3 = kafkaExtractDsl.getPreTopic() + "." + kafkaExtractDsl.getTopic();
        if (!StringUtils.isBlank(kafkaExtractDsl.getIdentifier())) {
            str3 = str3 + "-" + kafkaExtractDsl.getIdentifier();
        }
        if (!StringUtils.isBlank(kafkaExtractDsl.getDebugKey())) {
            str3 = str3 + "-" + kafkaExtractDsl.getDebugKey();
        }
        etlConsumerConfig.setGroupId(str3);
        etlConsumerConfig.setMaxPollRecords(kafkaExtractDsl.getMaxPollRecords());
        String str4 = str + "." + kafkaExtractDsl.getTopic();
        if (kafkaExtractDsl.getDebugTopic() != null) {
            str4 = kafkaExtractDsl.getDebugTopic();
        }
        System.out.print("ETL  consumer brokerList = " + JSONArray.toJSONString(this.brokerList));
        System.out.print(" topic = " + str4);
        System.out.println(" groupId = " + str3);
        etlConsumerConfig.setTopics(Arrays.asList(str4));
        KafkaConsumer buildKafkaConsumer = new EtlKafkaConsumerFactory().buildKafkaConsumer(etlConsumerConfig);
        buildKafkaConsumer.subscribe(etlConsumerConfig.getTopics());
        return buildKafkaConsumer;
    }
}
