package com.odianyun.mq.consumer.impl.inner;

import com.odianyun.mq.common.inner.util.IPUtil;
import com.odianyun.mq.common.inner.util.NameCheckUtil;
import com.odianyun.mq.common.inner.wrap.WrappedConsumerMessage;
import com.odianyun.mq.common.inner.wrap.WrappedMessage;
import com.odianyun.mq.common.message.Destination;
import com.odianyun.mq.common.netty.component.DefaultThreadFactory;
import com.odianyun.mq.common.netty.component.HostInfo;
import com.odianyun.mq.common.netty.component.RouteManager;
import com.odianyun.mq.common.properties.OmqConfigUtil;
import com.odianyun.mq.common.protocol.json.JsonDecoder;
import com.odianyun.mq.common.protocol.json.JsonEncoder;
import com.odianyun.mq.consumer.Consumer;
import com.odianyun.mq.consumer.ConsumerConfig;
import com.odianyun.mq.consumer.MessageListener;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/odianyun/mq/consumer/impl/inner/ConsumerImpl.class */
/**
 * {@link Consumer} implementation that connects to a remote address through a Netty
 * {@link ClientBootstrap} and drives the connection from a dedicated {@code ConsumerThread}.
 *
 * <p>Each started consumer registers itself in the shared {@code connectionPool} under a key
 * built from the destination's assembled name plus the remote host name and port. A consumer
 * may own child consumers; {@link #start()}, {@link #close()}, {@link #restart()} and
 * {@link #setListener(MessageListener)} are propagated to them.
 *
 * <p>NOTE(review): the Netty channel factory and its "closed" flag are static but are guarded by
 * the per-instance {@code connectionPool} monitor; this is only safe if all consumers share one
 * pool map — confirm against the code that constructs consumers.
 */
