package org.springframework.data.redis.core;

import java.lang.reflect.Proxy;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.ReactiveSubscription;
import org.springframework.data.redis.core.script.DefaultReactiveScriptExecutor;
import org.springframework.data.redis.core.script.ReactiveScriptExecutor;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.listener.ReactiveRedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.serializer.RedisElementReader;
import org.springframework.data.redis.serializer.RedisElementWriter;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:BOOT-INF/lib/spring-data-redis-2.1.5.RELEASE.jar:org/springframework/data/redis/core/ReactiveRedisTemplate.class */
/**
 * Central entry point for reactive Redis access.
 *
 * <p>The template obtains a {@link ReactiveRedisConnection} from the configured
 * {@link ReactiveRedisConnectionFactory}, hands it to a {@link ReactiveRedisCallback} and closes the
 * connection once the resulting publisher terminates or is cancelled. Keys are converted to and from
 * {@link ByteBuffer} through the configured {@link RedisSerializationContext}.
 *
 * @param <K> the Redis key type this template works with
 * @param <V> the Redis value type this template works with
 */
public class ReactiveRedisTemplate<K, V> implements ReactiveRedisOperations<K, V> {

    /** Number of keys collected into one batch before a multi-key DEL/UNLINK is issued for a key stream. */
    private static final int KEY_BATCH_SIZE = 128;

    private final ReactiveRedisConnectionFactory connectionFactory;
    private final RedisSerializationContext<K, V> serializationContext;
    private final boolean exposeConnection;
    private final ReactiveScriptExecutor<K> reactiveScriptExecutor;

    /**
     * Creates a template that hands a close-suppressing connection proxy to callbacks.
     *
     * @param reactiveRedisConnectionFactory factory used to obtain connections, must not be {@code null}
     * @param redisSerializationContext serialization context for keys and values, must not be {@code null}
     */
    public ReactiveRedisTemplate(ReactiveRedisConnectionFactory reactiveRedisConnectionFactory, RedisSerializationContext<K, V> redisSerializationContext) {
        this(reactiveRedisConnectionFactory, redisSerializationContext, false);
    }

    /**
     * Creates a template.
     *
     * @param reactiveRedisConnectionFactory factory used to obtain connections, must not be {@code null}
     * @param redisSerializationContext serialization context for keys and values, must not be {@code null}
     * @param z {@code true} to pass the connection itself to callbacks, {@code false} to pass a proxy
     *          created by {@link #createRedisConnectionProxy(ReactiveRedisConnection)}
     * @throws IllegalArgumentException if the factory or the serialization context is {@code null}
     */
    public ReactiveRedisTemplate(ReactiveRedisConnectionFactory reactiveRedisConnectionFactory, RedisSerializationContext<K, V> redisSerializationContext, boolean z) {
        Assert.notNull(reactiveRedisConnectionFactory, "ConnectionFactory must not be null!");
        Assert.notNull(redisSerializationContext, "SerializationContext must not be null!");
        this.connectionFactory = reactiveRedisConnectionFactory;
        this.serializationContext = redisSerializationContext;
        this.exposeConnection = z;
        this.reactiveScriptExecutor = new DefaultReactiveScriptExecutor<>(reactiveRedisConnectionFactory, redisSerializationContext);
    }

    /**
     * @return the connection factory this template was created with
     */
    public ReactiveRedisConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    /**
     * Runs the callback using this template's configured connection-exposure setting.
     *
     * @param reactiveRedisCallback the action to run, must not be {@code null}
     * @return the callback's result as a {@link Flux}
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public <T> Flux<T> execute(ReactiveRedisCallback<T> reactiveRedisCallback) {
        return execute(reactiveRedisCallback, this.exposeConnection);
    }

    /**
     * Runs the callback against a connection that is obtained immediately (when this method is called,
     * not on subscription) and closed when the returned {@link Flux} terminates or is cancelled.
     *
     * @param reactiveRedisCallback the action to run, must not be {@code null}
     * @param z {@code true} to pass the connection itself to the callback, {@code false} to pass a proxy
     * @return the callback's result as a {@link Flux}
     * @throws IllegalArgumentException if the callback is {@code null}
     */
    public <T> Flux<T> execute(ReactiveRedisCallback<T> reactiveRedisCallback, boolean z) {
        // doInConnection performs the null check (same message) and the close-on-failure handling.
        return doInConnection(reactiveRedisCallback, z);
    }

