package com.jzt.jk.center.common.rocketmq;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.jzt.jk.center.common.rocketmq.annotation.RocketMqListener;
import com.jzt.jk.center.common.rocketmq.config.RocketMqProperties;
import com.jzt.jk.center.common.rocketmq.enums.MessageMode;
import com.jzt.jk.center.common.rocketmq.enums.MqAction;
import com.jzt.jk.center.common.rocketmq.provider.ConsumerProvider;
import com.jzt.jk.center.common.rocketmq.retrys.limit.DefaultReConsumeLimitFallback;
import com.jzt.jk.center.common.rocketmq.retrys.limit.ReConsumeLimitFallback;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.messaging.support.MessageBuilder;

/* loaded from: input_file:com/jzt/jk/center/common/rocketmq/RocketMqConsumerFactory.class */
public class RocketMqConsumerFactory {
    private static final Logger log = LoggerFactory.getLogger(RocketMqConsumerFactory.class);
    public ApplicationContext context;
    private RocketMqProperties configuration;
    private ReConsumeLimitFallback defaultReConsumeLimitFallback = new DefaultReConsumeLimitFallback();
    private Map<String, DefaultMQPushConsumer> consumerMap = Maps.newConcurrentMap();
    private Map<String, String> consumerGroupToListenerClassMap = Maps.newHashMap();
    private RocketMQMessageConverter rocketMQMessageConverter = new RocketMQMessageConverter();

    /**
     * Creates the factory and immediately builds (without starting) a push consumer for every
     * listener that can be discovered through the given application context.
     *
     * @param rocketMqProperties shared RocketMQ settings; read later for the ACL access/secret keys
     * @param applicationContext Spring context queried for listener beans and consumer providers
     */
    public RocketMqConsumerFactory(RocketMqProperties rocketMqProperties, ApplicationContext applicationContext) {
        this.context = applicationContext;
        this.configuration = rocketMqProperties;
        // Discovery reads both fields assigned above, so it has to run last.
        initializeConsumer();
    }

    /**
     * Starts every consumer currently held in {@code consumerMap}.
     * A consumer whose start raises {@link MQClientException} is logged and skipped,
     * so the remaining consumers are still started.
     */
    @PostConstruct
    private void start() {
        for (Map.Entry<String, DefaultMQPushConsumer> registration : this.consumerMap.entrySet()) {
            String consumerKey = registration.getKey();
            try {
                log.info("RocketMQ Consumer => {} Starting.", consumerKey);
                registration.getValue().start();
                log.info("RocketMQ Consumer => {} Starting success!", consumerKey);
            } catch (MQClientException startFailure) {
                log.info("RocketMQ Consumer => {} fail connection.", consumerKey);
                log.error(startFailure.getMessage(), startFailure);
            }
        }
    }

    /**
     * Collects all listener descriptions (annotated beans plus those supplied by
     * {@code ConsumerProvider} beans) and creates one consumer per description.
     * Each consumer is stored under the key {@code topic-tag-consumerGroup}, and the handler's
     * simple class name is recorded per consumer group. A {@link MQClientException} for one
     * description is logged and does not stop the remaining ones.
     */
    private void initializeConsumer() {
        log.info("begin init rocketmq consumer");
        List<ConsumerInfo> discovered = getRocketMqListenerList();
        discovered.addAll(getCustomerizedRocketMqListeners());
        for (ConsumerInfo info : discovered) {
            try {
                String consumerKey = String.format("%s-%s-%s", info.getTopic(), info.getTag(), info.getConsumerGroup());
                DefaultMQPushConsumer consumer = createRealRocketMqConsumer(info);
                this.consumerMap.put(consumerKey, consumer);
                String handlerName = info.getHandlerIns().getClass().getSimpleName();
                this.consumerGroupToListenerClassMap.put(info.getConsumerGroup(), handlerName);
            } catch (MQClientException creationFailure) {
                log.error(creationFailure.getMessage(), creationFailure);
            }
        }
        log.info("end init rocketmq consumer");
    }

