package com.odianyun.agent.business.mq.service;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.ImmutableMap;
import com.odianyun.agent.business.mq.model.AgentLog;
import com.odianyun.agent.business.mq.model.IHaveCompanyId;
import com.odianyun.architecture.caddy.SystemContext;
import com.odianyun.mq.common.consumer.ConsumerType;
import com.odianyun.mq.common.message.Destination;
import com.odianyun.mq.common.message.Message;
import com.odianyun.mq.consumer.BackoutMessageException;
import com.odianyun.mq.consumer.Consumer;
import com.odianyun.mq.consumer.ConsumerConfig;
import com.odianyun.mq.consumer.ConsumerFactory;
import com.odianyun.mq.consumer.MessageListener;
import com.odianyun.mq.consumer.NeedResendException;
import com.odianyun.mq.consumer.impl.ConsumerFactoryImpl;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;

@Service
@Lazy(false)
/* loaded from: input_file:WEB-INF/lib/agent-business-prod2.10.0-20210318.040232-1.jar:com/odianyun/agent/business/mq/service/OmqDistributor.class */
public class OmqDistributor implements ApplicationContextAware, InitializingBean {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) OmqDistributor.class);
    private static final String CONSUMER_ID_PREFIX = "agent_";
    private ApplicationContext applicationContext;
    private final Map<String, OmqConsumer> listeners = new ConcurrentHashMap();
    private final List<Consumer> consumers = new ArrayList();

    @Override // org.springframework.beans.factory.InitializingBean
    public void afterPropertiesSet() throws Exception {
        this.listeners.putAll(this.applicationContext.getBeansOfType(OmqConsumer.class));
        if (this.listeners.isEmpty()) {
            return;
        }
        ConsumerConfig consumerConfig = new ConsumerConfig();
        consumerConfig.setThreadPoolSize(10);
        consumerConfig.setConsumerType(ConsumerType.CLIENT_ACKNOWLEDGE);
        ConsumerFactory consumerFactoryImpl = ConsumerFactoryImpl.getInstance();
        this.listeners.forEach((str, omqConsumer) -> {
            Objects.requireNonNull(omqConsumer.topic(), "OmqConsumer.topic cannot be null");
            Objects.requireNonNull(omqConsumer.messageObjectType(), "OmqConsumer.messageObjectType cannot be null");
            LOGGER.info(String.format("监听 MQ topic %s 到 bean %s", omqConsumer.topic(), str));
            Consumer createConsumer = consumerFactoryImpl.createConsumer(Destination.topic(omqConsumer.topic()), CONSUMER_ID_PREFIX + str, consumerConfig);
            createConsumer.setListener(new MessageListener() { // from class: com.odianyun.agent.business.mq.service.OmqDistributor.1
                @Override // com.odianyun.mq.consumer.MessageListener
                public void onMessage(Message message) throws BackoutMessageException, NeedResendException {
                    Object transferContentToBean = message.transferContentToBean(omqConsumer.messageObjectType());
                    AgentLog of = AgentLog.of(omqConsumer.topic(), transferContentToBean, str);
                    try {
                        try {
                            OmqDistributor.LOGGER.info(String.format("由 %s 消费 %s topic消息：%s", str, omqConsumer.topic(), JSON.toJSONString(transferContentToBean)));
                            if (transferContentToBean instanceof IHaveCompanyId) {
                                SystemContext.setCompanyId(((IHaveCompanyId) transferContentToBean).getCompanyId());
                            }
                            omqConsumer.tryHandel(transferContentToBean);
                            of.done(true);
                            if (transferContentToBean instanceof IHaveCompanyId) {
                                SystemContext.clean();
                            }
                            OmqLogService.add(of);
                        } catch (Exception e) {
                            of.done(ImmutableMap.of("errorType", e.getClass().getName(), "errorMsg", e.getMessage() == null ? "<N/A>" : e.getMessage()));
                            OmqDistributor.LOGGER.error("处理MQ消息报错", (Throwable) e);
                            if (transferContentToBean instanceof IHaveCompanyId) {
                                SystemContext.clean();
                            }
                            OmqLogService.add(of);
                        }
                    } catch (Throwable th) {
                        if (transferContentToBean instanceof IHaveCompanyId) {
                            SystemContext.clean();
                        }
                        OmqLogService.add(of);
                        throw th;
                    }
                }
            });
            createConsumer.start();
            this.consumers.add(createConsumer);
        });
    }

    @Override // org.springframework.context.ApplicationContextAware
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
