package com.jzt.jk.rocketmq.mq;

import com.google.common.collect.Maps;
import com.jzt.jk.rocketmq.annotation.RocketMqBroadcastListener;
import com.jzt.jk.rocketmq.annotation.RocketMqListener;
import com.jzt.jk.rocketmq.annotation.RocketMqOrderListener;
import com.jzt.jk.rocketmq.config.RocketMqProperties;
import com.jzt.jk.rocketmq.util.GeneratorId;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;

/* loaded from: input_file:com/jzt/jk/rocketmq/mq/RocketMqConsumer.class */
/**
 * Creates and starts one {@link DefaultMQPushConsumer} for every Spring bean annotated with
 * {@link RocketMqListener}, {@link RocketMqOrderListener} or {@link RocketMqBroadcastListener}.
 *
 * <p>Each topic and each consumer group may be claimed by at most one listener bean across all
 * three annotation kinds; a duplicate claim aborts {@link #start()} with a {@link RuntimeException}.
 */
public class RocketMqConsumer {
    private static final Logger log = LoggerFactory.getLogger(RocketMqConsumer.class);
    public ApplicationContext context;
    private volatile boolean init = false;
    private RocketMqProperties configuration;
    private Map<String, DefaultMQPushConsumer> consumerMap;

    /**
     * @param rocketMqProperties connection and thread-pool settings applied to the consumers
     * @param applicationContext Spring context that is scanned for annotated listener beans
     */
    public RocketMqConsumer(RocketMqProperties rocketMqProperties, ApplicationContext applicationContext) {
        this.context = applicationContext;
        this.configuration = rocketMqProperties;
    }

    /**
     * Builds and starts all consumers once; later calls only log a warning and return.
     *
     * <p>If anything fails, every consumer created during this call is shut down before the
     * exception is rethrown, so a failed start does not leave half of the consumers running.
     *
     * @throws Exception if a listener is misconfigured or a consumer cannot be created or started
     */
    public synchronized void start() throws Exception {
        if (this.init) {
            log.warn("请不要重复初始化RocketMQ消费者");
            return;
        }
        Map<String, DefaultMQPushConsumer> consumers = Maps.newConcurrentMap();
        try {
            initializeConsumer(consumers);
        } catch (Exception e) {
            shutdownQuietly(consumers);
            throw e;
        }
        // Publish the map only after every consumer started successfully.
        this.consumerMap = consumers;
        this.init = true;
    }

    /**
     * Shuts down every consumer in the given map, logging (not propagating) shutdown failures so
     * that the exception which triggered the cleanup is the one the caller sees.
     */
    private void shutdownQuietly(Map<String, DefaultMQPushConsumer> consumers) {
        consumers.forEach((topicTag, consumer) -> {
            try {
                consumer.shutdown();
            } catch (RuntimeException e) {
                log.warn("关闭RocketMQ消费者失败 Topic-tag:{}", topicTag, e);
            }
        });
    }

    /**
     * Registers the concurrent, ordered and broadcast consumers into {@code map} and then starts
     * each of them under a freshly generated instance name.
     *
     * @param map receives the created consumers, keyed by {@code "<topic>-<tag>"}
     * @throws Exception if registration fails; a start failure is rethrown as a
     *                   {@link RuntimeException} wrapping the {@link MQClientException}
     */
    private void initializeConsumer(Map<String, DefaultMQPushConsumer> map) throws Exception {
        // topic -> listener class name and consumerGroup -> listener class name, shared by all
        // three registration passes so duplicates are detected across annotation kinds.
        Map<String, String> topicOwners = new HashMap<>();
        Map<String, String> groupOwners = new HashMap<>();
        initializeConcurrentlyConsumer(map, topicOwners, groupOwners);
        initializeOrderConsumer(map, topicOwners, groupOwners);
        initializeBroadcastConsumer(map, topicOwners, groupOwners);
        map.forEach((topicTag, consumer) -> {
            try {
                consumer.setInstanceName(System.currentTimeMillis() + GeneratorId.nextFormatId());
                consumer.start();
                log.info("自建RocketMQ成功加载 Topic-tag:{}", topicTag);
            } catch (MQClientException e) {
                log.error("自建RocketMQ加载失败 Topic-tag:{}", topicTag, e);
                throw new RuntimeException(e.getMessage(), e);
            }
        });
        log.info("-------------- 成功初始化所有消费者到自建mq --------------");
    }

