package org.redisson.reactive;

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RFuture;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/* loaded from: input_file:BOOT-INF/lib/redisson-3.13.4.jar:org/redisson/reactive/ElementsStream.class */
public class ElementsStream {
    private static <V> void take(Callable<RFuture<V>> callable, FluxSink<V> fluxSink, AtomicLong atomicLong, AtomicReference<RFuture<V>> atomicReference) {
        try {
            RFuture<V> call = callable.call();
            atomicReference.set(call);
            call.onComplete((obj, th) -> {
                if (th != null) {
                    fluxSink.error(th);
                    return;
                }
                fluxSink.next(obj);
                if (atomicLong.decrementAndGet() == 0) {
                    fluxSink.complete();
                }
                take(callable, fluxSink, atomicLong, atomicReference);
            });
        } catch (Exception e) {
            fluxSink.error(e);
        }
    }

    public static <V> Flux<V> takeElements(Callable<RFuture<V>> callable) {
        return Flux.create(fluxSink -> {
            fluxSink.onRequest(j -> {
                AtomicLong atomicLong = new AtomicLong(j);
                AtomicReference atomicReference = new AtomicReference();
                take(callable, fluxSink, atomicLong, atomicReference);
                fluxSink.onDispose(() -> {
                    ((RFuture) atomicReference.get()).cancel(true);
                });
            });
        });
    }
}
