package org.redisson.rx;

import io.reactivex.Flowable;
import io.reactivex.functions.Action;
import io.reactivex.functions.LongConsumer;
import io.reactivex.processors.ReplayProcessor;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.redisson.api.RFuture;
import org.redisson.api.RTopic;
import org.redisson.api.listener.MessageListener;

/* loaded from: input_file:BOOT-INF/lib/redisson-3.13.4.jar:org/redisson/rx/RedissonTopicRx.class */
/**
 * RxJava 2 adapter that exposes the messages of an {@link RTopic} as a {@link Flowable}.
 */
public class RedissonTopicRx {
    private final RTopic topic;

    /**
     * Creates an adapter around the given topic.
     *
     * @param rTopic topic whose listeners are registered and removed by this adapter
     */
    public RedissonTopicRx(RTopic rTopic) {
        this.topic = rTopic;
    }

    /**
     * Returns a stream of messages published to the topic.
     *
     * <p>Every downstream {@code request(n)} registers a new topic listener with its own
     * countdown of {@code n}. Each received message is pushed into a shared
     * {@link ReplayProcessor}; when a listener's countdown reaches zero it removes itself
     * from the topic and completes the stream. If registering a listener fails, the
     * failure is signalled to the stream as an error.
     *
     * <p>When the downstream cancels, all listeners registered through the returned
     * {@code Flowable} so far are removed from the topic, including ones whose
     * asynchronous registration has not finished yet.
     *
     * @param cls message type passed to {@link RTopic#addListenerAsync}
     * @param <M> message type
     * @return stream of received messages
     */
    public <M> Flowable<M> getMessages(final Class<M> cls) {
        final ReplayProcessor<M> processor = ReplayProcessor.create();
        // Registration futures of every listener added by a request, kept so a later
        // cancel can remove those listeners even if the listener id is not known yet.
        final Queue<RFuture<Integer>> registrations = new ConcurrentLinkedQueue<>();

        final LongConsumer onRequest = requested -> {
            final AtomicLong remaining = new AtomicLong(requested);
            // Anonymous class (not a lambda) because the listener passes itself to
            // removeListenerAsync once its quota is used up.
            RFuture<Integer> registration = topic.addListenerAsync(cls, new MessageListener<M>() {
                @Override
                public void onMessage(CharSequence channel, M message) {
                    processor.onNext(message);
                    if (remaining.decrementAndGet() == 0) {
                        topic.removeListenerAsync(this);
                        processor.onComplete();
                    }
                }
            });
            registrations.add(registration);
            registration.onComplete((listenerId, error) -> {
                if (error != null) {
                    processor.onError(error);
                }
            });
        };

        final Action onCancel = () -> {
            RFuture<Integer> registration;
            // poll() hands each registration to exactly one cancel run.
            while ((registration = registrations.poll()) != null) {
                // NOTE(review): relies on RFuture.onComplete also invoking the callback
                // when the future is already completed -- confirm for the RFuture
                // implementation in use.
                registration.onComplete((listenerId, error) -> {
                    if (error == null) {
                        topic.removeListenerAsync(listenerId);
                    }
                });
            }
        };

        // doOnCancel returns a new Flowable instead of modifying its receiver, so the
        // cancel hook has to be part of the chain that is handed to the subscriber.
        return processor.doOnRequest(onRequest).doOnCancel(onCancel);
    }
}
