package org.springframework.data.redis.connection.lettuce;

import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import java.nio.ByteBuffer;
import java.util.function.Function;
import lombok.NonNull;
import org.reactivestreams.Publisher;
import org.springframework.data.redis.connection.ReactivePubSubCommands;
import org.springframework.data.redis.connection.ReactiveSubscription;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:WEB-INF/lib/spring-data-redis-2.1.9.RELEASE.jar:org/springframework/data/redis/connection/lettuce/LettuceReactivePubSubCommands.class */
class LettuceReactivePubSubCommands implements ReactivePubSubCommands {

    @NonNull
    private final LettuceReactiveRedisConnection connection;

    @Override // org.springframework.data.redis.connection.ReactivePubSubCommands
    public Mono<ReactiveSubscription> createSubscription() {
        return this.connection.getPubSubConnection().map(statefulRedisPubSubConnection -> {
            return new LettuceReactiveSubscription(statefulRedisPubSubConnection.reactive(), this.connection.translateException());
        });
    }

    @Override // org.springframework.data.redis.connection.ReactivePubSubCommands
    public Flux<Long> publish(Publisher<ReactiveSubscription.ChannelMessage<ByteBuffer, ByteBuffer>> publisher) {
        Assert.notNull(publisher, "ChannelMessage stream must not be null!");
        return this.connection.getCommands().flatMapMany(redisClusterReactiveCommands -> {
            return Flux.from(publisher).flatMap(channelMessage -> {
                return redisClusterReactiveCommands.publish(channelMessage.getChannel(), channelMessage.getMessage());
            });
        });
    }

    @Override // org.springframework.data.redis.connection.ReactivePubSubCommands
    public Mono<Void> subscribe(ByteBuffer... byteBufferArr) {
        Assert.notNull(byteBufferArr, "Channels must not be null!");
        return doWithPubSub(redisPubSubReactiveCommands -> {
            return redisPubSubReactiveCommands.subscribe(byteBufferArr);
        });
    }

    @Override // org.springframework.data.redis.connection.ReactivePubSubCommands
    public Mono<Void> pSubscribe(ByteBuffer... byteBufferArr) {
        Assert.notNull(byteBufferArr, "Patterns must not be null!");
        return doWithPubSub(redisPubSubReactiveCommands -> {
            return redisPubSubReactiveCommands.psubscribe(byteBufferArr);
        });
    }

    private <T> Mono<T> doWithPubSub(Function<RedisPubSubReactiveCommands<ByteBuffer, ByteBuffer>, Mono<T>> function) {
        return this.connection.getPubSubConnection().flatMap(statefulRedisPubSubConnection -> {
            return (Mono) function.apply(statefulRedisPubSubConnection.reactive());
        }).onErrorMap(this.connection.translateException());
    }

    public LettuceReactivePubSubCommands(@NonNull LettuceReactiveRedisConnection lettuceReactiveRedisConnection) {
        if (lettuceReactiveRedisConnection == null) {
            throw new NullPointerException("connection is marked @NonNull but is null");
        }
        this.connection = lettuceReactiveRedisConnection;
    }
}
