package com.odianyun.mq.kafka;

import com.odianyun.mq.common.consumer.ConsumerType;
import com.odianyun.mq.common.consumer.MessageFilter;
import com.odianyun.mq.common.inner.message.MqMessage;
import com.odianyun.mq.common.message.Destination;
import com.odianyun.mq.consumer.BackoutMessageException;
import com.odianyun.mq.consumer.Consumer;
import com.odianyun.mq.consumer.ConsumerConfig;
import com.odianyun.mq.consumer.MessageListener;
import com.odianyun.mq.consumer.NeedResendException;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.ObjectUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/kafka-client-2.0.17.1.RELEASE.jar:com/odianyun/mq/kafka/KafkaConsumerImpl.class */
/**
 * {@link Consumer} implementation backed by a Kafka topic.
 *
 * <p>{@link #start()} launches a dedicated thread that subscribes to the destination's
 * assembled topic name, polls for records, decodes each record value into an
 * {@link MqMessage} and hands it to the registered {@link MessageListener}.
 *
 * <p>Offsets are committed manually (auto-commit is disabled):
 * <ul>
 *   <li>when the configured consumer type is {@link ConsumerType#CLIENT_ACKNOWLEDGE}, the offset
 *       is committed synchronously after each record the listener handled without requesting a
 *       resend;</li>
 *   <li>otherwise the whole polled batch is committed asynchronously after it was handled.</li>
 * </ul>
 */
public class KafkaConsumerImpl implements Consumer {
    /** Maximum time, in milliseconds, a single poll waits for records. */
    public static final int POOL_TIMEOUT = 300;
    /**
     * In client-acknowledge mode the Kafka consumer is closed and reopened once it has been
     * open for at least this many milliseconds.
     */
    public static final int REOPEN_CONSUMER_INTERVAL = 9000;
    /** Upper bound for closing the underlying Kafka consumer. */
    private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(5L);

    private volatile Destination dest;
    private volatile ConsumerConfig config;
    private volatile String consumerId;
    private volatile MessageListener messagelistener;
    private volatile KafkaConsumer<String, String> kafkaConsumer;
    /** Polling thread created by {@link #start()}; {@code null} until the first start. */
    Thread thread;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /**
     * @param destination    destination whose assembled name is the topic subscribed to
     * @param consumerConfig consumer settings (consumer type and message filter are read)
     * @param str            consumer id, used as the Kafka consumer group id
     */
    public KafkaConsumerImpl(Destination destination, ConsumerConfig consumerConfig, String str) {
        this.dest = destination;
        this.config = consumerConfig;
        this.consumerId = str;
    }

