package io.jboot.components.mq;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.jfinal.log.Log;
import io.jboot.Jboot;
import io.jboot.components.serializer.JbootSerializer;
import io.jboot.components.serializer.JbootSerializerManager;
import io.jboot.exception.JbootException;
import io.jboot.utils.StrUtil;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:io/jboot/components/mq/JbootmqBase.class */
/**
 * Skeleton implementation of {@link Jbootmq} that keeps track of message listeners and
 * dispatches incoming messages to them.
 *
 * <p>Listeners are either registered for every channel or bound to specific channel names.
 * A message is delivered on the calling thread when its channel is listed in
 * {@link #syncRecevieMessageChannels}; otherwise each listener invocation is handed to an
 * internal thread pool. Subclasses provide the transport-specific subscription logic in
 * {@link #onStartListening()} and call {@link #notifyListeners(String, Object)} when a
 * message arrives.
 */
public abstract class JbootmqBase implements Jbootmq {
    private static final Log LOG = Log.getLog(JbootmqBase.class);

    /** Lazily resolved by {@link #getSerializer()}; subclasses may also assign it directly. */
    protected JbootSerializer serializer;

    /** Listeners that receive messages from every channel. */
    private final List<JbootmqMessageListener> allChannelListeners = new CopyOnWriteArrayList<>();

    /** Listeners keyed by the (trimmed) channel name they were registered for. */
    private final Multimap<String, JbootmqMessageListener> listenersMap = ArrayListMultimap.create();

    protected JbootmqConfig config = (JbootmqConfig) Jboot.config(JbootmqConfig.class);

    /** Channel names taken from the configuration's comma separated channel setting. */
    protected Set<String> channels = Sets.newHashSet();

    /** Channels whose listeners are invoked on the thread that calls {@code notifyListeners}. */
    protected Set<String> syncRecevieMessageChannels = Sets.newHashSet();

