package com.alipay.sofa.jraft.core;

import com.alipay.sofa.jraft.FSMCaller;
import com.alipay.sofa.jraft.ReadOnlyService;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.entity.ReadIndexState;
import com.alipay.sofa.jraft.entity.ReadIndexStatus;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.option.ReadOnlyServiceOptions;
import com.alipay.sofa.jraft.rpc.RpcRequests;
import com.alipay.sofa.jraft.rpc.RpcResponseClosureAdapter;
import com.alipay.sofa.jraft.util.Bytes;
import com.alipay.sofa.jraft.util.DisruptorBuilder;
import com.alipay.sofa.jraft.util.DisruptorMetricSet;
import com.alipay.sofa.jraft.util.LogExceptionHandler;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.OnlyForTest;
import com.alipay.sofa.jraft.util.ThreadHelper;
import com.alipay.sofa.jraft.util.Utils;
import com.google.protobuf.ZeroByteStringHelper;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alipay/sofa/jraft/core/ReadOnlyServiceImpl.class */
public class ReadOnlyServiceImpl implements ReadOnlyService, FSMCaller.LastAppliedLogIndexListener {
    /** Upper bound on publish retries for a single read-index request (see addRequest). */
    private static final int MAX_ADD_REQUEST_RETRY_TIMES = 3;
    /** Disruptor created and started in init(); shut down in join(). */
    private Disruptor<ReadIndexEvent> readIndexDisruptor;
    /** Ring buffer returned by readIndexDisruptor.start(); requests and marker events are published to it. */
    private RingBuffer<ReadIndexEvent> readIndexQueue;
    /** Options read for the apply batch size, the disruptor buffer size and the max election delay. */
    private RaftOptions raftOptions;
    /** Node taken from the service options in init(); read-index requests are handed to it. */
    private NodeImpl node;
    /** Source of the last applied log index; this service registers itself as a listener on it in init(). */
    private FSMCaller fsmCaller;
    /** Null until shutdown() is called; counted down by the event handler when it processes the shutdown marker. */
    private volatile CountDownLatch shutdownLatch;
    /** Single-thread scheduler that periodically calls onApplied with the current last applied index. */
    private ScheduledExecutorService scheduledExecutorService;
    /** Metrics sink obtained from the node in init(). */
    private NodeMetrics nodeMetrics;
    /** First error passed to setError(); once non-null, onApplied fails the statuses that remain pending. */
    private volatile RaftException error;
    private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyServiceImpl.class);
    /** Guards pendingNotifyStatus. */
    private final Lock lock = new ReentrantLock();
    /** Statuses whose read index is not applied yet, keyed by that read index. */
    private final TreeMap<Long, List<ReadIndexStatus>> pendingNotifyStatus = new TreeMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/alipay/sofa/jraft/core/ReadOnlyServiceImpl$ReadIndexEvent.class */
    /**
     * Mutable element type of the read-index ring buffer.
     *
     * <p>An event either carries one read-index request (requestContext, done, startTime)
     * or acts as a marker: the event handler treats any event whose shutdownLatch is
     * non-null as a marker, flushes its current batch and counts the latch down.
     */
    public static class ReadIndexEvent {
        /** Request context wrapped by addRequest. */
        Bytes requestContext;
        /** Closure supplied by the caller of addRequest. */
        ReadIndexClosure done;
        /** Non-null only on marker events published by shutdown() or flush(). */
        CountDownLatch shutdownLatch;
        /** Value of Utils.monotonicMs() taken when the request was published. */
        long startTime;

        private ReadIndexEvent() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/alipay/sofa/jraft/core/ReadOnlyServiceImpl$ReadIndexEventFactory.class */
    public static class ReadIndexEventFactory implements EventFactory<ReadIndexEvent> {
        private ReadIndexEventFactory() {
        }

        /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
        public ReadIndexEvent m22newInstance() {
            return new ReadIndexEvent();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/alipay/sofa/jraft/core/ReadOnlyServiceImpl$ReadIndexEventHandler.class */
    public class ReadIndexEventHandler implements EventHandler<ReadIndexEvent> {
        private final List<ReadIndexEvent> events;

        private ReadIndexEventHandler() {
            this.events = new ArrayList(ReadOnlyServiceImpl.this.raftOptions.getApplyBatch());
        }

        public void onEvent(ReadIndexEvent readIndexEvent, long j, boolean z) throws Exception {
            if (readIndexEvent.shutdownLatch != null) {
                ReadOnlyServiceImpl.this.executeReadIndexEvents(this.events);
                this.events.clear();
                readIndexEvent.shutdownLatch.countDown();
            } else {
                this.events.add(readIndexEvent);
                if (this.events.size() >= ReadOnlyServiceImpl.this.raftOptions.getApplyBatch() || z) {
                    ReadOnlyServiceImpl.this.executeReadIndexEvents(this.events);
                    this.events.clear();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alipay/sofa/jraft/core/ReadOnlyServiceImpl$ReadIndexResponseClosure.class */
    /**
     * Callback invoked with the response to one batched ReadIndex request.
     *
     * <p>On failure every request in the batch is failed. On success the batch is
     * either notified immediately (read index already applied) or parked in
     * {@code pendingNotifyStatus} until {@code onApplied} reaches that index.
     */
    public class ReadIndexResponseClosure extends RpcResponseClosureAdapter<RpcRequests.ReadIndexResponse> {
        /** Per-request state of the batch, in the order the entries were added to the request. */
        final List<ReadIndexState> states;
        /** The batched request this closure answers. */
        final RpcRequests.ReadIndexRequest request;

        public ReadIndexResponseClosure(List<ReadIndexState> list, RpcRequests.ReadIndexRequest readIndexRequest) {
            this.states = list;
            this.request = readIndexRequest;
        }

        /**
         * Processes the outcome of the ReadIndex request.
         *
         * @param status status of the request; a non-OK status fails the whole batch
         */
        @Override // com.alipay.sofa.jraft.Closure
        public void run(Status status) {
            if (!status.isOk()) {
                notifyFail(status);
                return;
            }
            final RpcRequests.ReadIndexResponse response = getResponse();
            if (!response.getSuccess()) {
                notifyFail(new Status(-1, "Fail to run ReadIndex task, maybe the leader stepped down."));
                return;
            }

            final ReadIndexStatus readIndexStatus = new ReadIndexStatus(this.states, this.request, response.getIndex());
            for (final ReadIndexState state : this.states) {
                state.setIndex(response.getIndex());
            }

            // Decide under the lock, notify outside of it: the lock is released exactly
            // once on every path, and user closures never run while it is held.
            final boolean alreadyApplied;
            ReadOnlyServiceImpl.this.lock.lock();
            try {
                alreadyApplied = readIndexStatus.isApplied(ReadOnlyServiceImpl.this.fsmCaller.getLastAppliedIndex());
                if (!alreadyApplied) {
                    ReadOnlyServiceImpl.this.pendingNotifyStatus
                        .computeIfAbsent(Long.valueOf(readIndexStatus.getIndex()), k -> new ArrayList<>(10))
                        .add(readIndexStatus);
                }
            } finally {
                ReadOnlyServiceImpl.this.lock.unlock();
            }

            if (alreadyApplied) {
                ReadOnlyServiceImpl.this.notifySuccess(readIndexStatus);
            }
        }

        /**
         * Records latency for every request of the batch and runs each non-null
         * closure with the given status, index -1 and the original request context.
         */
        private void notifyFail(Status status) {
            final long now = Utils.monotonicMs();
            for (final ReadIndexState state : this.states) {
                ReadOnlyServiceImpl.this.nodeMetrics.recordLatency("read-index", now - state.getStartTimeMs());
                final ReadIndexClosure done = state.getDone();
                if (done != null) {
                    final Bytes requestContext = state.getRequestContext();
                    done.run(status, -1L, requestContext != null ? requestContext.get() : null);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends one ReadIndex request covering all given events.
     *
     * <p>Each event contributes its request context as a request entry and a
     * {@link ReadIndexState} that the response closure later notifies. An empty
     * list is a no-op.
     *
     * @param list batched events to execute
     */
    public void executeReadIndexEvents(List<ReadIndexEvent> list) {
        if (list.isEmpty()) {
            return;
        }

        final RpcRequests.ReadIndexRequest.Builder requestBuilder = RpcRequests.ReadIndexRequest.newBuilder();
        requestBuilder.setGroupId(this.node.getGroupId());
        requestBuilder.setServerId(this.node.getServerId().toString());

        final List<ReadIndexState> states = new ArrayList<>(list.size());
        for (final ReadIndexEvent event : list) {
            requestBuilder.addEntries(ZeroByteStringHelper.wrap(event.requestContext.get()));
            states.add(new ReadIndexState(event.requestContext, event.done, event.startTime));
        }

        final RpcRequests.ReadIndexRequest request = requestBuilder.build();
        this.node.handleReadIndexRequest(request, new ReadIndexResponseClosure(states, request));
    }

    /**
     * Fails every pending status with the given status and empties the pending map.
     * Runs entirely under {@code lock}, which is released whether or not a
     * closure throws.
     *
     * @param status status reported to each pending closure
     */
    private void resetPendingStatusError(Status status) {
        this.lock.lock();
        try {
            for (final List<ReadIndexStatus> sameIndex : this.pendingNotifyStatus.values()) {
                for (final ReadIndexStatus pending : sameIndex) {
                    reportError(pending, status);
                }
            }
            this.pendingNotifyStatus.clear();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Wires the service to its node and starts its background machinery.
     *
     * <p>Creates the scanner executor, builds and starts the read-index Disruptor,
     * registers the Disruptor metric set when a metric registry is available,
     * subscribes to applied-index notifications and schedules a periodic scan of
     * the pending statuses every {@code maxElectionDelayMs} milliseconds.
     *
     * @param readOnlyServiceOptions supplies the node, the FSM caller and the raft options
     * @return always {@code true}
     */
    @Override // com.alipay.sofa.jraft.Lifecycle
    public boolean init(ReadOnlyServiceOptions readOnlyServiceOptions) {
        this.node = readOnlyServiceOptions.getNode();
        this.nodeMetrics = this.node.getNodeMetrics();
        this.fsmCaller = readOnlyServiceOptions.getFsmCaller();
        this.raftOptions = readOnlyServiceOptions.getRaftOptions();
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
            new NamedThreadFactory("ReadOnlyService-PendingNotify-Scanner", true));

        // The explicit type witness keeps the builder chain typed to ReadIndexEvent;
        // without it the chain cannot accept the event factory or yield a typed Disruptor.
        this.readIndexDisruptor = DisruptorBuilder.<ReadIndexEvent> newInstance()
            .setEventFactory(new ReadIndexEventFactory())
            .setRingBufferSize(this.raftOptions.getDisruptorBufferSize())
            .setThreadFactory(new NamedThreadFactory("JRaft-ReadOnlyService-Disruptor-", true))
            .setWaitStrategy(new BlockingWaitStrategy())
            .setProducerType(ProducerType.MULTI)
            .build();
        this.readIndexDisruptor.handleEventsWith(new ReadIndexEventHandler());
        this.readIndexDisruptor.setDefaultExceptionHandler(new LogExceptionHandler(getClass().getSimpleName()));
        this.readIndexQueue = this.readIndexDisruptor.start();
        if (this.nodeMetrics.getMetricRegistry() != null) {
            this.nodeMetrics.getMetricRegistry().register("jraft-read-only-service-disruptor",
                new DisruptorMetricSet(this.readIndexQueue));
        }

        this.fsmCaller.addLastAppliedLogIndexListener(this);

        final long scanIntervalMs = this.raftOptions.getMaxElectionDelayMs();
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                onApplied(this.fsmCaller.getLastAppliedIndex());
            } catch (Throwable t) {
                // scheduleAtFixedRate suppresses all later runs once a run throws;
                // log and keep the scanner alive instead.
                LOG.error("Fail to scan pending read-index notifications.", t);
            }
        }, scanIntervalMs, scanIntervalMs, TimeUnit.MILLISECONDS);
        return true;
    }

    /**
     * Remembers the first error reported to this service; later calls are ignored
     * once an error has been stored.
     *
     * @param raftException the error to store if none is stored yet
     */
    @Override // com.alipay.sofa.jraft.ReadOnlyService
    public synchronized void setError(RaftException raftException) {
        if (this.error != null) {
            return;
        }
        this.error = raftException;
    }

    /**
     * Starts shutting the service down; only the first call has any effect.
     *
     * <p>Creates the shutdown latch, asynchronously publishes a marker event
     * carrying that latch to the read-index queue, and shuts down the scanner
     * executor. Use {@code join()} to wait for completion.
     */
    @Override // com.alipay.sofa.jraft.Lifecycle
    public synchronized void shutdown() {
        if (this.shutdownLatch == null) {
            final CountDownLatch latch = new CountDownLatch(1);
            this.shutdownLatch = latch;
            Utils.runInThread(() -> this.readIndexQueue.publishEvent((event, sequence) -> {
                event.shutdownLatch = latch;
            }));
            this.scheduledExecutorService.shutdown();
        }
    }

    /**
     * Waits for a shutdown in progress and releases the remaining resources.
     *
     * <p>Awaits the shutdown latch if one exists, shuts the Disruptor down, fails
     * every still-pending status with {@code ESTOP}, and waits up to five seconds
     * for the scanner executor to terminate.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    @Override // com.alipay.sofa.jraft.ReadOnlyService
    public void join() throws InterruptedException {
        final CountDownLatch latch = this.shutdownLatch;
        if (latch != null) {
            latch.await();
        }
        this.readIndexDisruptor.shutdown();
        final Status stopped = new Status(RaftError.ESTOP, "Node is quit.", new Object[0]);
        resetPendingStatusError(stopped);
        this.scheduledExecutorService.awaitTermination(5L, TimeUnit.SECONDS);
    }

    /**
     * Enqueues a read-index request.
     *
     * <p>Publishing is attempted without blocking. After
     * {@code MAX_ADD_REQUEST_RETRY_TIMES} failed retries the closure is completed
     * with {@code EBUSY}. If publishing throws, the closure is completed with
     * {@code EPERM}.
     *
     * @param bArr             request context handed back to the closure
     * @param readIndexClosure closure to notify with the outcome
     * @throws IllegalStateException if the service has been shut down (the closure
     *                               is additionally completed with {@code EHOSTDOWN})
     */
    @Override // com.alipay.sofa.jraft.ReadOnlyService
    public void addRequest(byte[] bArr, ReadIndexClosure readIndexClosure) {
        if (this.shutdownLatch != null) {
            Utils.runClosureInThread(readIndexClosure, new Status(RaftError.EHOSTDOWN, "Was stopped", new Object[0]));
            throw new IllegalStateException("Service already shutdown.");
        }
        try {
            final EventTranslator<ReadIndexEvent> translator = (event, sequence) -> {
                event.done = readIndexClosure;
                event.requestContext = new Bytes(bArr);
                event.startTime = Utils.monotonicMs();
                // Ring-buffer slots are reused: clear a latch left behind by an earlier
                // marker event so this request is not mistaken for a marker and dropped.
                event.shutdownLatch = null;
            };
            int retries = 0;
            while (!this.readIndexQueue.tryPublishEvent(translator)) {
                retries++;
                if (retries > MAX_ADD_REQUEST_RETRY_TIMES) {
                    Utils.runClosureInThread(readIndexClosure, new Status(RaftError.EBUSY, "Node is busy, has too many read-only requests.", new Object[0]));
                    this.nodeMetrics.recordTimes("read-index-overload-times", 1L);
                    LOG.warn("Node {} ReadOnlyServiceImpl readIndexQueue is overload.", this.node.getNodeId());
                    return;
                }
                ThreadHelper.onSpinWait();
            }
        } catch (Exception e) {
            // Keep the cause visible; the closure only receives a generic status.
            LOG.error("Fail to add read-index request.", e);
            Utils.runClosureInThread(readIndexClosure, new Status(RaftError.EPERM, "Node is down.", new Object[0]));
        }
    }

    /**
     * Notifies every pending status whose read index is at or below the applied index.
     *
     * <p>Under {@code lock}, the matching entries are removed from
     * {@code pendingNotifyStatus}; if an error has been set, the statuses that
     * remain pending are failed with it. The removed statuses are notified as
     * successful after the lock has been released.
     *
     * @param j the last applied log index
     */
    @Override // com.alipay.sofa.jraft.FSMCaller.LastAppliedLogIndexListener
    public void onApplied(long j) {
        List<ReadIndexStatus> ready = null;
        this.lock.lock();
        try {
            if (this.pendingNotifyStatus.isEmpty()) {
                // The finally block releases the lock; unlocking here as well would
                // throw IllegalMonitorStateException.
                return;
            }

            // View of all entries with key <= j; removing through its iterator also
            // removes them from pendingNotifyStatus.
            final NavigableMap<Long, List<ReadIndexStatus>> applied = this.pendingNotifyStatus.headMap(Long.valueOf(j), true);
            ready = new ArrayList<>(applied.size() << 1);
            final Iterator<Map.Entry<Long, List<ReadIndexStatus>>> it = applied.entrySet().iterator();
            while (it.hasNext()) {
                ready.addAll(it.next().getValue());
                it.remove();
            }

            // Read the volatile field once so the null check and the use agree.
            final RaftException failure = this.error;
            if (failure != null) {
                resetPendingStatusError(failure.getStatus());
            }
        } finally {
            this.lock.unlock();
            // Statuses already removed from the map must still be notified, even if
            // the block above failed part-way through.
            if (ready != null) {
                for (final ReadIndexStatus status : ready) {
                    notifySuccess(status);
                }
            }
        }
    }

    /**
     * Test hook: publishes a marker event and blocks until the event handler has
     * processed it, i.e. until everything batched before it has been executed.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    @OnlyForTest
    void flush() throws InterruptedException {
        final CountDownLatch flushed = new CountDownLatch(1);
        this.readIndexQueue.publishEvent((event, sequence) -> {
            event.shutdownLatch = flushed;
        });
        flushed.await();
    }

    /**
     * Test hook exposing the live pending-status map (not a copy).
     *
     * @return the map of read index to statuses awaiting notification
     */
    @OnlyForTest
    TreeMap<Long, List<ReadIndexStatus>> getPendingNotifyStatus() {
        return this.pendingNotifyStatus;
    }

    /**
     * Runs every non-null closure of the given status with an error, recording
     * the "read-index" latency for each closure that is run.
     *
     * @param readIndexStatus batch whose closures are to be failed
     * @param status          status passed to each closure
     */
    private void reportError(ReadIndexStatus readIndexStatus, Status status) {
        final long now = Utils.monotonicMs();
        for (final ReadIndexState state : readIndexStatus.getStates()) {
            final ReadIndexClosure callback = state.getDone();
            if (callback == null) {
                continue;
            }
            this.nodeMetrics.recordLatency("read-index", now - state.getStartTimeMs());
            callback.run(status);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Completes every non-null closure of the given status successfully: records
     * the "read-index" latency, stores the read index and request context on the
     * closure, then runs it with an OK status.
     *
     * @param readIndexStatus batch whose closures are to be completed
     */
    public void notifySuccess(ReadIndexStatus readIndexStatus) {
        final long now = Utils.monotonicMs();
        for (final ReadIndexState state : readIndexStatus.getStates()) {
            final ReadIndexClosure callback = state.getDone();
            if (callback == null) {
                continue;
            }
            this.nodeMetrics.recordLatency("read-index", now - state.getStartTimeMs());
            callback.setResult(state.getIndex(), state.getRequestContext().get());
            callback.run(Status.OK());
        }
    }
}
