package com.alibaba.otter.canal.connector.pulsarmq.consumer;

import com.alibaba.fastjson2.JSON;
import com.alibaba.otter.canal.common.utils.MQUtil;
import com.alibaba.otter.canal.connector.core.consumer.CommonMessage;
import com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer;
import com.alibaba.otter.canal.connector.core.spi.SPI;
import com.alibaba.otter.canal.connector.core.util.CanalMessageSerializerUtil;
import com.alibaba.otter.canal.connector.core.util.MessageUtil;
import com.alibaba.otter.canal.connector.pulsarmq.config.PulsarMQConstants;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.util.RetryMessageUtil;

@SPI(PulsarMQConstants.ROOT)
/* loaded from: input_file:com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumer.class */
public class CanalPulsarMQConsumer implements CanalMsgConsumer {
    private PulsarClient pulsarClient;
    private Consumer<byte[]> pulsarMQConsumer;
    private String topic;
    private volatile Messages<byte[]> lastGetBatchMessage;
    private String serviceUrl;
    private String roleToken;
    private String subscriptName;
    private boolean flatMessage = false;
    private int batchSize = 30;
    private int getBatchTimeoutSeconds = 30;
    private long batchProcessTimeout = 60000;
    private int redeliveryDelaySeconds = 60;
    private int ackTimeoutSeconds = 30;
    private boolean isRetry = true;
    private boolean isRetryDLQUpperCase = false;
    private int maxRedeliveryCount = 128;

    public void init(Properties properties, String str, String str2) {
        this.topic = str;
        String property = properties.getProperty("canal.mq.flatMessage");
        if (StringUtils.isNotEmpty(property)) {
            this.flatMessage = Boolean.parseBoolean(property);
        }
        this.serviceUrl = properties.getProperty(PulsarMQConstants.PULSARMQ_SERVER_URL);
        this.roleToken = properties.getProperty(PulsarMQConstants.PULSARMQ_ROLE_TOKEN);
        this.subscriptName = properties.getProperty(PulsarMQConstants.PULSARMQ_SUBSCRIPT_NAME);
        if (StringUtils.isEmpty(this.subscriptName)) {
            this.subscriptName = str2;
        }
        if (StringUtils.isEmpty(this.subscriptName)) {
            throw new RuntimeException("Pulsar Consumer subscriptName required");
        }
        String property2 = properties.getProperty("canal.mq.canalBatchSize");
        if (StringUtils.isNotEmpty(property2)) {
            this.batchSize = Integer.parseInt(property2);
        }
        String property3 = properties.getProperty(PulsarMQConstants.PULSARMQ_GET_BATCH_TIMEOUT_SECONDS);
        if (StringUtils.isNotEmpty(property3)) {
            this.getBatchTimeoutSeconds = Integer.parseInt(property3);
        }
        if (StringUtils.isNotEmpty(properties.getProperty(PulsarMQConstants.PULSARMQ_BATCH_PROCESS_TIMEOUT))) {
            this.batchProcessTimeout = Integer.parseInt(r0);
        }
        String property4 = properties.getProperty(PulsarMQConstants.PULSARMQ_REDELIVERY_DELAY_SECONDS);
        if (StringUtils.isNotEmpty(property4)) {
            this.redeliveryDelaySeconds = Integer.parseInt(property4);
        }
        String property5 = properties.getProperty(PulsarMQConstants.PULSARMQ_ACK_TIMEOUT_SECONDS);
        if (StringUtils.isNotEmpty(property5)) {
            this.ackTimeoutSeconds = Integer.parseInt(property5);
        }
        String property6 = properties.getProperty(PulsarMQConstants.PULSARMQ_IS_RETRY);
        if (StringUtils.isNotEmpty(property6)) {
            this.isRetry = Boolean.parseBoolean(property6);
        }
        String property7 = properties.getProperty(PulsarMQConstants.PULSARMQ_IS_RETRY_DLQ_UPPERCASE);
        if (StringUtils.isNotEmpty(property7)) {
            this.isRetryDLQUpperCase = Boolean.parseBoolean(property7);
        }
        String property8 = properties.getProperty(PulsarMQConstants.PULSARMQ_MAX_REDELIVERY_COUNT);
        if (StringUtils.isNotEmpty(property8)) {
            this.maxRedeliveryCount = Integer.parseInt(property8);
        }
    }