    /**
     * Replaces (or adds) the consumer described by {@code consumerInfo}.
     * <p>
     * If a consumer is already registered under the same {@code topic-tag-consumerGroup} key it is
     * unregistered and shut down first. A new consumer is then created, registered and started.
     * A {@link MQClientException} raised while creating or starting the new consumer is logged,
     * not rethrown.
     *
     * @param consumerInfo description of the consumer to (re)create
     */
    public void update(ConsumerInfo consumerInfo) {
        String consumerKey = String.format("%s-%s-%s", consumerInfo.getTopic(), consumerInfo.getTag(), consumerInfo.getConsumerGroup());
        // One atomic remove instead of containsKey/get/remove: the concurrent map never holds
        // null values, so a null result simply means nothing was registered under this key.
        DefaultMQPushConsumer previous = this.consumerMap.remove(consumerKey);
        if (previous != null) {
            // Free the group before re-creation, otherwise validate() rejects it as a duplicate.
            this.consumerGroupToListenerClassMap.remove(consumerInfo.getConsumerGroup());
            previous.shutdown();
        }
        try {
            DefaultMQPushConsumer replacement = createRealRocketMqConsumer(consumerInfo);
            this.consumerMap.put(consumerKey, replacement);
            this.consumerGroupToListenerClassMap.put(consumerInfo.getConsumerGroup(), consumerInfo.getHandlerIns().getClass().getSimpleName());
            replacement.start();
        } catch (MQClientException e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Builds a {@code ConsumerInfo} for every bean the application context reports as carrying
     * {@link RocketMqListener}.
     *
     * @return a new, mutable list of consumer descriptions (possibly empty)
     */
    private List<ConsumerInfo> getRocketMqListenerList() {
        List<ConsumerInfo> listeners = Lists.newArrayList();
        for (Map.Entry<String, Object> entry : this.context.getBeansWithAnnotation(RocketMqListener.class).entrySet()) {
            Object bean = entry.getValue();
            Class<?> cls = bean.getClass();
            RocketMqListener rocketMqListener = cls.getDeclaredAnnotation(RocketMqListener.class);
            if (rocketMqListener == null) {
                // The context can match a bean whose annotation is not declared directly on its
                // runtime class (e.g. it sits on a superclass). Ask the context for the annotation
                // in that case instead of passing null on.
                rocketMqListener = this.context.findAnnotationOnBean(entry.getKey(), RocketMqListener.class);
            }
            listeners.add(ConsumerInfo.build(bean, getMessageType(cls), rocketMqListener, this.configuration));
        }
        return listeners;
    }

    /**
     * Gathers the consumer descriptions supplied by every {@link ConsumerProvider} bean in the
     * application context. Providers that supply {@code null} or an empty list contribute nothing.
     *
     * @return a new list with all supplied descriptions (empty when there are no providers)
     */
    private List<ConsumerInfo> getCustomerizedRocketMqListeners() {
        // Typed map: with a raw Map the lambda parameters degrade to Object and
        // supplyConsumers() cannot be resolved.
        Map<String, ConsumerProvider> providers = this.context.getBeansOfType(ConsumerProvider.class);
        List<ConsumerInfo> supplied = new ArrayList<>();
        if (null == providers || providers.isEmpty()) {
            return supplied;
        }
        providers.forEach((beanName, provider) -> {
            log.info("customerized consumer provider :{} has been supplying ...", beanName);
            List<ConsumerInfo> supplyConsumers = provider.supplyConsumers();
            if (null != supplyConsumers && !supplyConsumers.isEmpty()) {
                supplied.addAll(supplyConsumers);
            }
        });
        return supplied;
    }

    /**
     * Wraps a pair of credentials in an ACL RPC hook.
     *
     * @param str  first credential; the only caller in this class passes the configured {@code ak}
     * @param str2 second credential; the only caller in this class passes the configured {@code sk}
     * @return a new {@link AclClientRPCHook} built from the two values
     */
    private RPCHook getAclRPCHook(String str, String str2) {
        SessionCredentials credentials = new SessionCredentials(str, str2);
        return new AclClientRPCHook(credentials);
    }

    /**
     * Creates and configures (but does not start) a push consumer for the given description.
     * <p>
     * The description is validated first. When an access key is configured the consumer is
     * created with an ACL hook, otherwise without. Name server address, thread limits, batch size,
     * message model, start offset policy, subscription and the message listener are then applied.
     *
     * @param consumerInfo description of the consumer to create
     * @return the configured, not yet started consumer
     * @throws MQClientException if the underlying client rejects the subscription
     * @throws RuntimeException  if {@code validate} rejects the description
     */
    private DefaultMQPushConsumer createRealRocketMqConsumer(ConsumerInfo consumerInfo) throws MQClientException {
        validate(this.consumerGroupToListenerClassMap, consumerInfo);
        String ak = this.configuration.getAk();
        DefaultMQPushConsumer defaultMQPushConsumer;
        if (StringUtils.isNotBlank(ak)) {
            defaultMQPushConsumer = new DefaultMQPushConsumer(consumerInfo.getConsumerGroup(), getAclRPCHook(ak, this.configuration.getSk()), new AllocateMessageQueueAveragely());
        } else {
            defaultMQPushConsumer = new DefaultMQPushConsumer(consumerInfo.getConsumerGroup());
        }
        defaultMQPushConsumer.setNamesrvAddr(consumerInfo.getNamesrvAddr());
        defaultMQPushConsumer.setConsumeThreadMin(consumerInfo.getMinConcurrentCount());
        defaultMQPushConsumer.setConsumeThreadMax(consumerInfo.getMaxConcurrentCount());
        defaultMQPushConsumer.setConsumeMessageBatchMaxSize(consumerInfo.getMessageBatchMaxSize());
        switch (consumerInfo.getMessageMode()) {
            case Broadcast:
                defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
                break;
            default:
                defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
                break;
        }
        defaultMQPushConsumer.setConsumeFromWhere(consumerInfo.getConsumeFromWhere());
        defaultMQPushConsumer.subscribe(consumerInfo.getTopic(), consumerInfo.getTag());
        // registerMessageListener is overloaded for the orderly and the concurrent listener
        // interfaces, both taking two arguments. The lambda parameters are typed explicitly so
        // the compiler can pick the overload; an implicitly typed lambda is ambiguous here.
        switch (consumerInfo.getConsumeMode()) {
            // NOTE(review): the constant named Ordinary selects the orderly listener — confirm
            // the intended meaning against the ConsumeMode enum.
            case Ordinary:
                defaultMQPushConsumer.registerMessageListener((List<MessageExt> messages, ConsumeOrderlyContext orderlyContext) ->
                        doOrderListenerHandle(messages, orderlyContext, consumerInfo));
                break;
            default:
                defaultMQPushConsumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext concurrentlyContext) ->
                        doConcurrentListenerHandle(messages, concurrentlyContext, consumerInfo));
                break;
        }
        if (MessageMode.Broadcast == consumerInfo.getMessageMode()) {
            defaultMQPushConsumer.setUnitName(consumerInfo.getServer());
        }
        log.info("create real rocketmq consumer topic:{} tag:{} consumer-group:{} handler:{}", consumerInfo.getTopic(), consumerInfo.getTag(), consumerInfo.getConsumerGroup(), consumerInfo.getHandlerIns().getClass().getSimpleName());
        return defaultMQPushConsumer;
    }