    /**
     * Pool used for asynchronous delivery: no core threads, no queueing, an effectively
     * unbounded number of threads, and idle threads are released after 60 seconds.
     */
    private final ExecutorService threadPool =
            new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<>());

    /** Set to {@code true} once {@link #startListening()} has completed successfully. */
    protected boolean isStartListen = false;

    /**
     * Reads the channel configuration. When no channel is configured nothing is initialised,
     * including the synchronous channel set.
     */
    public JbootmqBase() {
        String channel = this.config.getChannel();
        if (StrUtil.isBlank(channel)) {
            return;
        }
        this.channels.addAll(StrUtil.splitToSet(channel, ","));

        String syncChannels = this.config.getSyncRecevieMessageChannel();
        if (StrUtil.isNotBlank(syncChannels)) {
            this.syncRecevieMessageChannels.addAll(StrUtil.splitToSet(syncChannels, ","));
        }
    }

    /**
     * Registers a listener that is notified for messages on every channel.
     *
     * @param jbootmqMessageListener the listener to register
     */
    @Override // io.jboot.components.mq.Jbootmq
    public void addMessageListener(JbootmqMessageListener jbootmqMessageListener) {
        this.allChannelListeners.add(jbootmqMessageListener);
    }

    /**
     * Registers a listener for one or more specific channels.
     *
     * @param jbootmqMessageListener the listener to register
     * @param str comma separated channel names; blank entries are skipped and the
     *            remaining names are trimmed before use
     */
    @Override // io.jboot.components.mq.Jbootmq
    public void addMessageListener(JbootmqMessageListener jbootmqMessageListener, String str) {
        for (String channel : str.split(",")) {
            if (!StrUtil.isBlank(channel)) {
                this.listenersMap.put(channel.trim(), jbootmqMessageListener);
            }
        }
    }

    /**
     * Removes a listener from the all-channel list and from every channel it was bound to.
     *
     * @param jbootmqMessageListener the listener to remove
     */
    @Override // io.jboot.components.mq.Jbootmq
    public void removeListener(JbootmqMessageListener jbootmqMessageListener) {
        this.allChannelListeners.remove(jbootmqMessageListener);

        // Iterate over a snapshot of the keys: removing the last listener of a channel drops
        // that key from the multimap, which would invalidate a live keySet() iterator and
        // raise ConcurrentModificationException.
        for (String channel : this.listenersMap.keySet().toArray(new String[0])) {
            this.listenersMap.remove(channel, jbootmqMessageListener);
        }
    }

    /** Removes every registered listener, both all-channel and channel-specific. */
    @Override // io.jboot.components.mq.Jbootmq
    public void removeAllListeners() {
        this.allChannelListeners.clear();
        this.listenersMap.clear();
    }

    /**
     * @return the internal list of listeners registered for every channel (not a copy)
     */
    @Override // io.jboot.components.mq.Jbootmq
    public Collection<JbootmqMessageListener> getAllChannelListeners() {
        return this.allChannelListeners;
    }

    /**
     * @param str the channel name
     * @return the multimap's collection of listeners registered for that channel
     */
    @Override // io.jboot.components.mq.Jbootmq
    public Collection<JbootmqMessageListener> getListenersByChannel(String str) {
        return this.listenersMap.get(str);
    }

    /**
     * Delivers a message to the all-channel listeners and to the listeners bound to the
     * given channel. A warning is logged when neither group contains a listener.
     *
     * @param str the channel the message arrived on
     * @param obj the message payload
     */
    public void notifyListeners(String str, Object obj) {
        // Both groups are always notified; do not short-circuit the second call.
        boolean globalNotified = notifyAll(str, obj, this.allChannelListeners);
        boolean channelNotified = notifyAll(str, obj, this.listenersMap.get(str));
        if (globalNotified || channelNotified) {
            return;
        }
        LOG.warn("recevie mq message, bug has not mq listener to process. channel:" + str + "  message:" + String.valueOf(obj));
    }

    /**
     * Delivers a message to every listener in the given collection.
     *
     * <p>Listeners run on the calling thread when the channel is configured as synchronous,
     * otherwise each invocation is submitted to the internal thread pool. In both modes a
     * failing listener is logged and does not prevent the remaining listeners from running.
     *
     * @param str the channel the message arrived on
     * @param obj the message payload
     * @param collection the listeners to notify; may be {@code null} or empty
     * @return {@code false} when there was no listener to notify, {@code true} otherwise
     */
    protected boolean notifyAll(String str, Object obj, Collection<JbootmqMessageListener> collection) {
        if (collection == null || collection.isEmpty()) {
            return false;
        }

        boolean synchronous = this.syncRecevieMessageChannels.contains(str);
        for (JbootmqMessageListener listener : collection) {
            if (synchronous) {
                invokeListener(listener, str, obj);
            } else {
                this.threadPool.execute(() -> invokeListener(listener, str, obj));
            }
        }
        return true;
    }

    /** Invokes a single listener and logs, rather than propagates, anything it throws. */
    private void invokeListener(JbootmqMessageListener listener, String channel, Object message) {
        try {
            listener.onMessage(channel, message);
        } catch (Throwable th) {
            // Pass the throwable along so the stack trace is not lost.
            LOG.warn("listener[" + listener.getClass().getName() + "] execute mq message is error. channel:" + channel + "  message:" + String.valueOf(message), th);
        }
    }

    /**
     * Returns the serializer for message payloads, resolving it on first use: the serializer
     * named in the configuration when one is set, otherwise the application default.
     *
     * @return the serializer used by this message queue
     */
    public JbootSerializer getSerializer() {
        if (this.serializer == null) {
            String serializerName = this.config.getSerializer();
            this.serializer = StrUtil.isBlank(serializerName)
                    ? Jboot.getSerializer()
                    : JbootSerializerManager.me().getSerializer(serializerName);
        }
        return this.serializer;
    }

    /**
     * Starts listening on the configured channels by delegating to
     * {@link #onStartListening()}.
     *
     * @return always {@code true} when the call completes normally
     * @throws JbootException if listening was already started or no channel is configured
     */
    @Override // io.jboot.components.mq.Jbootmq
    public boolean startListening() {
        if (this.isStartListen) {
            throw new JbootException("jboot mq is started before");
        }
        if (this.channels == null || this.channels.isEmpty()) {
            throw new JbootException("mq channels is null or empty, please config channels");
        }
        onStartListening();
        // Only flagged as started once the subclass hook returned without throwing.
        this.isStartListen = true;
        return true;
    }

    /** Transport-specific hook that subscribes to the configured channels. */
    protected abstract void onStartListening();
}