    public void connect() {
        if (isConsumerActive()) {
            return;
        }
        try {
            ClientBuilder serviceUrl = PulsarClient.builder().serviceUrl(this.serviceUrl);
            if (StringUtils.isNotEmpty(this.roleToken)) {
                serviceUrl.authentication(AuthenticationFactory.token(this.roleToken));
            }
            this.pulsarClient = serviceUrl.build();
            ConsumerBuilder<byte[]> newConsumer = this.pulsarClient.newConsumer();
            if (MQUtil.isPatternTopic(this.topic)) {
                newConsumer.topicsPattern(this.topic);
            } else {
                newConsumer.topic(this.topic);
            }
            newConsumer.subscriptionType(SubscriptionType.Failover);
            newConsumer.negativeAckRedeliveryDelay(this.redeliveryDelaySeconds, TimeUnit.SECONDS).subscriptionName(this.subscriptName);
            if (this.isRetry) {
                DeadLetterPolicy.DeadLetterPolicyBuilder maxRedeliverCount = DeadLetterPolicy.builder().maxRedeliverCount(this.maxRedeliveryCount);
                if (!MQUtil.isPatternTag(this.topic)) {
                    maxRedeliverCount.retryLetterTopic(this.topic + (this.isRetryDLQUpperCase ? RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX : "-retry"));
                    maxRedeliverCount.deadLetterTopic(this.topic + (this.isRetryDLQUpperCase ? RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX : "-dlq"));
                }
                newConsumer.enableRetry(true).deadLetterPolicy(maxRedeliverCount.build());
            }
            newConsumer.ackTimeout(this.ackTimeoutSeconds, TimeUnit.SECONDS);
            newConsumer.batchReceivePolicy(new BatchReceivePolicy.Builder().maxNumMessages(this.batchSize).timeout(this.getBatchTimeoutSeconds, TimeUnit.SECONDS).build());
            try {
                this.pulsarMQConsumer = newConsumer.subscribe();
            } catch (PulsarClientException e) {
                throw new CanalClientException("Subscript pulsar consumer error", e);
            }
        } catch (PulsarClientException e2) {
            throw new RuntimeException(e2);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public List<CommonMessage> getMessage(Long l, TimeUnit timeUnit) {
        ArrayList newArrayList = Lists.newArrayList();
        try {
            Messages<byte[]> batchReceive = this.pulsarMQConsumer.batchReceive();
            if (null == batchReceive || batchReceive.size() == 0) {
                return newArrayList;
            }
            this.lastGetBatchMessage = batchReceive;
            Iterator<byte[]> it = batchReceive.iterator();
            while (it.hasNext()) {
                byte[] data = ((Message) it.next()).getData();
                if (this.flatMessage) {
                    newArrayList.add((CommonMessage) JSON.parseObject(data, CommonMessage.class));
                } else {
                    newArrayList.addAll(MessageUtil.convert(CanalMessageSerializerUtil.deserializer(data)));
                }
            }
            return newArrayList;
        } catch (PulsarClientException e) {
            throw new CanalClientException("Receive pulsar batch message error", e);
        }
    }

    public void rollback() {
        try {
            if (isConsumerActive() && hasLastMessages()) {
                this.pulsarMQConsumer.negativeAcknowledge((Messages<?>) this.lastGetBatchMessage);
            }
        } finally {
            this.lastGetBatchMessage = null;
        }
    }

    public void ack() {
        try {
            if (isConsumerActive() && hasLastMessages()) {
                this.pulsarMQConsumer.acknowledge((Messages<?>) this.lastGetBatchMessage);
            }
        } catch (PulsarClientException e) {
            if (isConsumerActive() && hasLastMessages()) {
                this.pulsarMQConsumer.negativeAcknowledge((Messages<?>) this.lastGetBatchMessage);
            }
        } finally {
            this.lastGetBatchMessage = null;
        }
    }

    public void disconnect() {
        if (null == this.pulsarMQConsumer || !this.pulsarMQConsumer.isConnected()) {
            return;
        }
        try {
            this.pulsarClient.close();
        } catch (PulsarClientException e) {
            throw new CanalClientException("Disconnect pulsar consumer error", e);
        }
    }

    private boolean isConsumerActive() {
        return null != this.pulsarMQConsumer && this.pulsarMQConsumer.isConnected();
    }

    private boolean hasLastMessages() {
        return null != this.lastGetBatchMessage && this.lastGetBatchMessage.size() > 0;
    }
}