public class ConsumerImpl implements Consumer {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerImpl.class);
    private static final int READ_TIMEOUT_SECONDS = 60;
    private static final int WRITE_TIMEOUT_SECONDS = 20;

    /** Cache of resolved host names so {@link InetSocketAddress#getHostName()} is called once per address. */
    private static final Map<InetSocketAddress, String> inetAddress2HostName = new HashMap<>();
    private static NioClientSocketChannelFactory nioClientSocketChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
    /** Set once the shared channel factory has been released; {@link #init()} then creates a new one. */
    private static boolean clientSocketClose = false;
    private static final Timer timer = new HashedWheelTimer(new DefaultThreadFactory("Consumer-Client-HashedWheelTimer"));

    private ConsumerThread consumerThread;
    private InetSocketAddress consumerAddress;
    private String consumerId;
    private Destination dest;
    private ClientBootstrap bootstrap;
    private MessageListener listener;
    private ConsumerConfig config;
    private final RouteManager routerManager;
    private final Map<String, LinkedBlockingQueue<Consumer>> connectionPool;
    private MessageClientHandler handler;
    private volatile boolean closed = false;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final String consumerIP = IPUtil.getFirstNoLoopbackIP4Address();
    private final long connectInterval = 5000;
    private final List<ConsumerImpl> children = new ArrayList<>();
    /** Guards {@link #restart()} against re-entry; cleared by {@link #start()} once the thread is launched. */
    private final AtomicBoolean isRestart = new AtomicBoolean(false);

    /**
     * Creates a consumer for the given destination.
     *
     * @param destination       destination to consume from; its assembled name must pass
     *                          {@code NameCheckUtil.isTopicNameValid}
     * @param str               consumer id; must pass {@code NameCheckUtil.isConsumerIdValid}
     * @param consumerConfig    configuration, or {@code null} to use a default {@link ConsumerConfig}
     * @param inetSocketAddress remote address to connect to; may be {@code null}, in which case
     *                          {@link #start()} does nothing
     * @param routeManager      used by {@link #getConsumerAddress()} to pick a new remote host
     * @param map               shared pool of started consumers, keyed by destination + host + port
     * @throws IllegalArgumentException if the topic name or the consumer id is invalid
     */
    public ConsumerImpl(Destination destination, String str, ConsumerConfig consumerConfig, InetSocketAddress inetSocketAddress, RouteManager routeManager, Map<String, LinkedBlockingQueue<Consumer>> map) {
        if (!NameCheckUtil.isTopicNameValid(destination.getAssembleName())) {
            throw new IllegalArgumentException("Topic " + destination.getAssembleName() + " is invalid,please check length is not greater than 30");
        }
        if (!NameCheckUtil.isConsumerIdValid(str)) {
            throw new IllegalArgumentException("ConsumerId " + str + " is invalid,please check length is not greater than 28");
        }
        this.dest = destination;
        this.consumerId = str;
        this.config = consumerConfig == null ? new ConsumerConfig() : consumerConfig;
        this.connectionPool = map;
        this.routerManager = routeManager;
        this.consumerAddress = inetSocketAddress;
    }

    /** @return {@code true} once {@link #close()} has been called and the consumer has not been re-initialised */
    public boolean isClosed() {
        return this.closed;
    }

    /**
     * Starts this consumer and then its children.
     *
     * <p>Returns without starting anything when consumers are disabled via {@code OmqConfigUtil}
     * or when no remote address is set. Starting an already started consumer only re-triggers
     * {@code start()} on the children.
     *
     * @throws IllegalArgumentException if no {@link MessageListener} has been set
     */
    @Override // com.odianyun.mq.consumer.Consumer
    public void start() {
        LOG.info("Starting {}", this);
        if (this.listener == null) {
            throw new IllegalArgumentException("MessageListener is null, MessageListener should be set(use setListener()) before start.");
        }
        if (!OmqConfigUtil.isConsumerEnabled()) {
            LOG.error("omq consumer enable false , will not not start this consumer");
            return;
        }
        if (this.consumerAddress == null) {
            return;
        }
        if (this.started.compareAndSet(false, true)) {
            init();
            this.consumerThread = new ConsumerThread();
            this.consumerThread.setBootstrap(this.bootstrap);
            this.consumerThread.setRemoteAddress(this.consumerAddress);
            this.consumerThread.setInterval(this.connectInterval);
            this.consumerThread.setName("ConsumerThread_" + this.dest.getAssembleName());
            this.consumerThread.setConsumer(this);
            this.consumerThread.start();
            this.isRestart.set(false);
        }
        LOG.info("Consumer started: {}", this);
        for (ConsumerImpl consumerImpl : this.children) {
            try {
                consumerImpl.start();
            } catch (Exception e) {
                // One failing child must not prevent the remaining children from starting.
                LOG.error("Child consumer started error: {}", consumerImpl, e);
            }
        }
    }

    /**
     * Sets the listener on this consumer and on every child.
     *
     * @param messageListener listener to use; must be set before {@link #start()}
     */
    @Override // com.odianyun.mq.consumer.Consumer
    public void setListener(MessageListener messageListener) {
        this.listener = messageListener;
        for (ConsumerImpl child : this.children) {
            child.setListener(messageListener);
        }
    }

    /** Builds the {@code connectionPool} key for the current destination and remote address. */
    private String poolKey() {
        return this.dest.getAssembleName() + getHostName(this.consumerAddress) + ":" + this.consumerAddress.getPort();
    }

    /**
     * Creates the bootstrap and pipeline factory and registers this consumer in the connection pool.
     * Runs under the {@code connectionPool} monitor.
     */
    private void init() {
        LOG.info("Initializing consumer named {} of topic {}", this.consumerId, this.dest.getAssembleName());
        synchronized (this.connectionPool) {
            if (clientSocketClose) {
                // The shared factory was released by a previous close(); build a fresh one.
                nioClientSocketChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
                clientSocketClose = false;
            }
            this.bootstrap = new ClientBootstrap(nioClientSocketChannelFactory);
            this.bootstrap.setPipelineFactory(new ChannelPipelineFactory() { // from class: com.odianyun.mq.consumer.impl.inner.ConsumerImpl.1
                @Override
                public ChannelPipeline getPipeline() throws Exception {
                    // Pass the enclosing consumer: a bare `this` here is the anonymous factory.
                    // NOTE(review): assumes MessageClientHandler's constructor takes the ConsumerImpl — confirm.
                    MessageClientHandler clientHandler = new MessageClientHandler(ConsumerImpl.this);
                    ConsumerImpl.this.handler = clientHandler;
                    ChannelPipeline pipeline = Channels.pipeline();
                    pipeline.addLast("heartbeat", new HeartBeatHandler(ConsumerImpl.timer, READ_TIMEOUT_SECONDS, WRITE_TIMEOUT_SECONDS, 0));
                    pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                    pipeline.addLast("jsonDecoder", new JsonDecoder(WrappedMessage.class));
                    pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                    pipeline.addLast("jsonEncoder", new JsonEncoder(WrappedConsumerMessage.class));
                    pipeline.addLast("handler", clientHandler);
                    return pipeline;
                }
            });
            this.closed = false;
            String key = poolKey();
            LinkedBlockingQueue<Consumer> queue = this.connectionPool.get(key);
            if (queue == null) {
                queue = new LinkedBlockingQueue<>();
                this.connectionPool.put(key, queue);
            }
            // The queue is unbounded, so add() always succeeds; unlike put() it cannot throw
            // InterruptedException and leave this consumer silently unregistered.
            queue.add(this);
        }
    }

    /**
     * Closes this consumer and then its children: removes it from the connection pool, interrupts
     * the consumer thread, closes the handler's channels and, when the pool map is empty, releases
     * the shared Netty resources.
     */
    @Override // com.odianyun.mq.consumer.Consumer
    public void close() {
        this.closed = true;
        if (this.consumerAddress != null) {
            String key = poolKey();
            // init() mutates the pool under this monitor, so the lookup + removal uses it as well.
            synchronized (this.connectionPool) {
                LinkedBlockingQueue<Consumer> queue = this.connectionPool.get(key);
                if (queue != null) {
                    queue.remove(this);
                }
            }
        }
        if (this.consumerThread != null) {
            this.consumerThread.interrupt();
        }
        try {
            if (this.handler != null) {
                ChannelGroup channelGroup = this.handler.getChannelGroup();
                channelGroup.unbind().await();
                channelGroup.close().await();
                channelGroup.clear();
            }
            // bootstrap is null when close() runs before the first start(); nothing to release then.
            if (this.bootstrap != null && this.connectionPool.isEmpty()) {
                synchronized (this.connectionPool) {
                    if (this.connectionPool.isEmpty()) {
                        clientSocketClose = true;
                        this.bootstrap.releaseExternalResources();
                        nioClientSocketChannelFactory = null;
                    }
                }
            }
        } catch (InterruptedException e) {
            // NOTE(review): the interrupt flag is intentionally not restored here; restart() sleeps
            // right after close(), and a restored flag would abort that sleep and the re-routing.
            LOG.error(e.getMessage(), e);
        }
        this.started.set(false);
        for (ConsumerImpl consumerImpl : this.children) {
            try {
                consumerImpl.close();
            } catch (Exception e2) {
                // Keep closing the remaining children even if one of them fails.
                LOG.error("Close consumer error: {}", consumerImpl, e2);
            }
        }
    }

    /**
     * Closes the consumer, waits until the route manager yields a remote address, starts the
     * consumer again and finally restarts the children. Concurrent calls while a restart is in
     * progress are ignored.
     */
    @Override // com.odianyun.mq.consumer.Consumer
    public void restart() {
        if (!this.isRestart.compareAndSet(false, true)) {
            return;
        }
        boolean startReturned = false;
        try {
            close();
            do {
                try {
                    Thread.sleep(1500L);
                    this.consumerAddress = getConsumerAddress();
                } catch (InterruptedException e) {
                    LOG.error(e.getMessage(), e);
                }
            } while (this.consumerAddress == null);
            start();
            startReturned = true;
        } finally {
            if (!startReturned) {
                // start() clears the guard only after launching the thread; if anything threw
                // before that, clear it here so later restart() calls are not ignored forever.
                this.isRestart.set(false);
            }
        }
        LOG.warn("Switch to host:" + getHostName(this.consumerAddress) + ":" + this.consumerAddress.getPort());
        for (ConsumerImpl consumerImpl : this.children) {
            try {
                consumerImpl.restart();
            } catch (Exception e2) {
                LOG.error("Restart consumer error: {}", consumerImpl, e2);
            }
        }
    }

    /**
     * Returns the host name of the given address, caching the result per address.
     *
     * @param inetSocketAddress address to look up; must not be {@code null}
     * @return the value of {@link InetSocketAddress#getHostName()} for that address
     */
    public static synchronized String getHostName(InetSocketAddress inetSocketAddress) {
        if (inetAddress2HostName.containsKey(inetSocketAddress)) {
            return inetAddress2HostName.get(inetSocketAddress);
        }
        String hostName = inetSocketAddress.getHostName();
        inetAddress2HostName.put(inetSocketAddress, hostName);
        return hostName;
    }

    /** @return the live, mutable list of child consumers (not a copy) */
    public List<ConsumerImpl> getChildren() {
        return this.children;
    }

    /**
     * Adds a child consumer.
     *
     * @param consumerImpl child to add
     */
    public synchronized void addChild(ConsumerImpl consumerImpl) {
        this.children.add(consumerImpl);
    }

    /**
     * Removes every child whose destination name equals the given name.
     *
     * @param str destination name to match against {@code dest.getName()}
     */
    public synchronized void removeChild(String str) {
        Iterator<ConsumerImpl> it = this.children.iterator();
        while (it.hasNext()) {
            if (it.next().dest.getName().equals(str)) {
                it.remove();
            }
        }
    }

    /**
     * Tells whether a child consumer exists for the given destination name.
     *
     * @param destination destination whose name is compared with each child's destination name
     * @return {@code true} if at least one child matches
     */
    public boolean isConsumerExist(Destination destination) {
        for (ConsumerImpl child : this.children) {
            if (child.getDest().getName().equals(destination.getName())) {
                return true;
            }
        }
        return false;
    }

    public String getConsumerId() {
        return this.consumerId;
    }

    public void setConsumerId(String str) {
        this.consumerId = str;
    }

    @Override // com.odianyun.mq.consumer.Consumer
    public Destination getDest() {
        return this.dest;
    }

    /** @return the bootstrap created by the last start, or {@code null} if never started */
    public ClientBootstrap getBootstrap() {
        return this.bootstrap;
    }

    public MessageListener getListener() {
        return this.listener;
    }

    public ConsumerConfig getConfig() {
        return this.config;
    }

    public String getConsumerIP() {
        return this.consumerIP;
    }

    /** @return the remote address formatted as {@code hostAddress:port} */
    @Override // com.odianyun.mq.consumer.Consumer
    public String getRemoteAddress() {
        return this.consumerAddress.getAddress().getHostAddress() + ":" + this.consumerAddress.getPort();
    }

    /**
     * Asks the route manager for a host and wraps it in a new socket address. Note that this does
     * not return the {@code consumerAddress} field.
     *
     * @return a new address for the routed host, or {@code null} when no route is available
     */
    @Override // com.odianyun.mq.consumer.Consumer
    public InetSocketAddress getConsumerAddress() {
        HostInfo route = this.routerManager.route();
        if (route == null) {
            return null;
        }
        return new InetSocketAddress(route.getHost(), route.getPort());
    }

    @Override
    public String toString() {
        return String.format("ConsumerImpl [consumerId=%s, topic=%s, consumerAddress=%s,  config=%s]", this.consumerId, this.dest.getAssembleName(), this.consumerAddress, this.config);
    }
}
