package org.apache.shenyu.protocol.mqtt.repositories;

import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/shenyu/protocol/mqtt/repositories/SubscribeRepository.class */
public class SubscribeRepository implements BaseRepository<List<String>, List<Channel>> {
    private static final Logger LOG = LoggerFactory.getLogger(SubscribeRepository.class);
    private static final Map<String, List<Channel>> TOPIC_CHANNEL_FACTORY = new ConcurrentHashMap();

    @Override // org.apache.shenyu.protocol.mqtt.repositories.BaseRepository
    public void add(List<String> list, List<Channel> list2) {
        CompletableFuture.runAsync(() -> {
            list.parallelStream().forEach(str -> {
                List<Channel> list3 = get(str);
                list3.addAll(list2);
                TOPIC_CHANNEL_FACTORY.put(str, list3);
            });
        });
    }

    public void add(Channel channel, List<MqttTopicSubscription> list) {
        CompletableFuture.runAsync(() -> {
            list.parallelStream().forEach(mqttTopicSubscription -> {
                List<Channel> list2 = get(mqttTopicSubscription.topicName());
                list2.add(channel);
                TOPIC_CHANNEL_FACTORY.put(mqttTopicSubscription.topicName(), list2);
            });
        });
    }

    @Override // org.apache.shenyu.protocol.mqtt.repositories.BaseRepository
    public void remove(List<String> list) {
        CompletableFuture.runAsync(() -> {
            Stream parallelStream = list.parallelStream();
            Map<String, List<Channel>> map = TOPIC_CHANNEL_FACTORY;
            Objects.requireNonNull(map);
            parallelStream.forEach((v1) -> {
                r1.remove(v1);
            });
        });
    }

    public void remove(List<String> list, Channel channel) {
        CompletableFuture.runAsync(() -> {
            list.parallelStream().forEach(str -> {
                TOPIC_CHANNEL_FACTORY.get(str).remove(channel);
            });
        });
    }

    @Override // org.apache.shenyu.protocol.mqtt.repositories.BaseRepository
    public List<Channel> get(List<String> list) {
        CopyOnWriteArraySet copyOnWriteArraySet = new CopyOnWriteArraySet();
        list.parallelStream().forEach(str -> {
            copyOnWriteArraySet.addAll(TOPIC_CHANNEL_FACTORY.get(str));
        });
        return new CopyOnWriteArrayList(copyOnWriteArraySet);
    }

    public List<Channel> get(String str) {
        return TOPIC_CHANNEL_FACTORY.getOrDefault(str, new CopyOnWriteArrayList());
    }
}