    /**
     * Creates a {@link Flux} that runs the callback on each subscription; the connection is obtained
     * lazily when a subscriber arrives.
     *
     * @param reactiveRedisCallback the action to run, must not be {@code null}
     * @return a deferred {@link Flux} emitting the callback's result
     * @throws IllegalArgumentException if the callback is {@code null}
     */
    public <T> Flux<T> createFlux(ReactiveRedisCallback<T> reactiveRedisCallback) {
        Assert.notNull(reactiveRedisCallback, "ReactiveRedisCallback must not be null!");
        return Flux.defer(() -> doInConnection(reactiveRedisCallback, this.exposeConnection));
    }

    /**
     * Creates a {@link Mono} that runs the callback on each subscription; the connection is obtained
     * lazily when a subscriber arrives.
     *
     * @param reactiveRedisCallback the action to run, must not be {@code null}
     * @return a deferred {@link Mono} built from the callback's publisher via {@link Mono#from(Publisher)}
     * @throws IllegalArgumentException if the callback is {@code null}
     */
    public <T> Mono<T> createMono(ReactiveRedisCallback<T> reactiveRedisCallback) {
        Assert.notNull(reactiveRedisCallback, "ReactiveRedisCallback must not be null!");
        return Mono.defer(() -> Mono.from(doInConnection(reactiveRedisCallback, this.exposeConnection)));
    }

    /**
     * Obtains a connection, invokes the callback with it and attaches a hook that closes the connection
     * when the resulting publisher completes, errors or is cancelled.
     *
     * <p>If pre-processing, proxy creation or the callback itself throws before a publisher exists,
     * the connection is closed here, because no {@code doFinally} hook is in place yet to do so.
     */
    private <T> Flux<T> doInConnection(ReactiveRedisCallback<T> reactiveRedisCallback, boolean z) {
        Assert.notNull(reactiveRedisCallback, "Callback object must not be null");
        ReactiveRedisConnection connection = getConnectionFactory().getReactiveConnection();
        try {
            ReactiveRedisConnection processed = preProcessConnection(connection, false);
            ReactiveRedisConnection callbackTarget = z ? processed : createRedisConnectionProxy(processed);
            Publisher<T> result = postProcessResult(reactiveRedisCallback.doInRedis(callbackTarget), processed, false);
            return Flux.from(result).doFinally(signalType -> connection.close());
        } catch (RuntimeException ex) {
            connection.close();
            throw ex;
        }
    }

