package com.alibaba.otter.canal.connector.kafka.consumer;

import com.alibaba.fastjson2.JSON;
import com.alibaba.otter.canal.common.utils.PropertiesUtils;
import com.alibaba.otter.canal.connector.core.consumer.CommonMessage;
import com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer;
import com.alibaba.otter.canal.connector.core.spi.SPI;
import com.alibaba.otter.canal.connector.core.util.MessageUtil;
import com.alibaba.otter.canal.connector.kafka.config.KafkaConstants;
import com.alibaba.otter.canal.protocol.Message;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * Kafka-backed {@link CanalMsgConsumer}.
 *
 * <p>Subscribes to a single topic and turns each polled record into
 * {@link CommonMessage}s: in flat-message mode the record value is parsed as
 * JSON, otherwise it is deserialized by {@link KafkaMessageDeserializer} and
 * converted through {@link MessageUtil#convert}.
 *
 * <p>The first offset seen per partition in the most recent non-empty poll is
 * remembered so that {@link #rollback()} can rewind to the start of that batch.
 */
@SPI(KafkaConstants.ROOT)
public class CanalKafkaConsumer implements CanalMsgConsumer {
    /** Properties whose key starts with this prefix are forwarded to the Kafka client (prefix stripped). */
    private static final String PREFIX_KAFKA_CONFIG = "kafka.";

    private KafkaConsumer<String, ?> kafkaConsumer;
    private String topic;
    private boolean flatMessage = true;
    /** Partition number -> offset of the first record of that partition in the last non-empty poll. */
    private final Map<Integer, Long> currentOffsets = new ConcurrentHashMap<>();
    private final Properties kafkaProperties = new Properties();

    /**
     * Collects the consumer configuration; no connection is opened here.
     *
     * @param properties source configuration; {@code canal.mq.flatMessage} selects the payload
     *                   format and every {@code kafka.*} entry is copied into the Kafka client
     *                   configuration with the prefix removed
     * @param topic      topic to subscribe to on {@link #connect()}
     * @param groupId    value used for the Kafka {@code group.id}
     */
    public void init(Properties properties, String topic, String groupId) {
        this.topic = topic;

        // Values loaded from a properties file are Strings, programmatic callers may
        // pass a Boolean; accept both instead of failing on a blind cast.
        Object flatFlag = properties.get("canal.mq.flatMessage");
        if (flatFlag instanceof Boolean) {
            this.flatMessage = ((Boolean) flatFlag).booleanValue();
        } else if (flatFlag != null) {
            this.flatMessage = Boolean.parseBoolean(flatFlag.toString().trim());
        }

        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
            if (!(entry.getKey() instanceof String)) {
                continue; // only String keys can carry the "kafka." prefix
            }
            String key = (String) entry.getKey();
            if (!key.startsWith(PREFIX_KAFKA_CONFIG) || entry.getValue() == null) {
                continue;
            }
            // NOTE(review): PropertiesUtils.getProperty presumably resolves the effective
            // value for the key; a null result is skipped because Properties rejects nulls.
            Object resolved = PropertiesUtils.getProperty(properties, key);
            if (resolved != null) {
                this.kafkaProperties.put(key.substring(PREFIX_KAFKA_CONFIG.length()), resolved);
            }
        }

        this.kafkaProperties.put("group.id", groupId);
        this.kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        this.kafkaProperties.put("client.id", UUID.randomUUID().toString().substring(0, 6));
    }

    /**
     * Creates the Kafka consumer with the value deserializer matching the payload
     * format and subscribes it to the configured topic.
     */
    public void connect() {
        Class<?> valueDeserializer = this.flatMessage ? StringDeserializer.class : KafkaMessageDeserializer.class;
        this.kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        this.kafkaConsumer = new KafkaConsumer<>(this.kafkaProperties);
        this.kafkaConsumer.subscribe(Collections.singletonList(this.topic));
    }

    /**
     * Polls Kafka once and converts the returned records.
     *
     * @param timeout maximum time to wait in the poll
     * @param unit    unit of {@code timeout}
     * @return the converted messages, or {@code null} when the poll returned no records
     */
    public List<CommonMessage> getMessage(Long timeout, TimeUnit unit) {
        ConsumerRecords<String, ?> records = this.kafkaConsumer.poll(unit.toMillis(timeout.longValue()));
        if (records.isEmpty()) {
            return null;
        }

        this.currentOffsets.clear();
        List<CommonMessage> messages = new ArrayList<>();
        for (ConsumerRecord<String, ?> rec : records) {
            // Keep only the first offset per partition: that is the rewind point for rollback().
            this.currentOffsets.putIfAbsent(Integer.valueOf(rec.partition()), Long.valueOf(rec.offset()));
            if (this.flatMessage) {
                messages.add(JSON.parseObject((String) rec.value(), CommonMessage.class));
            } else {
                messages.addAll(MessageUtil.convert((Message) rec.value()));
            }
        }
        return messages;
    }

    /**
     * Rewinds every partition of the last batch to that batch's first offset and
     * commits the rewound positions. Does nothing when not connected or when no
     * batch offsets are recorded.
     */
    public void rollback() {
        if (this.kafkaConsumer == null || this.currentOffsets.isEmpty()) {
            return;
        }
        for (Map.Entry<Integer, Long> entry : this.currentOffsets.entrySet()) {
            this.kafkaConsumer.seek(new TopicPartition(this.topic, entry.getKey().intValue()),
                entry.getValue().longValue());
        }
        // Commit once, after all seeks: committing between seeks would persist the
        // still-advanced positions of partitions that have not been rewound yet.
        this.kafkaConsumer.commitSync();
    }

    /** Synchronously commits the consumer's current offsets, if connected. */
    public void ack() {
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.commitSync();
        }
    }

    /**
     * Unsubscribes and closes the consumer, if connected. The consumer is closed
     * even when unsubscribing fails, and the field is cleared either way.
     */
    public void disconnect() {
        KafkaConsumer<String, ?> consumer = this.kafkaConsumer;
        if (consumer == null) {
            return;
        }
        this.kafkaConsumer = null;
        try {
            consumer.unsubscribe();
        } finally {
            consumer.close();
        }
    }
}
