package com.odianyun.mq.consumer.impl;

import com.odianyun.mq.common.message.Destination;
import com.odianyun.mq.common.netty.component.HostInfo;
import com.odianyun.mq.common.netty.component.RouteManager;
import com.odianyun.mq.consumer.Consumer;
import com.odianyun.mq.consumer.ConsumerConfig;
import com.odianyun.mq.consumer.ConsumerFactory;
import com.odianyun.mq.consumer.impl.inner.ConsumerImpl;
import com.odianyun.soa.common.util.ZkUtil;
import com.odianyun.zk.client.ZkClient;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/odianyun/mq/consumer/impl/ConsumerFactoryRealImpl.class */
public final class ConsumerFactoryRealImpl implements ConsumerFactory {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerFactoryRealImpl.class);
    private static ConsumerFactoryRealImpl instance = new ConsumerFactoryRealImpl();
    private static final RouteManager routerManager = new RouteManager("/MQ/Broker/Consumer", "ConsistentHash");
    private static final Map<String, LinkedBlockingQueue<Consumer>> connectionPool = new ConcurrentHashMap();
    private ZkClient _zkClient = null;

    private ConsumerFactoryRealImpl() {
        init();
    }

    private void init() {
        try {
            this._zkClient = ZkUtil.getZkClientInstance("mq-cluster1.serverList");
        } catch (Exception e) {
            LOG.error("Error occour from ZK :" + e.getMessage(), e);
        }
    }

    public static ConsumerFactory getInstance() {
        return instance;
    }

    public Consumer createConsumer(Destination destination, String str, ConsumerConfig consumerConfig) {
        if (destination == null || str == null || consumerConfig == null) {
            throw new NullPointerException("Null param in createConsumer");
        }
        return createLocalConsumer(destination, str, consumerConfig);
    }

    public Consumer createLocalConsumer(Destination destination, String str, ConsumerConfig consumerConfig) {
        return initConsumer(destination, str, consumerConfig);
    }

    public Consumer createConsumer(Destination destination, String str) {
        return createConsumer(destination, str, new ConsumerConfig());
    }

    public Consumer createConsumer(Destination destination) {
        return createConsumer(destination, destination.getName() + "autoconsumer");
    }

    private Consumer initConsumer(Destination destination, String str, ConsumerConfig consumerConfig) {
        InetSocketAddress consumerAddress = getConsumerAddress();
        String str2 = destination.getAssembleName() + ConsumerImpl.getHostName(consumerAddress) + ":" + consumerAddress.getPort();
        if (connectionPool.containsKey(str2)) {
            LinkedBlockingQueue<Consumer> linkedBlockingQueue = connectionPool.get(str2);
            if (!linkedBlockingQueue.isEmpty() && linkedBlockingQueue.size() >= consumerConfig.getMaxConnectionCount()) {
                try {
                    Consumer poll = linkedBlockingQueue.poll();
                    if (poll != null) {
                        linkedBlockingQueue.put(poll);
                        LOG.warn("Consumer connection reaches the max count:" + consumerConfig.getMaxConnectionCount() + ",reuse an instance " + poll.getRemoteAddress());
                        return poll;
                    }
                } catch (InterruptedException e) {
                    LOG.error(e.getMessage(), e);
                }
            }
        }
        return new ConsumerImpl(destination, str, consumerConfig, consumerAddress, routerManager, connectionPool);
    }

    private InetSocketAddress getConsumerAddress() {
        HostInfo route = routerManager.route();
        if (route == null) {
            throw new NullPointerException("Mq consumer server not found, please contact us for help.");
        }
        return new InetSocketAddress(route.getHost(), route.getPort());
    }
}
