package com.alibaba.otter.canal.connector.kafka.producer;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONWriter;
import com.alibaba.otter.canal.common.utils.ExecutorTemplate;
import com.alibaba.otter.canal.common.utils.PropertiesUtils;
import com.alibaba.otter.canal.connector.core.producer.AbstractMQProducer;
import com.alibaba.otter.canal.connector.core.producer.MQDestination;
import com.alibaba.otter.canal.connector.core.producer.MQMessageUtils;
import com.alibaba.otter.canal.connector.core.spi.CanalMQProducer;
import com.alibaba.otter.canal.connector.core.spi.SPI;
import com.alibaba.otter.canal.connector.core.util.Callback;
import com.alibaba.otter.canal.connector.core.util.CanalMessageSerializerUtil;
import com.alibaba.otter.canal.connector.kafka.config.KafkaConstants;
import com.alibaba.otter.canal.connector.kafka.config.KafkaProducerConfig;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SPI(KafkaConstants.ROOT)
/* loaded from: input_file:com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.class */
public class CanalKafkaProducer extends AbstractMQProducer implements CanalMQProducer {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) CanalKafkaProducer.class);
    private static final String PREFIX_KAFKA_CONFIG = "kafka.";
    private Producer<String, byte[]> producer;

    public void init(Properties properties) {
        KafkaProducerConfig kafkaProducerConfig = new KafkaProducerConfig();
        this.mqProperties = kafkaProducerConfig;
        super.init(properties);
        loadKafkaProperties(properties);
        Properties properties2 = new Properties();
        properties2.putAll(kafkaProducerConfig.getKafkaProperties());
        properties2.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        properties2.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        if (kafkaProducerConfig.isKerberosEnabled()) {
            File file = new File(kafkaProducerConfig.getKrb5File());
            File file2 = new File(kafkaProducerConfig.getJaasFile());
            if (!file.exists() || !file2.exists()) {
                logger.error("ERROR # The kafka kerberos configuration file does not exist! please check it");
                throw new RuntimeException("ERROR # The kafka kerberos configuration file does not exist! please check it");
            }
            System.setProperty("java.security.krb5.conf", file.getAbsolutePath());
            System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, file2.getAbsolutePath());
            System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
            properties2.put("security.protocol", "SASL_PLAINTEXT");
            properties2.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, KafkaConstants.ROOT);
        }
        properties2.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaMessageSerializer.class);
        this.producer = new KafkaProducer(properties2);
    }

    private void loadKafkaProperties(Properties properties) {
        KafkaProducerConfig kafkaProducerConfig = (KafkaProducerConfig) this.mqProperties;
        Map<String, Object> kafkaProperties = kafkaProducerConfig.getKafkaProperties();
        doMoreCompatibleConvert("canal.mq.servers", "kafka.bootstrap.servers", properties);
        doMoreCompatibleConvert("canal.mq.acks", "kafka.acks", properties);
        doMoreCompatibleConvert("canal.mq.compressionType", "kafka.compression.type", properties);
        doMoreCompatibleConvert("canal.mq.retries", "kafka.retries", properties);
        doMoreCompatibleConvert("canal.mq.batchSize", "kafka.batch.size", properties);
        doMoreCompatibleConvert("canal.mq.lingerMs", "kafka.linger.ms", properties);
        doMoreCompatibleConvert("canal.mq.maxRequestSize", "kafka.max.request.size", properties);
        doMoreCompatibleConvert("canal.mq.bufferMemory", "kafka.buffer.memory", properties);
        doMoreCompatibleConvert("canal.mq.kafka.kerberos.enable", KafkaConstants.CANAL_MQ_KAFKA_KERBEROS_ENABLE, properties);
        doMoreCompatibleConvert("canal.mq.kafka.kerberos.krb5.file", KafkaConstants.CANAL_MQ_KAFKA_KERBEROS_KRB5_FILE, properties);
        doMoreCompatibleConvert("canal.mq.kafka.kerberos.jaas.file", KafkaConstants.CANAL_MQ_KAFKA_KERBEROS_JAAS_FILE, properties);
        for (Map.Entry entry : properties.entrySet()) {
            String str = (String) entry.getKey();
            Object value = entry.getValue();
            if (str.startsWith(PREFIX_KAFKA_CONFIG) && value != null) {
                kafkaProperties.put(str.substring(PREFIX_KAFKA_CONFIG.length()), PropertiesUtils.getProperty(properties, str));
            }
        }
        String property = PropertiesUtils.getProperty(properties, KafkaConstants.CANAL_MQ_KAFKA_KERBEROS_ENABLE);
        if (!StringUtils.isEmpty(property)) {
            kafkaProducerConfig.setKerberosEnabled(Boolean.parseBoolean(property));
        }
        String property2 = PropertiesUtils.getProperty(properties, KafkaConstants.CANAL_MQ_KAFKA_KERBEROS_KRB5_FILE);
        if (!StringUtils.isEmpty(property2)) {
            kafkaProducerConfig.setKrb5File(property2);
        }
        String property3 = PropertiesUtils.getProperty(properties, KafkaConstants.CANAL_MQ_KAFKA_KERBEROS_JAAS_FILE);
        if (StringUtils.isEmpty(property3)) {
            return;
        }
        kafkaProducerConfig.setJaasFile(property3);
    }

    public void stop() {
        try {
            try {
                logger.info("## stop the kafka producer");
                if (this.producer != null) {
                    this.producer.close();
                }
                super.stop();
                logger.info("## kafka producer is down.");
            } catch (Throwable th) {
                logger.warn("##something goes wrong when stopping kafka producer:", th);
                logger.info("## kafka producer is down.");
            }
        } catch (Throwable th2) {
            logger.info("## kafka producer is down.");
            throw th2;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v44, types: [java.util.List] */
    public void send(MQDestination mQDestination, Message message, Callback callback) {
        ArrayList arrayList;
        ExecutorTemplate executorTemplate = new ExecutorTemplate(this.sendExecutor);
        try {
            try {
                if (StringUtils.isEmpty(mQDestination.getDynamicTopic())) {
                    arrayList = new ArrayList();
                    arrayList.add(send(mQDestination, mQDestination.getTopic(), message, this.mqProperties.isFlatMessage()));
                } else {
                    for (Map.Entry entry : MQMessageUtils.messageTopics(message, mQDestination.getTopic(), mQDestination.getDynamicTopic()).entrySet()) {
                        String replace = ((String) entry.getKey()).replace('.', '_');
                        Message message2 = (Message) entry.getValue();
                        executorTemplate.submit(() -> {
                            try {
                                return send(mQDestination, replace, message2, this.mqProperties.isFlatMessage());
                            } catch (Exception e) {
                                throw new RuntimeException(e);
                            }
                        });
                    }
                    arrayList = executorTemplate.waitForResult();
                }
                this.producer.flush();
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    Iterator it2 = ((List) it.next()).iterator();
                    while (it2.hasNext()) {
                        try {
                            ((Future) it2.next()).get();
                        } catch (InterruptedException | ExecutionException e) {
                            throw new RuntimeException(e);
                        }
                    }
                }
                callback.commit();
                executorTemplate.clear();
            } catch (Throwable th) {
                executorTemplate.clear();
                throw th;
            }
        } catch (Throwable th2) {
            logger.error(th2.getMessage(), th2);
            callback.rollback();
            executorTemplate.clear();
        }
    }

    private List<Future> send(MQDestination mQDestination, String str, Message message, boolean z) {
        ArrayList arrayList = new ArrayList();
        Integer parseDynamicTopicPartition = MQMessageUtils.parseDynamicTopicPartition(str, mQDestination.getDynamicTopicPartitionNum());
        if (parseDynamicTopicPartition == null) {
            parseDynamicTopicPartition = mQDestination.getPartitionsNum();
        }
        if (z) {
            for (FlatMessage flatMessage : MQMessageUtils.messageConverter(MQMessageUtils.buildMessageData(message, this.buildExecutor), message.getId())) {
                if (mQDestination.getPartitionHash() == null || mQDestination.getPartitionHash().isEmpty()) {
                    arrayList.add(new ProducerRecord<>(str, Integer.valueOf(mQDestination.getPartition() != null ? mQDestination.getPartition().intValue() : 0), null, JSON.toJSONBytes(flatMessage, new JSONWriter.Feature[]{JSONWriter.Feature.WriteNulls})));
                } else {
                    FlatMessage[] messagePartition = MQMessageUtils.messagePartition(flatMessage, parseDynamicTopicPartition, mQDestination.getPartitionHash(), this.mqProperties.isDatabaseHash());
                    int length = messagePartition.length;
                    for (int i = 0; i < length; i++) {
                        FlatMessage flatMessage2 = messagePartition[i];
                        if (flatMessage2 != null) {
                            arrayList.add(new ProducerRecord<>(str, Integer.valueOf(i), null, JSON.toJSONBytes(flatMessage2, new JSONWriter.Feature[]{JSONWriter.Feature.WriteNulls})));
                        }
                    }
                }
            }
        } else if (mQDestination.getPartitionHash() == null || mQDestination.getPartitionHash().isEmpty()) {
            arrayList.add(new ProducerRecord<>(str, Integer.valueOf(mQDestination.getPartition() != null ? mQDestination.getPartition().intValue() : 0), null, CanalMessageSerializerUtil.serializer(message, this.mqProperties.isFilterTransactionEntry())));
        } else {
            Message[] messagePartition2 = MQMessageUtils.messagePartition(MQMessageUtils.buildMessageData(message, this.buildExecutor), message.getId(), parseDynamicTopicPartition, mQDestination.getPartitionHash(), this.mqProperties.isDatabaseHash());
            int length2 = messagePartition2.length;
            for (int i2 = 0; i2 < length2; i2++) {
                Message message2 = messagePartition2[i2];
                if (message2 != null) {
                    arrayList.add(new ProducerRecord<>(str, Integer.valueOf(i2), null, CanalMessageSerializerUtil.serializer(message2, this.mqProperties.isFilterTransactionEntry())));
                }
            }
        }
        return produce(arrayList);
    }

    private List<Future> produce(List<ProducerRecord<String, byte[]>> list) {
        ArrayList arrayList = new ArrayList();
        Iterator<ProducerRecord<String, byte[]>> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(this.producer.send(it.next()));
        }
        return arrayList;
    }
}