    /**
     * Publishes a message to a channel. The channel name is written with the string serialization pair,
     * the message with the value serialization pair.
     *
     * @param str the destination channel, must contain text
     * @param v the message, must not be {@code null}
     * @return the result of the underlying {@code publish} command
     * @throws IllegalArgumentException if the channel has no text or the message is {@code null}
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public Mono<Long> convertAndSend(String str, V v) {
        Assert.hasText(str, "Destination channel must not be empty!");
        Assert.notNull(v, "Message must not be null!");
        return createMono(connection -> {
            ByteBuffer channel = getSerializationContext().getStringSerializationPair().write(str);
            ByteBuffer message = getSerializationContext().getValueSerializationPair().write(v);
            return connection.pubSubCommands().publish(channel, message);
        });
    }

    /**
     * Subscribes to the given topics through a dedicated {@link ReactiveRedisMessageListenerContainer}
     * that is destroyed once the returned {@link Flux} terminates or is cancelled.
     *
     * @param topicArr the topics to listen to
     * @return a stream of received messages
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public Flux<? extends ReactiveSubscription.Message<String, V>> listenTo(Topic... topicArr) {
        ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(getConnectionFactory());
        return container
                .receive(Arrays.asList(topicArr), getSerializationContext().getStringSerializationPair(), getSerializationContext().getValueSerializationPair())
                // destroyLater() only assembles a Mono; without subscribe() the teardown never runs
                // and the container (with its connection) would leak.
                .doFinally(signalType -> container.destroyLater().subscribeOn(Schedulers.elastic()).subscribe());
    }

    /**
     * Checks whether a key exists.
     *
     * @param k the key, must not be {@code null}
     * @return the result of the underlying {@code exists} command
     * @throws IllegalArgumentException if the key is {@code null}
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public Mono<Boolean> hasKey(K k) {
        Assert.notNull(k, "Key must not be null!");
        return createMono(connection -> connection.keyCommands().exists(rawKey(k)));
    }

    /**
     * Determines the data type stored at a key.
     *
     * @param k the key, must not be {@code null}
     * @return the result of the underlying {@code type} command
     * @throws IllegalArgumentException if the key is {@code null}
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public Mono<DataType> type(K k) {
        Assert.notNull(k, "Key must not be null!");
        return createMono(connection -> connection.keyCommands().type(rawKey(k)));
    }

    /**
     * Finds all keys matching a pattern. The pattern is serialized with the key serializer.
     *
     * @param k the pattern, must not be {@code null}
     * @return the matching keys, deserialized with the key reader
     * @throws IllegalArgumentException if the pattern is {@code null}
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public Flux<K> keys(K k) {
        Assert.notNull(k, "Pattern must not be null!");
        return createFlux(connection -> connection.keyCommands().keys(rawKey(k)))
                .flatMap(Flux::fromIterable)
                .map(this::readKey);
    }

    /**
     * Iterates over keys using the given scan options.
     *
     * @param scanOptions the scan options, must not be {@code null}
     * @return the scanned keys, deserialized with the key reader
     * @throws IllegalArgumentException if the options are {@code null}
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public Flux<K> scan(ScanOptions scanOptions) {
        Assert.notNull(scanOptions, "ScanOptions must not be null!");
        return createFlux(connection -> connection.keyCommands().scan(scanOptions)).map(this::readKey);
    }

    /**
     * @return a key obtained from the underlying {@code randomKey} command, deserialized with the key reader
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public Mono<K> randomKey() {
        return createMono(connection -> connection.keyCommands().randomKey()).map(this::readKey);
    }

    /**
     * Renames a key.
     *
     * @param k the current key, must not be {@code null}
     * @param k2 the new key, must not be {@code null}
     * @return the result of the underlying {@code rename} command
     * @throws IllegalArgumentException if either key is {@code null}
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public Mono<Boolean> rename(K k, K k2) {
        Assert.notNull(k, "Old key must not be null!");
        Assert.notNull(k2, "New Key must not be null!");
        return createMono(connection -> connection.keyCommands().rename(rawKey(k), rawKey(k2)));
    }

    /**
     * Renames a key using the underlying {@code renameNX} command.
     *
     * @param k the current key, must not be {@code null}
     * @param k2 the new key, must not be {@code null}
     * @return the result of the underlying {@code renameNX} command
     * @throws IllegalArgumentException if either key is {@code null}
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public Mono<Boolean> renameIfAbsent(K k, K k2) {
        Assert.notNull(k, "Old key must not be null!");
        Assert.notNull(k2, "New Key must not be null!");
        return createMono(connection -> connection.keyCommands().renameNX(rawKey(k), rawKey(k2)));
    }

    /**
     * Deletes the given keys. A single key uses {@code del}; several keys use {@code mDel}.
     *
     * @param kArr the keys, must not be {@code null}, empty or contain {@code null} elements
     * @return the result of the underlying delete command
     * @throws IllegalArgumentException if the key array violates the constraints above
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    @SafeVarargs
    public final Mono<Long> delete(K... kArr) {
        Assert.notNull(kArr, "Keys must not be null!");
        Assert.notEmpty(kArr, "Keys must not be empty!");
        Assert.noNullElements(kArr, "Keys must not contain null elements!");
        if (kArr.length == 1) {
            return createMono(connection -> connection.keyCommands().del(rawKey(kArr[0])));
        }
        Mono<List<ByteBuffer>> rawKeys = Flux.fromArray(kArr).map(this::rawKey).collectList();
        return createMono(connection -> rawKeys.flatMap(keys -> connection.keyCommands().mDel(keys)));
    }

    /**
     * Deletes the keys emitted by a publisher. Keys are grouped into batches of
     * {@value #KEY_BATCH_SIZE} and the per-batch outputs are summed.
     *
     * @param publisher the keys to delete, must not be {@code null}
     * @return the sum of the outputs of all {@code mDel} batches
     * @throws IllegalArgumentException if the publisher is {@code null}
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public Mono<Long> delete(Publisher<K> publisher) {
        Assert.notNull(publisher, "Keys must not be null!");
        return createFlux(connection -> connection.keyCommands()
                .mDel(Flux.from(publisher).map(this::rawKey).buffer(KEY_BATCH_SIZE))
                .map(response -> response.getOutput()))
                .collect(Collectors.summingLong(Long::longValue));
    }

    /**
     * Unlinks the given keys. A single key uses {@code unlink}; several keys use {@code mUnlink}.
     *
     * @param kArr the keys, must not be {@code null}, empty or contain {@code null} elements
     * @return the result of the underlying unlink command
     * @throws IllegalArgumentException if the key array violates the constraints above
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    @SafeVarargs
    public final Mono<Long> unlink(K... kArr) {
        Assert.notNull(kArr, "Keys must not be null!");
        Assert.notEmpty(kArr, "Keys must not be empty!");
        Assert.noNullElements(kArr, "Keys must not contain null elements!");
        if (kArr.length == 1) {
            return createMono(connection -> connection.keyCommands().unlink(rawKey(kArr[0])));
        }
        Mono<List<ByteBuffer>> rawKeys = Flux.fromArray(kArr).map(this::rawKey).collectList();
        return createMono(connection -> rawKeys.flatMap(keys -> connection.keyCommands().mUnlink(keys)));
    }

    /**
     * Unlinks the keys emitted by a publisher. Keys are grouped into batches of
     * {@value #KEY_BATCH_SIZE} and the per-batch outputs are summed.
     *
     * @param publisher the keys to unlink, must not be {@code null}
     * @return the sum of the outputs of all {@code mUnlink} batches
     * @throws IllegalArgumentException if the publisher is {@code null}
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public Mono<Long> unlink(Publisher<K> publisher) {
        Assert.notNull(publisher, "Keys must not be null!");
        return createFlux(connection -> connection.keyCommands()
                .mUnlink(Flux.from(publisher).map(this::rawKey).buffer(KEY_BATCH_SIZE))
                .map(response -> response.getOutput()))
                .collect(Collectors.summingLong(Long::longValue));
    }

    /**
     * Sets a time-to-live on a key. {@code expire} is used when the duration has no nanosecond
     * component, {@code pExpire} otherwise.
     *
     * @param k the key, must not be {@code null}
     * @param duration the time-to-live, must not be {@code null}
     * @return the result of the underlying expire command
     * @throws IllegalArgumentException if the key or the duration is {@code null}
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public Mono<Boolean> expire(K k, Duration duration) {
        Assert.notNull(k, "Key must not be null!");
        Assert.notNull(duration, "Timeout must not be null!");
        if (duration.getNano() == 0) {
            return createMono(connection -> connection.keyCommands().expire(rawKey(k), duration));
        }
        return createMono(connection -> connection.keyCommands().pExpire(rawKey(k), duration));
    }

    /**
     * Sets an absolute expiration on a key. {@code expireAt} is used when the instant has no
     * nanosecond component, {@code pExpireAt} otherwise.
     *
     * @param k the key, must not be {@code null}
     * @param instant the expiration instant, must not be {@code null}
     * @return the result of the underlying expire-at command
     * @throws IllegalArgumentException if the key or the instant is {@code null}
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public Mono<Boolean> expireAt(K k, Instant instant) {
        Assert.notNull(k, "Key must not be null!");
        Assert.notNull(instant, "Expire at must not be null!");
        if (instant.getNano() == 0) {
            return createMono(connection -> connection.keyCommands().expireAt(rawKey(k), instant));
        }
        return createMono(connection -> connection.keyCommands().pExpireAt(rawKey(k), instant));
    }

    /**
     * Removes the expiration from a key.
     *
     * @param k the key, must not be {@code null}
     * @return the result of the underlying {@code persist} command
     * @throws IllegalArgumentException if the key is {@code null}
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public Mono<Boolean> persist(K k) {
        Assert.notNull(k, "Key must not be null!");
        return createMono(connection -> connection.keyCommands().persist(rawKey(k)));
    }

    /**
     * Reads the remaining time-to-live of a key through {@code pTtl}.
     *
     * @param k the key, must not be {@code null}
     * @return {@link Duration#ZERO} when {@code pTtl} reports {@code -1}, an empty {@link Mono} when it
     *         reports {@code -2}, otherwise the reported value as milliseconds
     * @throws IllegalArgumentException if the key is {@code null}
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public Mono<Duration> getExpire(K k) {
        Assert.notNull(k, "Key must not be null!");
        return createMono(connection -> connection.keyCommands().pTtl(rawKey(k)).flatMap(ttl -> {
            long millis = ttl.longValue();
            if (millis == -1) {
                // Redis PTTL: -1 is reported for a key that has no expiry set.
                return Mono.just(Duration.ZERO);
            }
            if (millis == -2) {
                // Redis PTTL: -2 is reported for a key that does not exist.
                return Mono.empty();
            }
            return Mono.just(Duration.ofMillis(millis));
        }));
    }

    /**
     * Moves a key to another database.
     *
     * @param k the key, must not be {@code null}
     * @param i the index passed to the underlying {@code move} command
     * @return the result of the underlying {@code move} command
     * @throws IllegalArgumentException if the key is {@code null}
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public Mono<Boolean> move(K k, int i) {
        Assert.notNull(k, "Key must not be null!");
        return createMono(connection -> connection.keyCommands().move(rawKey(k), i));
    }

    /**
     * Executes a script by delegating to this template's {@link ReactiveScriptExecutor}.
     *
     * @param redisScript the script to run
     * @param list the keys passed to the script
     * @param list2 the arguments passed to the script
     * @return the script result as produced by the executor
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public <T> Flux<T> execute(RedisScript<T> redisScript, List<K> list, List<?> list2) {
        return this.reactiveScriptExecutor.execute(redisScript, list, list2);
    }

    /**
     * Executes a script with explicit argument writer and result reader by delegating to this
     * template's {@link ReactiveScriptExecutor}.
     *
     * @param redisScript the script to run
     * @param list the keys passed to the script
     * @param list2 the arguments passed to the script
     * @param redisElementWriter writer handed to the executor for the arguments
     * @param redisElementReader reader handed to the executor for the result
     * @return the script result as produced by the executor
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public <T> Flux<T> execute(RedisScript<T> redisScript, List<K> list, List<?> list2, RedisElementWriter<?> redisElementWriter, RedisElementReader<T> redisElementReader) {
        return this.reactiveScriptExecutor.execute(redisScript, list, list2, redisElementWriter, redisElementReader);
    }

    /**
     * Extension hook invoked with a freshly obtained connection before the callback runs. This
     * implementation returns the connection unchanged.
     *
     * @param reactiveRedisConnection the obtained connection
     * @param z flag supplied by the template; this class always passes {@code false}
     * @return the connection to use for the callback
     */
    protected ReactiveRedisConnection preProcessConnection(ReactiveRedisConnection reactiveRedisConnection, boolean z) {
        return reactiveRedisConnection;
    }