    /**
     * Rejects a consumer description that has a blank topic, a blank consumer group, or a
     * consumer group that is already present in {@code map}.
     *
     * @param map          consumer group to handler-name registry to check for duplicates
     * @param consumerInfo description to check
     * @throws RuntimeException if any of the three checks fails
     */
    private void validate(Map<String, String> map, ConsumerInfo consumerInfo) {
        String handlerName = consumerInfo.getHandlerIns().getClass().getSimpleName();
        if (StringUtils.isBlank(consumerInfo.getTopic())) {
            throw new RuntimeException(handlerName + ":topic为空");
        }
        String group = consumerInfo.getConsumerGroup();
        if (StringUtils.isBlank(group)) {
            throw new RuntimeException(handlerName + ":consumerGroup为空");
        }
        if (!map.containsKey(group)) {
            return;
        }
        String existingHandler = map.get(group);
        throw new RuntimeException(String.format("consumerGroup:%s 已经由%s监听 请勿重复监听同一consumerGroup", group, existingHandler));
    }

    /**
     * Listener callback for concurrent consumption: delegates to {@code doRealInsHandler} and
     * maps its result to a {@link ConsumeConcurrentlyStatus}.
     *
     * @param list                       messages delivered by the client
     * @param consumeConcurrentlyContext context of this delivery; only its message queue is used
     * @param consumerInfo               description of the consumer the messages belong to
     * @return {@code RECONSUME_LATER} when the handler asks for a retry or an exception occurs,
     *         otherwise {@code CONSUME_SUCCESS}
     */
    private ConsumeConcurrentlyStatus doConcurrentListenerHandle(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext, ConsumerInfo consumerInfo) {
        try {
            MqAction action = doRealInsHandler(list, consumeConcurrentlyContext.getMessageQueue(), consumerInfo);
            // A null action makes the switch throw NullPointerException, which the catch below
            // turns into a retry.
            switch (action) {
                case ReconsumeLater:
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                default:
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        } catch (Exception e) {
            log.error("消费失败", e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }

    /**
     * Listener callback for orderly consumption: delegates to {@code doRealInsHandler} and maps
     * its result to a {@link ConsumeOrderlyStatus}.
     *
     * @param list                  messages delivered by the client
     * @param consumeOrderlyContext context of this delivery; only its message queue is used
     * @param consumerInfo          description of the consumer the messages belong to
     * @return {@code SUSPEND_CURRENT_QUEUE_A_MOMENT} when the handler asks for a retry or an
     *         exception occurs, otherwise {@code SUCCESS}
     */
    private ConsumeOrderlyStatus doOrderListenerHandle(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext, ConsumerInfo consumerInfo) {
        try {
            MqAction action = doRealInsHandler(list, consumeOrderlyContext.getMessageQueue(), consumerInfo);
            // A null action makes the switch throw NullPointerException, which the catch below
            // turns into a suspend-and-retry.
            switch (action) {
                case ReconsumeLater:
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                default:
                    return ConsumeOrderlyStatus.SUCCESS;
            }
        } catch (Exception e) {
            log.error("消费失败", e);
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }

    /**
     * Dispatches a delivered batch to the handler instance of the consumer, choosing the
     * per-message, per-message-with-raw-message or batch style depending on which listener
     * interface the handler implements.
     *
     * @param list         messages delivered by the client; {@code null} or empty is committed as-is
     * @param messageQueue queue the messages were pulled from
     * @param consumerInfo description holding the handler, its message type and charset
     * @return the action decided by the handler, or {@code ReconsumeLater} if dispatching throws
     */
    private MqAction doRealInsHandler(List<MessageExt> list, MessageQueue messageQueue, ConsumerInfo consumerInfo) {
        Object handlerIns = consumerInfo.getHandlerIns();
        Type messageType = consumerInfo.getMessageType();
        String messageCharset = consumerInfo.getMessageCharset();
        if (null == list || list.isEmpty()) {
            return MqAction.CommitMessage;
        }
        try {
            // instanceof also recognises listener interfaces inherited from a superclass;
            // Class.getInterfaces() only lists the interfaces a class declares directly.
            if (handlerIns instanceof MessageListener) {
                return singleMessageListenerHandle(list, messageQueue, (MessageListener) handlerIns, consumerInfo, messageType, messageCharset);
            }
            if (handlerIns instanceof RocketMessageListener) {
                return rocketMessageListenerHandle(list, messageQueue, (RocketMessageListener) handlerIns, consumerInfo, messageType, messageCharset);
            }
            return batchMessageListenerHandle(list, messageQueue, (BatchMessageListener) handlerIns, consumerInfo, messageType, messageCharset);
        } catch (Exception e) {
            log.error("rocketmq message handle with exception,will reconsume later", e);
            return MqAction.ReconsumeLater;
        }
    }

    /**
     * Resolves the message payload type of a listener class: the first type argument of the
     * {@code BatchMessageListener}, {@code MessageListener} or {@code RocketMessageListener}
     * interface it implements. The class itself is inspected first, then its superclasses.
     *
     * @param cls listener class to inspect
     * @return the first type argument of the matching listener interface, or {@code null} when
     *         no parameterized listener interface is found
     */
    private Type getMessageType(Class<?> cls) {
        for (Class<?> current = cls; current != null && current != Object.class; current = current.getSuperclass()) {
            for (Type type : current.getGenericInterfaces()) {
                // Non-generic interfaces (e.g. Serializable) are plain Class objects here;
                // casting them to ParameterizedType would throw ClassCastException.
                if (!(type instanceof ParameterizedType)) {
                    continue;
                }
                ParameterizedType parameterized = (ParameterizedType) type;
                Type rawType = parameterized.getRawType();
                if (rawType.equals(BatchMessageListener.class) || rawType.equals(MessageListener.class) || rawType.equals(RocketMessageListener.class)) {
                    Type[] actualTypeArguments = parameterized.getActualTypeArguments();
                    if (0 < actualTypeArguments.length) {
                        return actualTypeArguments[0];
                    }
                }
            }
        }
        return null;
    }

    /**
     * Converts a delivered message into the payload type expected by the handler.
     * <ul>
     *   <li>{@code MessageExt} or {@code Message}: the message itself is returned;</li>
     *   <li>{@code String}: the body decoded with the given charset;</li>
     *   <li>any other {@code Class}: the decoded body converted by the message converter.</li>
     * </ul>
     *
     * @param messageExt delivered message
     * @param type       target payload type
     * @param str        name of the charset used to decode the body
     * @return the converted payload
     * @throws RuntimeException if {@code type} is not a {@code Class} or the conversion fails
     */
    private Object doConvertMessage(MessageExt messageExt, Type type, String str) {
        boolean wantsRawMessage = Objects.equals(type, MessageExt.class) || Objects.equals(type, Message.class);
        if (wantsRawMessage) {
            return messageExt;
        }
        byte[] body = messageExt.getBody();
        Charset charset = Charset.forName(str);
        String payload = new String(body, charset);
        if (Objects.equals(type, String.class)) {
            return payload;
        }
        try {
            if (!(type instanceof Class)) {
                // Thrown inside the try on purpose: it is wrapped by the catch below like any
                // other conversion failure.
                throw new RuntimeException("messageType need has a not Generic Parameter :" + type);
            }
            return this.rocketMQMessageConverter.getMessageConverter().fromMessage(MessageBuilder.withPayload(payload).build(), (Class<?>) type);
        } catch (Exception conversionFailure) {
            log.info("convert failed. str:{}, msgType:{}", payload, type);
            throw new RuntimeException("cannot convert message to " + type, conversionFailure);
        }
    }

    /**
     * Feeds each message of the batch to a {@code RocketMessageListener}, passing both the
     * converted payload and the raw message. Messages over the re-consume limit are skipped.
     *
     * @param list                  messages to consume
     * @param messageQueue          queue the messages were pulled from
     * @param rocketMessageListener handler to invoke
     * @param consumerInfo          description of the consumer (used for the re-consume limit)
     * @param type                  payload type the handler expects
     * @param str                   charset name used to decode message bodies
     * @return {@code ReconsumeLater} as soon as one message asks for a retry or throws,
     *         otherwise {@code CommitMessage} after all messages were handled
     */
    private MqAction rocketMessageListenerHandle(List<MessageExt> list, MessageQueue messageQueue, RocketMessageListener rocketMessageListener, ConsumerInfo consumerInfo, Type type, String str) {
        for (MessageExt messageExt : list) {
            if (isOverReconsumeLimit(messageExt, consumerInfo)) {
                continue;
            }
            try {
                MqAction consume = rocketMessageListener.consume(doConvertMessage(messageExt, type, str), messageExt, messageQueue);
                // A null result counts as success for this message, and the loop must go on:
                // returning here would commit the rest of the batch without consuming it.
                if (MqAction.ReconsumeLater == consume) {
                    return MqAction.ReconsumeLater;
                }
            } catch (Exception e) {
                log.error(e.getMessage(), e);
                return MqAction.ReconsumeLater;
            }
        }
        return MqAction.CommitMessage;
    }

    /**
     * Feeds each message of the batch, one at a time, to a {@code MessageListener}.
     * Messages over the re-consume limit are skipped.
     *
     * @param list            messages to consume
     * @param messageQueue    queue the messages were pulled from
     * @param messageListener handler to invoke
     * @param consumerInfo    description of the consumer (used for the re-consume limit)
     * @param type            payload type the handler expects
     * @param str             charset name used to decode message bodies
     * @return {@code ReconsumeLater} as soon as one message asks for a retry or throws,
     *         otherwise {@code CommitMessage}
     */
    private MqAction singleMessageListenerHandle(List<MessageExt> list, MessageQueue messageQueue, MessageListener messageListener, ConsumerInfo consumerInfo, Type type, String str) {
        for (MessageExt current : list) {
            if (isOverReconsumeLimit(current, consumerInfo)) {
                continue;
            }
            try {
                Object payload = doConvertMessage(current, type, str);
                MqAction outcome = messageListener.consume(payload, messageQueue);
                // Enum constants are singletons, so identity comparison also covers a null outcome.
                if (MqAction.ReconsumeLater == outcome) {
                    return MqAction.ReconsumeLater;
                }
            } catch (Exception failure) {
                log.error(failure.getMessage(), failure);
                return MqAction.ReconsumeLater;
            }
        }
        return MqAction.CommitMessage;
    }

    /**
     * Converts every message that is not over the re-consume limit and hands the converted
     * payloads to a {@code BatchMessageListener} in a single call.
     *
     * @param list                 messages to consume
     * @param messageQueue         queue the messages were pulled from
     * @param batchMessageListener handler to invoke
     * @param consumerInfo         description of the consumer (used for the re-consume limit)
     * @param type                 payload type the handler expects
     * @param str                  charset name used to decode message bodies
     * @return {@code CommitMessage} when nothing is left to consume, otherwise whatever the
     *         handler returns (passed through unchanged, including {@code null})
     */
    private MqAction batchMessageListenerHandle(List<MessageExt> list, MessageQueue messageQueue, BatchMessageListener batchMessageListener, ConsumerInfo consumerInfo, Type type, String str) {
        List<Object> payloads = new ArrayList<>();
        for (MessageExt current : list) {
            if (isOverReconsumeLimit(current, consumerInfo)) {
                continue;
            }
            payloads.add(doConvertMessage(current, type, str));
        }
        if (payloads.isEmpty()) {
            return MqAction.CommitMessage;
        }
        return batchMessageListener.consume(payloads, messageQueue);
    }

    /**
     * Tells whether a message has been re-delivered more often than the consumer allows and,
     * if so, notifies the default over-limit fallback.
     *
     * @param messageExt   message whose re-consume count is checked
     * @param consumerInfo description holding the limit; {@code -1} disables the check
     * @return {@code true} if the message is over the limit (the fallback has been invoked),
     *         {@code false} otherwise
     */
    private boolean isOverReconsumeLimit(MessageExt messageExt, ConsumerInfo consumerInfo) {
        boolean limitEnabled = -1 != consumerInfo.getMaxReConsumeTimes();
        if (limitEnabled && messageExt.getReconsumeTimes() > consumerInfo.getMaxReConsumeTimes()) {
            this.defaultReConsumeLimitFallback.onOverLimit(messageExt, consumerInfo);
            return true;
        }
        return false;
    }
}
