package com.alibaba.mqtt.server;

import com.alibaba.mqtt.server.callback.MessageListener;
import com.alibaba.mqtt.server.callback.StatusListener;
import com.alibaba.mqtt.server.config.ChannelConfig;
import com.alibaba.mqtt.server.config.ConsumerConfig;
import com.alibaba.mqtt.server.model.MessageProperties;
import com.alibaba.mqtt.server.model.StatusNotice;
import com.alibaba.mqtt.server.network.AbstractChannel;
import com.alibaba.mqtt.server.util.ThreadFactoryImpl;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/* loaded from: input_file:com/alibaba/mqtt/server/ServerConsumer.class */
public class ServerConsumer extends AbstractChannel {
    private static final int CONNECTION_NUM = 4;
    private Connection[] connections;
    private Channel[] channels;
    private ConsumerConfig consumerConfig;
    private ExecutorService msgExecutor;
    private ExecutorService statusExecutor;
    private static ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("scan_server_consumer_callback_"));
    private Map<String, StatusListener> subscribeStatusMap;
    private Map<String, MessageListener> subscribeTopicMap;

    public ServerConsumer(ChannelConfig channelConfig, ConsumerConfig consumerConfig) {
        super(channelConfig);
        this.connections = new Connection[CONNECTION_NUM];
        this.channels = new Channel[CONNECTION_NUM];
        this.subscribeStatusMap = new ConcurrentHashMap();
        this.subscribeTopicMap = new ConcurrentHashMap();
        this.consumerConfig = consumerConfig;
    }

    @Override // com.alibaba.mqtt.server.network.AbstractChannel
    public void start() throws IOException, TimeoutException {
        if (this.started.compareAndSet(false, true)) {
            super.start();
            for (int i = 0; i < CONNECTION_NUM; i++) {
                this.connections[i] = this.factory.newConnection();
                this.channels[i] = this.connections[i].createChannel();
            }
            this.msgExecutor = new ThreadPoolExecutor(this.consumerConfig.getMinConsumeThreadNum(), this.consumerConfig.getMaxConsumeThreadNum(), 1L, TimeUnit.MINUTES, new LinkedBlockingQueue(30000));
            this.statusExecutor = new ThreadPoolExecutor(this.consumerConfig.getMinConsumeThreadNum(), this.consumerConfig.getMaxConsumeThreadNum(), 1L, TimeUnit.MINUTES, new LinkedBlockingQueue(30000));
            scheduler.scheduleAtFixedRate(new Runnable() { // from class: com.alibaba.mqtt.server.ServerConsumer.1
                @Override // java.lang.Runnable
                public void run() {
                    for (int i2 = 0; i2 < ServerConsumer.CONNECTION_NUM; i2++) {
                        try {
                            if (!ServerConsumer.this.channels[i2].isOpen()) {
                                ServerConsumer.this.channels[i2] = ServerConsumer.this.connections[i2].createChannel();
                                Channel channel = ServerConsumer.this.channels[i2];
                                ServerConsumer.this.subscribeTopicMap.forEach((str, messageListener) -> {
                                    try {
                                        ServerConsumer.this._subscribeTopic(channel, str, messageListener);
                                    } catch (IOException e) {
                                        e.printStackTrace();
                                    }
                                });
                                ServerConsumer.this.subscribeStatusMap.forEach((str2, statusListener) -> {
                                    try {
                                        ServerConsumer.this._subscribeStatus(channel, str2, statusListener);
                                    } catch (IOException e) {
                                        e.printStackTrace();
                                    }
                                });
                            }
                        } catch (Throwable th) {
                            th.printStackTrace();
                            return;
                        }
                    }
                }
            }, 5L, 5L, TimeUnit.SECONDS);
        }
    }

    public void stop() throws IOException {
        for (int i = 0; i < CONNECTION_NUM; i++) {
            this.connections[i].close();
        }
    }

    public void subscribeTopic(String str, MessageListener messageListener) throws IOException {
        if (str == null || messageListener == null) {
            return;
        }
        this.subscribeTopicMap.put(str, messageListener);
        for (int i = 0; i < CONNECTION_NUM; i++) {
            _subscribeTopic(this.channels[i], str, messageListener);
        }
    }

    public void _subscribeTopic(final Channel channel, String str, final MessageListener messageListener) throws IOException {
        channel.basicConsume(str, false, new DefaultConsumer(channel) { // from class: com.alibaba.mqtt.server.ServerConsumer.2
            public void handleDelivery(String str2, final Envelope envelope, final AMQP.BasicProperties basicProperties, final byte[] bArr) {
                ServerConsumer.this.msgExecutor.submit(new Runnable() { // from class: com.alibaba.mqtt.server.ServerConsumer.2.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            messageListener.process(basicProperties.getMessageId(), new MessageProperties(basicProperties), bArr);
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        } catch (Throwable th) {
                            try {
                                channel.basicNack(envelope.getDeliveryTag(), false, false);
                            } catch (IOException e) {
                            }
                        }
                    }
                });
            }
        });
    }

    public void subscribeStatus(String str, StatusListener statusListener) throws IOException {
        if (str == null || statusListener == null) {
            return;
        }
        this.subscribeStatusMap.put(str, statusListener);
        for (int i = 0; i < CONNECTION_NUM; i++) {
            _subscribeStatus(this.channels[i], str, statusListener);
        }
    }

    public void _subscribeStatus(final Channel channel, String str, final StatusListener statusListener) throws IOException {
        HashMap hashMap = new HashMap(CONNECTION_NUM);
        hashMap.put("GROUP_ID", str);
        channel.basicConsume("STATUS", false, hashMap, new DefaultConsumer(channel) { // from class: com.alibaba.mqtt.server.ServerConsumer.3
            public void handleDelivery(String str2, final Envelope envelope, AMQP.BasicProperties basicProperties, final byte[] bArr) throws IOException {
                ServerConsumer.this.statusExecutor.submit(new Runnable() { // from class: com.alibaba.mqtt.server.ServerConsumer.3.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            statusListener.process(new StatusNotice(bArr));
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        } catch (Throwable th) {
                            try {
                                channel.basicNack(envelope.getDeliveryTag(), false, false);
                            } catch (IOException e) {
                            }
                        }
                    }
                });
            }
        });
    }
}
