package com.odianyun.mq.consumer.impl.inner;

import com.odianyun.mq.common.MqUtils;
import com.odianyun.mq.common.consumer.ConsumerType;
import com.odianyun.mq.common.inner.consumer.ConsumerMessageType;
import com.odianyun.mq.common.inner.message.MqMessage;
import com.odianyun.mq.common.inner.strategy.DefaultPullStrategy;
import com.odianyun.mq.common.inner.util.ZipUtil;
import com.odianyun.mq.common.inner.wrap.WrappedConsumerMessage;
import com.odianyun.mq.common.inner.wrap.WrappedMessage;
import com.odianyun.mq.common.inner.wrap.WrappedType;
import com.odianyun.mq.consumer.BackoutMessageException;
import com.odianyun.mq.consumer.NeedResendException;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/omq-real-client-2.0.17.4.RELEASE.jar:com/odianyun/mq/consumer/impl/inner/MessageClientHandler.class */
public class MessageClientHandler extends SimpleChannelUpstreamHandler {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) MessageClientHandler.class);
    private final ConsumerImpl consumer;
    private ExecutorService service;
    private ChannelGroup channelGroup = new DefaultChannelGroup();

    public MessageClientHandler(ConsumerImpl consumerImpl) {
        this.consumer = consumerImpl;
        this.service = MqUtils.getThreadPoolExecutor(consumerImpl.getConfig());
    }

    @Override // org.jboss.netty.channel.SimpleChannelUpstreamHandler
    public void channelConnected(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        channelStateEvent.getChannel().write(new WrappedConsumerMessage(ConsumerMessageType.WAKE, this.consumer.getConsumerId(), this.consumer.getDest(), this.consumer.getConfig().getConsumerType(), this.consumer.getConfig().getThreadPoolSize(), this.consumer.getConfig().getMessageFilter()));
        this.channelGroup.add(channelStateEvent.getChannel());
    }

    @Override // org.jboss.netty.channel.SimpleChannelUpstreamHandler
    public void channelDisconnected(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        removeChannel(channelStateEvent);
        super.channelDisconnected(channelHandlerContext, channelStateEvent);
        LOG.info("Channel(remoteAddress=" + channelStateEvent.getChannel().getRemoteAddress() + ") disconnected");
    }

    @Override // org.jboss.netty.channel.SimpleChannelUpstreamHandler
    public void messageReceived(ChannelHandlerContext channelHandlerContext, final MessageEvent messageEvent) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("MessageReceived from " + messageEvent.getChannel().getRemoteAddress());
        }
        final WrappedMessage wrappedMessage = (WrappedMessage) messageEvent.getMessage();
        if (wrappedMessage.getWrappedType().equals(WrappedType.HEARTBEAT)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Heartbeat received from {} of {}", messageEvent.getRemoteAddress(), this.consumer.getConsumerId() + "#" + this.consumer.getDest().getName());
            }
        } else if (!wrappedMessage.getWrappedType().equals(WrappedType.REFUSE)) {
            this.service.submit(new Runnable() { // from class: com.odianyun.mq.consumer.impl.inner.MessageClientHandler.2
                @Override // java.lang.Runnable
                public void run() {
                    MqMessage content = wrappedMessage.getContent();
                    Long messageId = content.getMessageId();
                    WrappedConsumerMessage wrappedConsumerMessage = new WrappedConsumerMessage(ConsumerMessageType.ACK, messageId, MessageClientHandler.this.consumer.isClosed());
                    try {
                        if (content.getInternalProperties() != null && "gzip".equals(content.getInternalProperties().get("compress"))) {
                            content.setContent(ZipUtil.unzip(content.getContent()));
                        }
                        try {
                            DefaultPullStrategy defaultPullStrategy = new DefaultPullStrategy(MessageClientHandler.this.consumer.getConfig().getMinDelayOnException(), MessageClientHandler.this.consumer.getConfig().getMaxDelayOnException());
                            int i = 0;
                            boolean z = false;
                            while (!z && i <= MessageClientHandler.this.consumer.getConfig().getRetryCountOnException()) {
                                try {
                                    MessageClientHandler.this.consumer.getListener().onMessage(content);
                                    z = true;
                                } catch (BackoutMessageException e) {
                                    i++;
                                    if (i <= MessageClientHandler.this.consumer.getConfig().getRetryCountOnException()) {
                                        MessageClientHandler.LOG.error("BackoutMessageException occur on onMessage(), onMessage() will be retryed soon [retryCount=" + i + "]. ", (Throwable) e);
                                        defaultPullStrategy.fail(true);
                                    } else {
                                        MessageClientHandler.LOG.error("BackoutMessageException occur on onMessage(), onMessage() failed.", (Throwable) e);
                                    }
                                } catch (NeedResendException e2) {
                                    if (MessageClientHandler.this.consumer.getConfig().getConsumerType() != ConsumerType.CLIENT_ACKNOWLEDGE) {
                                        throw new IllegalStateException("Can NOT throw NeedResendException wihthout a CLIENT_ACKNOWLEDGE consumer type");
                                    }
                                    wrappedConsumerMessage.setType(ConsumerMessageType.RESEND);
                                }
                            }
                        } catch (InterruptedException e3) {
                            MessageClientHandler.LOG.warn("InterruptedException in MessageListener", (Throwable) e3);
                        } catch (Throwable th) {
                            MessageClientHandler.LOG.warn("Exception in MessageListener", th);
                        }
                    } catch (IOException e4) {
                        MessageClientHandler.LOG.error("Can not uncompress message with messageId " + messageId, (Throwable) e4);
                    }
                    try {
                        messageEvent.getChannel().write(wrappedConsumerMessage);
                    } catch (RuntimeException e5) {
                        MessageClientHandler.LOG.warn("Write to broker error.", (Throwable) e5);
                    }
                }
            });
        } else if (LOG.isWarnEnabled()) {
            LOG.warn("Broker server close the connect,because host ip in namespace {} topic {} blackList", this.consumer.getDest().getNamespace(), this.consumer.getDest().getName());
            this.service.submit(new Runnable() { // from class: com.odianyun.mq.consumer.impl.inner.MessageClientHandler.1
                @Override // java.lang.Runnable
                public void run() {
                    MessageClientHandler.this.consumer.close();
                }
            });
        }
    }

    @Override // org.jboss.netty.channel.SimpleChannelUpstreamHandler
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) {
        removeChannel(exceptionEvent);
        Channel channel = exceptionEvent.getChannel();
        if (channel != null) {
            LOG.warn("Error from channel(remoteAddress=" + channel.getRemoteAddress() + ",switch broker server )", exceptionEvent.getCause());
            channel.close();
        }
    }

    @Override // org.jboss.netty.channel.SimpleChannelUpstreamHandler
    public void channelClosed(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        removeChannel(channelStateEvent);
        super.channelClosed(channelHandlerContext, channelStateEvent);
        LOG.info(channelStateEvent.getChannel().getRemoteAddress() + " closed!");
    }

    private void removeChannel(ChannelEvent channelEvent) {
        this.channelGroup.remove(channelEvent.getChannel());
    }

    public ChannelGroup getChannelGroup() {
        return this.channelGroup;
    }
}