    /**
     * Creates one concurrently-consuming push consumer per {@link RocketMqListener} bean.
     *
     * @param map  receives the created consumers, keyed by {@code "<topic>-<tag>"}
     * @param map2 topic -> owning listener class name; checked and updated
     * @param map3 consumerGroup -> owning listener class name; checked and updated
     * @throws MQClientException if the subscription cannot be registered
     */
    private void initializeConcurrentlyConsumer(Map<String, DefaultMQPushConsumer> map, Map<String, String> map2, Map<String, String> map3) throws MQClientException {
        for (Map.Entry<String, Object> entry : this.context.getBeansWithAnnotation(RocketMqListener.class).entrySet()) {
            Object bean = entry.getValue();
            Class<?> cls = bean.getClass();
            // Resolve through the context: the runtime class may be a proxy on which the
            // annotation is not directly declared, making getDeclaredAnnotation return null.
            RocketMqListener annotation = this.context.findAnnotationOnBean(entry.getKey(), RocketMqListener.class);
            String topic = annotation.topic();
            String tag = annotation.tag();
            String consumerGroup = annotation.consumerGroup();
            validate(map2, map3, cls, topic, consumerGroup);
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
            consumer.setNamesrvAddr(this.configuration.getNamesrvAddr());
            consumer.setConsumeThreadMin(this.configuration.getConsumerConsumeThreadMin());
            consumer.setConsumeThreadMax(this.configuration.getConsumerConsumeThreadMax());
            consumer.setConsumeMessageBatchMaxSize(this.configuration.getConsumerConsumeMessageBatchMaxSize());
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.subscribe(topic, tag);
            // The cast picks the concurrent overload of registerMessageListener; a bare lambda
            // would be ambiguous between the concurrent and the orderly listener types.
            consumer.registerMessageListener((MessageListenerConcurrently) (messages, consumeContext) -> {
                try {
                    for (MessageExt message : messages) {
                        switch (((MessageListener) bean).consume(message, consumeContext)) {
                            case ReconsumeLater:
                                // One message asking for a retry makes the whole batch retry.
                                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                            default:
                                break;
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    log.error("消费失败", e);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            });
            map2.put(topic, cls.getSimpleName());
            map3.put(consumerGroup, cls.getSimpleName());
            map.put(String.format("%s-%s", topic, tag), consumer);
        }
    }

    /**
     * Rejects a listener whose topic or consumer group is blank or already claimed.
     *
     * @param map  topic -> class name of the listener that already claimed it
     * @param map2 consumerGroup -> class name of the listener that already claimed it
     * @param cls  class of the listener being registered (used in the blank-value messages)
     * @param str  topic declared by the listener
     * @param str2 consumer group declared by the listener
     * @throws RuntimeException if any of the checks fails
     */
    private void validate(Map<String, String> map, Map<String, String> map2, Class<?> cls, String str, String str2) {
        if (StringUtils.isBlank(str)) {
            throw new RuntimeException(cls.getSimpleName() + ":topic不能为空");
        }
        if (StringUtils.isBlank(str2)) {
            throw new RuntimeException(cls.getSimpleName() + ":consumerGroup不能为空");
        }
        if (map.containsKey(str)) {
            // Name the listener that already owns the topic, not the one being rejected.
            throw new RuntimeException(String.format("Topic:%s 已经由%s监听 请勿重复监听同一Topic", str, map.get(str)));
        }
        if (map2.containsKey(str2)) {
            // Name the listener that already owns the group, not the one being rejected.
            throw new RuntimeException(String.format("consumerGroup:%s 已经由%s监听 请勿重复监听同一consumerGroup", str2, map2.get(str2)));
        }
    }

    /**
     * Creates one orderly-consuming push consumer per {@link RocketMqOrderListener} bean.
     *
     * @param map  receives the created consumers, keyed by {@code "<topic>-<tag>"}
     * @param map2 topic -> owning listener class name; checked and updated
     * @param map3 consumerGroup -> owning listener class name; checked and updated
     * @throws MQClientException if the subscription cannot be registered
     */
    private void initializeOrderConsumer(Map<String, DefaultMQPushConsumer> map, Map<String, String> map2, Map<String, String> map3) throws MQClientException {
        for (Map.Entry<String, Object> entry : this.context.getBeansWithAnnotation(RocketMqOrderListener.class).entrySet()) {
            Object bean = entry.getValue();
            Class<?> cls = bean.getClass();
            // Resolve through the context so that proxied beans do not yield a null annotation.
            RocketMqOrderListener annotation = this.context.findAnnotationOnBean(entry.getKey(), RocketMqOrderListener.class);
            String topic = annotation.topic();
            String tag = annotation.tag();
            String consumerGroup = annotation.consumerGroup();
            validate(map2, map3, cls, topic, consumerGroup);
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
            consumer.setNamesrvAddr(this.configuration.getNamesrvAddr());
            consumer.subscribe(topic, tag);
            // The cast picks the orderly overload of registerMessageListener.
            consumer.registerMessageListener((MessageListenerOrderly) (messages, consumeContext) -> {
                try {
                    for (MessageExt message : messages) {
                        switch (((MessageOrderListener) bean).consume(message, consumeContext)) {
                            case ReconsumeLater:
                                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                            default:
                                break;
                        }
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                } catch (Exception e) {
                    log.error("消费失败", e);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            });
            map2.put(topic, cls.getSimpleName());
            map3.put(consumerGroup, cls.getSimpleName());
            map.put(String.format("%s-%s", topic, tag), consumer);
        }
    }

    /**
     * Creates one push consumer in {@link MessageModel#BROADCASTING} mode per
     * {@link RocketMqBroadcastListener} bean.
     *
     * @param map  receives the created consumers, keyed by {@code "<topic>-<tag>"}
     * @param map2 topic -> owning listener class name; checked and updated
     * @param map3 consumerGroup -> owning listener class name; checked and updated
     * @throws MQClientException if the subscription cannot be registered
     */
    private void initializeBroadcastConsumer(Map<String, DefaultMQPushConsumer> map, Map<String, String> map2, Map<String, String> map3) throws MQClientException {
        for (Map.Entry<String, Object> entry : this.context.getBeansWithAnnotation(RocketMqBroadcastListener.class).entrySet()) {
            Object bean = entry.getValue();
            Class<?> cls = bean.getClass();
            // Resolve through the context so that proxied beans do not yield a null annotation.
            RocketMqBroadcastListener annotation = this.context.findAnnotationOnBean(entry.getKey(), RocketMqBroadcastListener.class);
            String topic = annotation.topic();
            String tag = annotation.tag();
            String consumerGroup = annotation.consumerGroup();
            validate(map2, map3, cls, topic, consumerGroup);
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
            consumer.setMessageModel(MessageModel.BROADCASTING);
            consumer.setNamesrvAddr(this.configuration.getNamesrvAddr());
            consumer.subscribe(topic, tag);
            // The cast picks the concurrent overload of registerMessageListener.
            consumer.registerMessageListener((MessageListenerConcurrently) (messages, consumeContext) -> {
                try {
                    for (MessageExt message : messages) {
                        switch (((MessageBroadcastListener) bean).consume(message, consumeContext)) {
                            case ReconsumeLater:
                                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                            default:
                                break;
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    log.error("消费失败", e);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            });
            map2.put(topic, cls.getSimpleName());
            map3.put(consumerGroup, cls.getSimpleName());
            map.put(String.format("%s-%s", topic, tag), consumer);
        }
    }
}