    /**
     * Creates a new, not yet subscribed Kafka consumer with String key/value deserializers,
     * auto-commit disabled and this consumer's id as group id.
     *
     * @return a freshly constructed Kafka consumer; the caller is responsible for closing it
     */
    public KafkaConsumer<String, String> getKafkaConsumer() {
        Properties properties = new Properties();
        properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaUtils.getAddress());
        properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, this.consumerId);
        properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(properties);
    }

    /**
     * Tells whether the configured message filter accepts the given message.
     *
     * @param mqMessage message whose type is checked
     * @return {@code true} for an all-match filter, or for an in-set filter whose set is empty
     *         or contains the message type
     * @throws RuntimeException if the filter type is neither all-match nor in-set
     */
    public boolean matchMessageType(MqMessage mqMessage) {
        MessageFilter filter = this.config.getMessageFilter();
        MessageFilter.FilterType type = filter.getType();
        switch (type) {
            case AllMatch:
                return true;
            case InSet:
                Set<String> acceptedTypes = filter.getParam();
                return CollectionUtils.isEmpty(acceptedTypes) || acceptedTypes.contains(mqMessage.getType());
            default:
                throw new RuntimeException("不支持的消息过滤类型:" + type);
        }
    }

    /** Clears the closed flag and starts a new polling thread named after the topic. */
    @Override // com.odianyun.mq.consumer.Consumer
    public void start() {
        this.closed.set(false);
        Thread worker = new Thread(this::consumeLoop);
        worker.setName("omq-kafka-consumer-" + this.dest.getAssembleName());
        this.thread = worker;
        worker.start();
    }

    /** Creates a Kafka consumer and subscribes it to this destination's topic. */
    private KafkaConsumer<String, String> openConsumer() {
        KafkaConsumer<String, String> consumer = getKafkaConsumer();
        consumer.subscribe(Collections.singletonList(this.dest.getAssembleName()));
        return consumer;
    }

    /** Best-effort close: failures are logged at debug level and never propagated. */
    private void closeQuietly(KafkaConsumer<String, String> consumer) {
        if (consumer == null) {
            return;
        }
        try {
            consumer.close(CLOSE_TIMEOUT);
        } catch (Exception e) {
            this.logger.debug("close kafka consumer failed", e);
        }
    }

    /**
     * Body of the polling thread. Opens a consumer and polls until {@link #close()} is called.
     * In client-acknowledge mode the consumer is additionally closed and reopened every
     * {@link #REOPEN_CONSUMER_INTERVAL} milliseconds.
     */
    private void consumeLoop() {
        final boolean clientAck = ObjectUtils.equals(this.config.getConsumerType(), ConsumerType.CLIENT_ACKNOWLEDGE);
        while (!this.closed.get()) {
            try {
                this.kafkaConsumer = openConsumer();
                final long openedAt = System.currentTimeMillis();
                // The closed flag is the loop condition so that close() actually ends polling
                // and lets the finally block release the consumer.
                while (!this.closed.get()) {
                    try {
                        ConsumerRecords<String, String> records = this.kafkaConsumer.poll(Duration.ofMillis(POOL_TIMEOUT));
                        this.logger.debug("receive record count:{}", records.count());
                        handleRecords(clientAck, records);
                    } catch (WakeupException ignored) {
                        // Raised when close() wakes the consumer; the loop condition handles shutdown.
                    } catch (Exception e) {
                        this.logger.error("onMessage exception", e);
                    }
                    if (clientAck && System.currentTimeMillis() - openedAt >= REOPEN_CONSUMER_INTERVAL) {
                        this.logger.debug("reopen consumer");
                        break; // the finally block closes the consumer, the outer loop reopens it
                    }
                }
            } catch (WakeupException e) {
                // Woken up while opening/subscribing: stop the thread, as before.
                if (!this.closed.get()) {
                    this.logger.info("WakeupException:" + e.getMessage(), e);
                }
                return;
            } finally {
                closeQuietly(this.kafkaConsumer);
            }
        }
    }

    /**
     * Dispatches one polled batch to the listener and commits offsets.
     *
     * @param clientAck {@code true} to commit per record after successful handling,
     *                  {@code false} to commit the whole batch asynchronously at the end
     * @param records   batch returned by the last poll
     */
    private void handleRecords(boolean clientAck, ConsumerRecords<String, String> records) {
        for (TopicPartition partition : records.partitions()) {
            for (ConsumerRecord<String, String> record : records.records(partition)) {
                this.logger.debug("receive record:{}", record);
                try {
                    MqMessage message = MqMessage.getByString(record.value());
                    if (!matchMessageType(message)) {
                        continue;
                    }
                    // Scoped to this record so a resend request from one partition cannot
                    // leak into the handling of records from another partition.
                    boolean needResend = false;
                    MessageListener listener = this.messagelistener;
                    if (listener != null) {
                        try {
                            listener.onMessage(message);
                        } catch (BackoutMessageException e) {
                            needResend = true;
                        } catch (NeedResendException e) {
                            needResend = true;
                        }
                    }
                    if (!clientAck) {
                        continue;
                    }
                    if (needResend) {
                        // NOTE(review): the consumer position is not rewound here; redelivery
                        // appears to rely on the periodic reopen in consumeLoop(), and a later
                        // commit on this partition could pass this record - confirm.
                        this.logger.debug("need resend");
                        break; // skip the rest of this partition without committing
                    }
                    this.kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
                    this.logger.debug("commitSync for clientAck");
                } catch (Exception e) {
                    this.logger.error("getMqMessage error", e);
                }
            }
        }
        if (!clientAck && records.count() > 0) {
            this.kafkaConsumer.commitAsync();
            this.logger.debug("commitAsync for autoAck");
        }
    }

    /** Registers the listener that receives every message accepted by the filter. */
    @Override // com.odianyun.mq.consumer.Consumer
    public void setListener(MessageListener messageListener) {
        this.messagelistener = messageListener;
    }

    /**
     * Requests shutdown: sets the closed flag and wakes the Kafka consumer so that a pending
     * poll returns. The consumer itself is closed by the polling thread.
     */
    @Override // com.odianyun.mq.consumer.Consumer
    public void close() {
        this.closed.set(true);
        KafkaConsumer<String, String> consumer = this.kafkaConsumer;
        if (consumer != null) {
            consumer.wakeup();
        }
    }

    /** Not provided by this implementation; always returns {@code null}. */
    @Override // com.odianyun.mq.consumer.Consumer
    public String getRemoteAddress() {
        return null;
    }

    /** Not provided by this implementation; always returns {@code null}. */
    @Override // com.odianyun.mq.consumer.Consumer
    public InetSocketAddress getConsumerAddress() {
        return null;
    }

    /**
     * Stops the current polling thread (if any), waits for it to finish and starts a new one.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread
     *                          is interrupted while waiting; the interrupt status is restored
     */
    @Override // com.odianyun.mq.consumer.Consumer
    public void restart() {
        close();
        Thread worker = this.thread;
        try {
            if (worker != null) { // start() may never have been called
                worker.join();
            }
            start();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** @return the destination this consumer was created for */
    @Override // com.odianyun.mq.consumer.Consumer
    public Destination getDest() {
        return this.dest;
    }
}
