package org.redisson.rx;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.reactivex.Flowable;
import io.reactivex.functions.Action;
import io.reactivex.functions.LongConsumer;
import io.reactivex.processors.ReplayProcessor;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.redisson.api.RTopic;
import org.redisson.api.listener.MessageListener;

/* loaded from: input_file:BOOT-INF/lib/redisson-3.9.1.jar:org/redisson/rx/RedissonTopicRx.class */
/**
 * Adapts an {@link RTopic} to RxJava 2 by exposing its messages as a {@link Flowable}.
 */
public class RedissonTopicRx {
    private final RTopic topic;

    /**
     * Creates an adapter around the given topic.
     *
     * @param rTopic the topic whose messages are exposed; stored as-is
     */
    public RedissonTopicRx(RTopic rTopic) {
        this.topic = rTopic;
    }

    /**
     * Returns a {@link Flowable} of the messages published to the topic.
     *
     * <p>The stream is backed by a single {@link ReplayProcessor} created when this method is
     * called. Every downstream {@code request(n)} registers a new topic listener that forwards
     * each received message into the processor; once that listener has forwarded {@code n}
     * messages it removes itself from the topic and completes the processor. If registering the
     * listener fails, the failure cause is signalled through the processor's {@code onError}.
     *
     * <p>When a subscriber cancels, the listeners registered on behalf of that subscription are
     * removed from the topic. The bookkeeping is created per subscription (via
     * {@link Flowable#defer}), so one subscriber cancelling does not remove listeners that were
     * registered for another subscription.
     *
     * @param cls the message type passed to {@code RTopic.addListenerAsync}
     * @param <M> the message type
     * @return a flowable emitting the messages forwarded by the registered listeners
     */
    public <M> Flowable<M> getMessages(final Class<M> cls) {
        final ReplayProcessor<M> processor = ReplayProcessor.create();

        return Flowable.defer(new Callable<Flowable<M>>() {
            @Override
            public Flowable<M> call() {
                // Per-subscription state: ids of listeners registered by this subscriber's
                // requests, and whether this subscriber has already cancelled.
                final Queue<Integer> listenerIds = new ConcurrentLinkedQueue<Integer>();
                final AtomicBoolean cancelled = new AtomicBoolean();

                return processor.doOnRequest(new LongConsumer() {
                    @Override
                    public void accept(long requested) throws Exception {
                        final AtomicLong remaining = new AtomicLong(requested);
                        topic.addListenerAsync(cls, new MessageListener<M>() {
                            @Override
                            public void onMessage(CharSequence channel, M message) {
                                processor.onNext(message);
                                if (remaining.decrementAndGet() == 0) {
                                    // Requested amount delivered: detach and finish the stream.
                                    topic.removeListenerAsync(this);
                                    processor.onComplete();
                                }
                            }
                        }).addListener(new FutureListener<Integer>() {
                            @Override
                            public void operationComplete(Future<Integer> future) throws Exception {
                                if (!future.isSuccess()) {
                                    processor.onError(future.cause());
                                    return;
                                }
                                listenerIds.offer(future.getNow());
                                // The subscriber may have cancelled while registration was still
                                // in flight; in that case the cancel hook has already drained the
                                // queue, so this late id must be cleaned up here.
                                if (cancelled.get()) {
                                    removeListeners(listenerIds);
                                }
                            }
                        });
                    }
                // The hook must be part of the returned chain: doOnCancel returns a new
                // Flowable and has no effect if its result is discarded.
                }).doOnCancel(new Action() {
                    @Override
                    public void run() throws Exception {
                        cancelled.set(true);
                        removeListeners(listenerIds);
                    }
                });
            }
        });
    }

    /**
     * Drains the given queue and asks the topic to remove each listener id found in it.
     *
     * <p>Each id is polled by exactly one caller, so concurrent drains do not remove the same id
     * twice. NOTE(review): an id may belong to a listener that already removed itself after
     * delivering its requested amount; removing it again is assumed to be harmless — confirm
     * against the RTopic implementation.
     */
    private void removeListeners(Queue<Integer> listenerIds) {
        Integer id;
        while ((id = listenerIds.poll()) != null) {
            topic.removeListenerAsync(id.intValue());
        }
    }
}