    /**
     * Extension hook invoked with the publisher returned by the callback. This implementation returns
     * the publisher unchanged.
     *
     * @param publisher the callback result
     * @param reactiveRedisConnection the (pre-processed) connection the callback ran against
     * @param z flag supplied by the template; this class always passes {@code false}
     * @return the publisher handed back to the caller
     */
    protected <T> Publisher<T> postProcessResult(Publisher<T> publisher, ReactiveRedisConnection reactiveRedisConnection, boolean z) {
        return publisher;
    }

    /**
     * Wraps a connection in a JDK dynamic proxy backed by a {@code CloseSuppressingInvocationHandler}.
     * The proxy implements every interface of the connection's class.
     *
     * @param reactiveRedisConnection the connection to wrap
     * @return the proxy
     */
    protected ReactiveRedisConnection createRedisConnectionProxy(ReactiveRedisConnection reactiveRedisConnection) {
        Class<?> connectionType = reactiveRedisConnection.getClass();
        Class<?>[] interfaces = ClassUtils.getAllInterfacesForClass(connectionType, getClass().getClassLoader());
        return (ReactiveRedisConnection) Proxy.newProxyInstance(connectionType.getClassLoader(), interfaces, new CloseSuppressingInvocationHandler(reactiveRedisConnection));
    }

