package com.alibaba.otter.canal.connector.pulsarmq.producer;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONWriter;
import com.alibaba.otter.canal.common.utils.ExecutorTemplate;
import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
import com.alibaba.otter.canal.common.utils.PropertiesUtils;
import com.alibaba.otter.canal.connector.core.producer.AbstractMQProducer;
import com.alibaba.otter.canal.connector.core.producer.MQDestination;
import com.alibaba.otter.canal.connector.core.producer.MQMessageUtils;
import com.alibaba.otter.canal.connector.core.spi.CanalMQProducer;
import com.alibaba.otter.canal.connector.core.spi.SPI;
import com.alibaba.otter.canal.connector.core.util.Callback;
import com.alibaba.otter.canal.connector.core.util.CanalMessageSerializerUtil;
import com.alibaba.otter.canal.connector.pulsarmq.config.PulsarMQConstants;
import com.alibaba.otter.canal.connector.pulsarmq.config.PulsarMQProducerConfig;
import com.alibaba.otter.canal.protocol.FlatMessage;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TopicMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SPI(PulsarMQConstants.ROOT)
/* loaded from: input_file:com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.class */
public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQProducer {
    public static final String MSG_PROPERTY_PARTITION_NAME = "partitionNum";
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) CanalPulsarMQProducer.class);
    private static final Map<String, Producer<byte[]>> PRODUCERS = new HashMap();
    protected ThreadPoolExecutor sendPartitionExecutor;
    protected PulsarClient client;
    protected PulsarAdmin pulsarAdmin;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer$MessageRouterImpl.class */
    public static class MessageRouterImpl implements MessageRouter {
        private String topicLocal;

        public MessageRouterImpl(String str) {
            this.topicLocal = str;
        }

        @Override // org.apache.pulsar.client.api.MessageRouter
        public int choosePartition(Message<?> message, TopicMetadata topicMetadata) {
            String property = message.getProperty(CanalPulsarMQProducer.MSG_PROPERTY_PARTITION_NAME);
            int i = 0;
            if (!StringUtils.isEmpty(property)) {
                try {
                    i = Integer.parseInt(property);
                } catch (NumberFormatException e) {
                    CanalPulsarMQProducer.logger.warn("Parse msg {} property failed for value: {}", CanalPulsarMQProducer.MSG_PROPERTY_PARTITION_NAME, property);
                }
            }
            Integer valueOf = Integer.valueOf(topicMetadata.numPartitions());
            if (null != valueOf && i >= valueOf.intValue()) {
                i %= valueOf.intValue();
            }
            return i;
        }
    }

    public void init(Properties properties) {
        PulsarMQProducerConfig pulsarMQProducerConfig = new PulsarMQProducerConfig();
        this.mqProperties = pulsarMQProducerConfig;
        super.init(properties);
        loadPulsarMQProperties(properties);
        try {
            ClientBuilder serviceUrl = PulsarClient.builder().serviceUrl(pulsarMQProducerConfig.getServerUrl());
            if (StringUtils.isNotEmpty(pulsarMQProducerConfig.getRoleToken())) {
                serviceUrl.authentication(AuthenticationFactory.token(pulsarMQProducerConfig.getRoleToken()));
            }
            this.client = serviceUrl.build();
            if (StringUtils.isNotEmpty(pulsarMQProducerConfig.getAdminServerUrl())) {
                try {
                    this.pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsarMQProducerConfig.getAdminServerUrl()).build();
                } catch (PulsarClientException e) {
                    throw new RuntimeException(e);
                }
            }
            int intValue = this.mqProperties.getParallelSendThreadSize().intValue();
            this.sendPartitionExecutor = new ThreadPoolExecutor(intValue, intValue, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue(intValue * 2), new NamedThreadFactory("MQ-Parallel-Sender-Partition"), new ThreadPoolExecutor.CallerRunsPolicy());
        } catch (PulsarClientException e2) {
            throw new RuntimeException(e2);
        }
    }

    private void loadPulsarMQProperties(Properties properties) {
        PulsarMQProducerConfig pulsarMQProducerConfig = (PulsarMQProducerConfig) this.mqProperties;
        String property = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_SERVER_URL);
        if (!StringUtils.isEmpty(property)) {
            pulsarMQProducerConfig.setServerUrl(property);
        }
        String property2 = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_ROLE_TOKEN);
        if (!StringUtils.isEmpty(property2)) {
            pulsarMQProducerConfig.setRoleToken(property2);
        }
        String property3 = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_TOPIC_TENANT_PREFIX);
        if (!StringUtils.isEmpty(property3)) {
            pulsarMQProducerConfig.setTopicTenantPrefix(property3);
        }
        String property4 = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_ADMIN_SERVER_URL);
        if (!StringUtils.isEmpty(property4)) {
            pulsarMQProducerConfig.setAdminServerUrl(property4);
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Load pulsar properties ==> {}", JSON.toJSON(this.mqProperties));
        }
    }

    public void send(MQDestination mQDestination, com.alibaba.otter.canal.protocol.Message message, Callback callback) {
        ExecutorTemplate executorTemplate = new ExecutorTemplate(this.sendExecutor);
        try {
            try {
                if (StringUtils.isEmpty(mQDestination.getDynamicTopic())) {
                    send(mQDestination, mQDestination.getTopic(), message);
                } else {
                    for (Map.Entry entry : MQMessageUtils.messageTopics(message, mQDestination.getTopic(), mQDestination.getDynamicTopic()).entrySet()) {
                        String replace = ((String) entry.getKey()).replace('.', '_');
                        com.alibaba.otter.canal.protocol.Message message2 = (com.alibaba.otter.canal.protocol.Message) entry.getValue();
                        executorTemplate.submit(() -> {
                            try {
                                send(mQDestination, replace, message2);
                            } catch (Exception e) {
                                throw new RuntimeException(e);
                            }
                        });
                    }
                    executorTemplate.waitForResult();
                }
                callback.commit();
                executorTemplate.clear();
            } catch (Throwable th) {
                logger.error(th.getMessage(), th);
                callback.rollback();
                executorTemplate.clear();
            }
        } catch (Throwable th2) {
            executorTemplate.clear();
            throw th2;
        }
    }

    public void send(MQDestination mQDestination, String str, com.alibaba.otter.canal.protocol.Message message) {
        Integer parseDynamicTopicPartition = MQMessageUtils.parseDynamicTopicPartition(str, mQDestination.getDynamicTopicPartitionNum());
        if (parseDynamicTopicPartition == null) {
            parseDynamicTopicPartition = mQDestination.getPartitionsNum();
        }
        if (this.pulsarAdmin != null && parseDynamicTopicPartition != null && parseDynamicTopicPartition.intValue() > 0 && PRODUCERS.get(str) == null) {
            createMultipleTopic(str, parseDynamicTopicPartition);
        }
        ExecutorTemplate executorTemplate = new ExecutorTemplate(this.sendPartitionExecutor);
        MQMessageUtils.EntryRowData[] buildMessageData = MQMessageUtils.buildMessageData(message, this.buildExecutor);
        if (!this.mqProperties.isFlatMessage()) {
            if (mQDestination.getPartitionHash() == null || mQDestination.getPartitionHash().isEmpty()) {
                sendMessage(str, mQDestination.getPartition() != null ? mQDestination.getPartition().intValue() : 0, message);
                return;
            }
            for (MQMessageUtils.EntryRowData entryRowData : buildMessageData) {
                if (null != entryRowData.entry) {
                    com.alibaba.otter.canal.protocol.Message[] messagePartition = MQMessageUtils.messagePartition(buildMessageData, message.getId(), parseDynamicTopicPartition, mQDestination.getPartitionHash(), this.mqProperties.isDatabaseHash());
                    int length = messagePartition.length;
                    for (int i = 0; i < length; i++) {
                        int i2 = i;
                        com.alibaba.otter.canal.protocol.Message message2 = messagePartition[i];
                        executorTemplate.submit(() -> {
                            sendMessage(str, i2, message2);
                        });
                    }
                }
            }
            return;
        }
        List<FlatMessage> messageConverter = MQMessageUtils.messageConverter(buildMessageData, message.getId());
        if (mQDestination.getPartitionHash() == null || mQDestination.getPartitionHash().isEmpty()) {
            sendMessage(str, mQDestination.getPartition() != null ? mQDestination.getPartition().intValue() : 0, messageConverter);
            return;
        }
        ArrayList arrayList = new ArrayList();
        int intValue = parseDynamicTopicPartition.intValue();
        for (int i3 = 0; i3 < intValue; i3++) {
            arrayList.add(new ArrayList());
        }
        Iterator<FlatMessage> it = messageConverter.iterator();
        while (it.hasNext()) {
            FlatMessage[] messagePartition2 = MQMessageUtils.messagePartition(it.next(), parseDynamicTopicPartition, mQDestination.getPartitionHash(), this.mqProperties.isDatabaseHash());
            int length2 = messagePartition2.length;
            for (int i4 = 0; i4 < length2; i4++) {
                if (messagePartition2[i4] != null) {
                    ((List) arrayList.get(i4)).add(messagePartition2[i4]);
                }
            }
        }
        for (int i5 = 0; i5 < intValue; i5++) {
            List list = (List) arrayList.get(i5);
            if (list != null && list.size() > 0) {
                int i6 = i5;
                executorTemplate.submit(() -> {
                    sendMessage(str, i6, (List<FlatMessage>) list);
                });
            }
        }
        executorTemplate.waitForResult();
    }

    private void sendMessage(String str, int i, com.alibaba.otter.canal.protocol.Message message) {
        Producer<byte[]> producer = getProducer(str);
        try {
            MessageId send = producer.newMessage().property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(i)).value(CanalMessageSerializerUtil.serializer(message, this.mqProperties.isFilterTransactionEntry())).send();
            if (logger.isDebugEnabled()) {
                logger.debug("Send Message to topic:{} Result: {}", str, send);
            }
        } catch (Throwable th) {
            throw new RuntimeException(th);
        }
    }

    private void sendMessage(String str, int i, List<FlatMessage> list) {
        Producer<byte[]> producer = getProducer(str);
        Iterator<FlatMessage> it = list.iterator();
        while (it.hasNext()) {
            try {
                MessageId send = producer.newMessage().property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(i)).value(JSON.toJSONBytes(it.next(), new JSONWriter.Feature[]{JSONWriter.Feature.WriteNulls})).send();
                if (logger.isDebugEnabled()) {
                    logger.debug("Send Messages to topic:{} Result: {}", str, send);
                }
            } catch (Throwable th) {
                throw new RuntimeException(th);
            }
        }
    }

    private void createMultipleTopic(String str, Integer num) {
        PulsarMQProducerConfig pulsarMQProducerConfig = (PulsarMQProducerConfig) this.mqProperties;
        String topicTenantPrefix = pulsarMQProducerConfig.getTopicTenantPrefix();
        String str2 = str;
        if (!StringUtils.isEmpty(topicTenantPrefix)) {
            if (!topicTenantPrefix.endsWith("/")) {
                str2 = "/" + str2;
            }
            str2 = pulsarMQProducerConfig.getTopicTenantPrefix() + str2;
        }
        try {
            this.pulsarAdmin.topics().createPartitionedTopic(str2, num.intValue());
        } catch (PulsarAdminException e) {
        }
    }

    private Producer<byte[]> getProducer(String str) {
        Producer<byte[]> producer = PRODUCERS.get(str);
        if (null == producer || !producer.isConnected()) {
            try {
                synchronized (PRODUCERS) {
                    Producer<byte[]> producer2 = PRODUCERS.get(str);
                    if (null != producer2 && producer2.isConnected()) {
                        return producer2;
                    }
                    PulsarMQProducerConfig pulsarMQProducerConfig = (PulsarMQProducerConfig) this.mqProperties;
                    String topicTenantPrefix = pulsarMQProducerConfig.getTopicTenantPrefix();
                    String str2 = str;
                    if (!StringUtils.isEmpty(topicTenantPrefix)) {
                        if (!topicTenantPrefix.endsWith("/")) {
                            str2 = "/" + str2;
                        }
                        str2 = pulsarMQProducerConfig.getTopicTenantPrefix() + str2;
                    }
                    producer = this.client.newProducer().topic(str2).messageRouter(new MessageRouterImpl(str)).create();
                    PRODUCERS.put(str, producer);
                }
            } catch (PulsarClientException e) {
                logger.error("create producer failed for topic: " + str, (Throwable) e);
                throw new RuntimeException(e);
            }
        }
        return producer;
    }

    public void stop() {
        logger.info("## Stop PulsarMQ producer##");
        for (Producer<byte[]> producer : PRODUCERS.values()) {
            if (null != producer) {
                try {
                    if (producer.isConnected()) {
                        producer.close();
                    }
                } catch (PulsarClientException e) {
                    logger.warn("close producer name: {}, topic: {}, error: {}", producer.getProducerName(), producer.getTopic(), e.getMessage());
                }
            }
        }
        super.stop();
    }
}
