package com.odianyun.mq.consumer.impl.inner;

import com.odianyun.mq.common.Constants;
import com.odianyun.mq.consumer.Consumer;
import com.odianyun.soa.common.util.ZkUtil;
import com.odianyun.zk.client.ZkClient;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/odianyun/mq/consumer/impl/inner/ConsumerThread.class */
/**
 * Background thread that keeps a {@link Consumer} connected to a broker.
 *
 * <p>Each pass of {@link #run()} makes sure a remote address is available (asking the consumer
 * for one when none is set), waits until the consumer's topic path exists in ZooKeeper, connects
 * the bootstrap to that address and then blocks until the resulting channel is closed. After
 * sleeping for the configured interval the cycle repeats. The loop ends once the thread has been
 * interrupted.
 *
 * <p>The bootstrap and the consumer must be set before the thread is started: {@code run()}
 * synchronizes on the bootstrap and dereferences the consumer.
 */
public class ConsumerThread extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerThread.class);

    /** Key handed to {@code ZkUtil.getZkClientInstance} to obtain the ZooKeeper client. */
    private static final String ZK_SERVER_LIST_KEY = "mq-cluster1.serverList";

    private ClientBootstrap bootstrap;
    private InetSocketAddress remoteAddress;
    /** Pause between attempts in milliseconds (it is passed straight to {@link Thread#sleep(long)}). */
    private long interval;
    private Consumer consumer;
    /** Number of consecutive failed connects after which a new address is requested from the consumer. */
    private final int retryMaxCount = 10;

    /**
     * Sets the consumer that supplies broker addresses and the destination (namespace/topic).
     *
     * @param consumer the consumer this thread works for
     */
    public void setConsumer(Consumer consumer) {
        this.consumer = consumer;
    }

    /**
     * Sets the bootstrap used to open connections; {@link #run()} also uses it as its lock object.
     *
     * @param clientBootstrap the Netty client bootstrap
     */
    public void setBootstrap(ClientBootstrap clientBootstrap) {
        this.bootstrap = clientBootstrap;
    }

    /**
     * Sets the initial address to connect to. May be {@code null}, in which case {@link #run()}
     * polls the consumer for an address.
     *
     * @param inetSocketAddress the broker address, or {@code null}
     */
    public void setRemoteAddress(InetSocketAddress inetSocketAddress) {
        this.remoteAddress = inetSocketAddress;
    }

    /**
     * Sets the pause between connection attempts.
     *
     * @param j the pause in milliseconds
     */
    public void setInterval(long j) {
        this.interval = j;
    }

    /**
     * Connect / wait-for-close / reconnect loop; returns once the thread has been interrupted.
     *
     * <p>An interrupt received while sleeping (waiting for an address or for the topic to appear)
     * restores the thread's interrupt flag so the loop terminates, instead of being logged as a
     * failure and retried. All other exceptions are logged and the loop continues after the
     * configured interval.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        int failedAttempts = 0;
        while (!Thread.currentThread().isInterrupted()) {
            synchronized (this.bootstrap) {
                if (!Thread.currentThread().isInterrupted()) {
                    try {
                        LOG.info("ConsumerThread-try connecting to {}", this.remoteAddress);
                        // No address yet: poll the consumer once per interval until it returns one.
                        while (this.remoteAddress == null) {
                            Thread.sleep(this.interval);
                            LOG.warn("Maybe all brokers are down,waiting...");
                            this.remoteAddress = this.consumer.getConsumerAddress();
                        }
                        try {
                            ZkClient zkClient = ZkUtil.getZkClientInstance(ZK_SERVER_LIST_KEY);
                            if (zkClient == null) {
                                LOG.error("Omq client get zk client failed by key {}", ZK_SERVER_LIST_KEY);
                                throw new RuntimeException("Can not create zk client please the key "
                                        + ZK_SERVER_LIST_KEY + " in osoa config file");
                            }

                            // The default namespace is not part of the path; any other namespace is.
                            StringBuilder topicPath = new StringBuilder(Constants.TOPIC_PATH);
                            if (Constants.DEFAULT_NAMESPACE.equals(this.consumer.getDest().getNamespace())) {
                                topicPath.append('/').append(this.consumer.getDest().getName());
                            } else {
                                topicPath.append('/').append(this.consumer.getDest().getNamespace())
                                        .append('/').append(this.consumer.getDest().getName());
                            }

                            // Do not connect before the topic path exists; re-check every 60 intervals.
                            while (!zkClient.exists(topicPath.toString())) {
                                LOG.warn("Namespace {} topic {} is not exist,please apply for the topic from base architecture",
                                        this.consumer.getDest().getNamespace(), this.consumer.getDest().getName());
                                Thread.sleep(this.interval * 60);
                            }

                            ChannelFuture connect = this.bootstrap.connect(this.remoteAddress);
                            connect.awaitUninterruptibly();
                            LOG.debug("Connect finished");
                            if (connect.getChannel() == null || !connect.getChannel().isConnected()) {
                                failedAttempts++;
                                // Too many consecutive failures: ask the consumer for a (possibly new) address.
                                if (failedAttempts >= this.retryMaxCount) {
                                    failedAttempts = 0;
                                    this.remoteAddress = this.consumer.getConsumerAddress();
                                }
                                LOG.debug("Retry {} times", Integer.valueOf(failedAttempts));
                            } else {
                                failedAttempts = 0;
                                SocketAddress localAddress = connect.getChannel().getLocalAddress();
                                LOG.info("ConsumerThread(localAddress={})-connected to {}", localAddress, this.remoteAddress);
                                // Stay here for the lifetime of the connection.
                                connect.getChannel().getCloseFuture().awaitUninterruptibly();
                                LOG.info("ConsumerThread(localAddress={})-closed from {}", localAddress, this.remoteAddress);
                            }
                        } catch (InterruptedException interrupted) {
                            // An interrupt is a stop request, not a failure: pass it to the outer handler unwrapped.
                            throw interrupted;
                        } catch (Exception e) {
                            LOG.error("Omq client get zk client failed:{}", e.getMessage());
                            throw new RuntimeException(e);
                        }
                    } catch (InterruptedException interrupted) {
                        // Thread.sleep cleared the flag when it threw; restore it so the loop condition sees it.
                        Thread.currentThread().interrupt();
                    } catch (Exception e2) {
                        LOG.error(e2.getMessage(), e2);
                    }
                }
            }
            try {
                // With the interrupt flag set this throws immediately and the loop exits.
                Thread.sleep(this.interval);
            } catch (InterruptedException e3) {
                Thread.currentThread().interrupt();
            }
        }
        LOG.warn("ConsumerThread({}) (remoteAddress={}) done", this.consumer.getDest().getAssembleName(), this.remoteAddress);
    }
}