    /**
     * @return geo operations using this template's serialization context
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public ReactiveGeoOperations<K, V> opsForGeo() {
        return opsForGeo(this.serializationContext);
    }

    /**
     * @param redisSerializationContext the serialization context to use
     * @return geo operations using the given serialization context
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public <K1, V1> ReactiveGeoOperations<K1, V1> opsForGeo(RedisSerializationContext<K1, V1> redisSerializationContext) {
        return new DefaultReactiveGeoOperations<>(this, redisSerializationContext);
    }

    /**
     * @return hash operations using this template's serialization context
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public <HK, HV> ReactiveHashOperations<K, HK, HV> opsForHash() {
        return opsForHash(this.serializationContext);
    }

    /**
     * @param redisSerializationContext the serialization context to use
     * @return hash operations using the given serialization context
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public <K1, HK, HV> ReactiveHashOperations<K1, HK, HV> opsForHash(RedisSerializationContext<K1, ?> redisSerializationContext) {
        return new DefaultReactiveHashOperations<>(this, redisSerializationContext);
    }

    /**
     * @return HyperLogLog operations using this template's serialization context
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public ReactiveHyperLogLogOperations<K, V> opsForHyperLogLog() {
        return opsForHyperLogLog(this.serializationContext);
    }

    /**
     * @param redisSerializationContext the serialization context to use
     * @return HyperLogLog operations using the given serialization context
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public <K1, V1> ReactiveHyperLogLogOperations<K1, V1> opsForHyperLogLog(RedisSerializationContext<K1, V1> redisSerializationContext) {
        return new DefaultReactiveHyperLogLogOperations<>(this, redisSerializationContext);
    }

    /**
     * @return list operations using this template's serialization context
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public ReactiveListOperations<K, V> opsForList() {
        return opsForList(this.serializationContext);
    }

    /**
     * @param redisSerializationContext the serialization context to use
     * @return list operations using the given serialization context
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public <K1, V1> ReactiveListOperations<K1, V1> opsForList(RedisSerializationContext<K1, V1> redisSerializationContext) {
        return new DefaultReactiveListOperations<>(this, redisSerializationContext);
    }

    /**
     * @return set operations using this template's serialization context
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public ReactiveSetOperations<K, V> opsForSet() {
        return opsForSet(this.serializationContext);
    }

    /**
     * @param redisSerializationContext the serialization context to use
     * @return set operations using the given serialization context
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public <K1, V1> ReactiveSetOperations<K1, V1> opsForSet(RedisSerializationContext<K1, V1> redisSerializationContext) {
        return new DefaultReactiveSetOperations<>(this, redisSerializationContext);
    }

    /**
     * @return value operations using this template's serialization context
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public ReactiveValueOperations<K, V> opsForValue() {
        return opsForValue(this.serializationContext);
    }

    /**
     * @param redisSerializationContext the serialization context to use
     * @return value operations using the given serialization context
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public <K1, V1> ReactiveValueOperations<K1, V1> opsForValue(RedisSerializationContext<K1, V1> redisSerializationContext) {
        return new DefaultReactiveValueOperations<>(this, redisSerializationContext);
    }

    /**
     * @return sorted-set operations using this template's serialization context
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public ReactiveZSetOperations<K, V> opsForZSet() {
        return opsForZSet(this.serializationContext);
    }

    /**
     * @param redisSerializationContext the serialization context to use
     * @return sorted-set operations using the given serialization context
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public <K1, V1> ReactiveZSetOperations<K1, V1> opsForZSet(RedisSerializationContext<K1, V1> redisSerializationContext) {
        return new DefaultReactiveZSetOperations<>(this, redisSerializationContext);
    }

    /**
     * @return the serialization context this template was created with
     */
    @Override // org.springframework.data.redis.core.ReactiveRedisOperations
    public RedisSerializationContext<K, V> getSerializationContext() {
        return this.serializationContext;
    }

    /** Serializes a key with the key serialization pair's writer. */
    private ByteBuffer rawKey(K k) {
        return getSerializationContext().getKeySerializationPair().getWriter().write(k);
    }

    /** Deserializes a key with the key serialization pair's reader. */
    private K readKey(ByteBuffer byteBuffer) {
        return getSerializationContext().getKeySerializationPair().getReader().read(byteBuffer);
    }
}
