package com.alipay.sofa.jraft.rhea.client;

import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.rhea.client.failover.FailoverClosure;
import com.alipay.sofa.jraft.rhea.client.pd.AbstractPlacementDriverClient;
import com.alipay.sofa.jraft.rhea.client.pd.PlacementDriverClient;
import com.alipay.sofa.jraft.rhea.cmd.store.BaseRequest;
import com.alipay.sofa.jraft.rhea.cmd.store.BaseResponse;
import com.alipay.sofa.jraft.rhea.errors.Errors;
import com.alipay.sofa.jraft.rhea.errors.ErrorsHelper;
import com.alipay.sofa.jraft.rhea.options.RpcOptions;
import com.alipay.sofa.jraft.rhea.rpc.ExtSerializerSupports;
import com.alipay.sofa.jraft.rhea.util.concurrent.CallerRunsPolicyWithReport;
import com.alipay.sofa.jraft.rhea.util.concurrent.NamedThreadFactory;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.InvokeContext;
import com.alipay.sofa.jraft.rpc.RpcClient;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.ExecutorServiceHelper;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alipay/sofa/jraft/rhea/client/DefaultRheaKVRpcService.class */
/**
 * Default {@link RheaKVRpcService} implementation.
 *
 * <p>For every request it asks the {@link PlacementDriverClient} for the endpoint serving the
 * request's region, sends the request asynchronously through the {@link RpcClient} obtained
 * from the placement driver client, and reports the outcome to the caller-supplied
 * {@link FailoverClosure}.
 *
 * <p>{@link #init(RpcOptions)} and {@link #shutdown()} are {@code synchronized}; the call
 * methods are not and read the fields written by {@code init} without further synchronization.
 */
public class DefaultRheaKVRpcService implements RheaKVRpcService {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultRheaKVRpcService.class);

    /** Key under which the serializer invoke context is stored in the {@link InvokeContext}. */
    private static final String BOLT_CTX_KEY = "BOLT_CTX";
    /** Name used for the callback pool, its threads and its rejection reports. */
    private static final String CALLBACK_POOL_NAME = "rheakv-rpc-callback";
    /** Keep-alive, in seconds, handed to the callback pool builder. */
    private static final long CALLBACK_KEEP_ALIVE_SECONDS = 120L;

    private final PlacementDriverClient pdClient;
    private final RpcClient rpcClient;
    private final Endpoint selfEndpoint;
    /** Executor that runs RPC callbacks; {@code null} when no dedicated pool is configured. */
    private ThreadPoolExecutor rpcCallbackExecutor;
    private int rpcTimeoutMillis;
    private boolean started;

    /**
     * Creates the service.
     *
     * @param placementDriverClient source of routing information; it is cast to
     *                              {@link AbstractPlacementDriverClient} to obtain the RPC client
     * @param endpoint              this node's own endpoint, forwarded to
     *                              {@link PlacementDriverClient#getLuckyPeer}
     * @throws ClassCastException if {@code placementDriverClient} is not an
     *                            {@link AbstractPlacementDriverClient}
     */
    public DefaultRheaKVRpcService(PlacementDriverClient placementDriverClient, Endpoint endpoint) {
        this.pdClient = placementDriverClient;
        this.rpcClient = ((AbstractPlacementDriverClient) placementDriverClient).getRpcClient();
        this.selfEndpoint = endpoint;
    }

    /**
     * Starts the service with the given options. Calling it again while already started is a
     * no-op that returns {@code true}.
     *
     * <p>The timeout is validated before any state is touched, and fields are only assigned
     * once every step succeeded, so a failed call leaves the instance unchanged and does not
     * leave a freshly built callback pool behind.
     *
     * @param rpcOptions RPC options; {@code rpcTimeoutMillis} must be greater than zero
     * @return always {@code true}
     */
    public synchronized boolean init(RpcOptions rpcOptions) {
        if (this.started) {
            LOG.info("[DefaultRheaKVRpcService] already started.");
            return true;
        }
        final int timeoutMillis = rpcOptions.getRpcTimeoutMillis();
        Requires.requireTrue(timeoutMillis > 0, "opts.rpcTimeoutMillis must > 0");
        final ThreadPoolExecutor callbackExecutor = createRpcCallbackExecutor(rpcOptions);
        this.rpcTimeoutMillis = timeoutMillis;
        this.rpcCallbackExecutor = callbackExecutor;
        LOG.info("[DefaultRheaKVRpcService] start successfully, options: {}.", rpcOptions);
        this.started = true;
        return true;
    }

    /**
     * Stops the service and shuts down the callback executor, if one was created.
     */
    public synchronized void shutdown() {
        // The executor is null before init() and when the configured pool sizes are not positive.
        if (this.rpcCallbackExecutor != null) {
            ExecutorServiceHelper.shutdownAndAwaitTermination(this.rpcCallbackExecutor);
        }
        this.started = false;
        LOG.info("[DefaultRheaKVRpcService] shutdown successfully.");
    }

    /**
     * Same as {@link #callAsyncWithRpc(BaseRequest, FailoverClosure, Errors, boolean)} with
     * {@code requireLeader} set to {@code true}.
     */
    @Override
    public <V> CompletableFuture<V> callAsyncWithRpc(BaseRequest request, FailoverClosure<V> closure, Errors lastCause) {
        return callAsyncWithRpc(request, closure, lastCause, true);
    }

    /**
     * Resolves the endpoint for the request's region and sends the request asynchronously.
     *
     * <p>Exceptions thrown while resolving the endpoint propagate to the caller; failures of
     * the send itself are reported through {@code closure}.
     *
     * @param request       request to send; its region id selects the target
     * @param closure       receives the response data, error or failure
     * @param lastCause     error of a previous attempt; {@link ErrorsHelper#isInvalidPeer}
     *                      of it is passed on as the refresh flag of the endpoint lookup
     * @param requireLeader {@code true} to target the region leader, {@code false} to target
     *                      the endpoint returned by {@link #getLuckyPeer}
     * @return {@code closure.future()}
     */
    @Override
    public <V> CompletableFuture<V> callAsyncWithRpc(BaseRequest request, FailoverClosure<V> closure, Errors lastCause, boolean requireLeader) {
        final boolean forceRefresh = ErrorsHelper.isInvalidPeer(lastCause);
        final Endpoint target = getRpcEndpoint(request.getRegionId(), forceRefresh, this.rpcTimeoutMillis, requireLeader);
        internalCallAsyncWithRpc(target, request, closure);
        return closure.future();
    }

    /**
     * Delegates to {@link PlacementDriverClient#getLeader} with the arguments unchanged.
     *
     * @param regionId      id of the region
     * @param forceRefresh  flag forwarded to the placement driver client
     *                      (presumably forces a routing refresh; confirm against that client)
     * @param timeoutMillis timeout forwarded to the placement driver client
     */
    public Endpoint getLeader(long regionId, boolean forceRefresh, long timeoutMillis) {
        return this.pdClient.getLeader(regionId, forceRefresh, timeoutMillis);
    }

    /**
     * Delegates to {@link PlacementDriverClient#getLuckyPeer}, additionally passing this
     * service's own endpoint.
     *
     * @param regionId      id of the region
     * @param forceRefresh  flag forwarded to the placement driver client
     * @param timeoutMillis timeout forwarded to the placement driver client
     */
    public Endpoint getLuckyPeer(long regionId, boolean forceRefresh, long timeoutMillis) {
        return this.pdClient.getLuckyPeer(regionId, forceRefresh, timeoutMillis, this.selfEndpoint);
    }

    /**
     * Picks the endpoint a request should be sent to.
     *
     * @param requireLeader {@code true} selects {@link #getLeader}, {@code false} selects
     *                      {@link #getLuckyPeer}; the other arguments are forwarded as-is
     */
    public Endpoint getRpcEndpoint(long regionId, boolean forceRefresh, long timeoutMillis, boolean requireLeader) {
        if (requireLeader) {
            return getLeader(regionId, forceRefresh, timeoutMillis);
        }
        return getLuckyPeer(regionId, forceRefresh, timeoutMillis);
    }

    /**
     * Sends {@code request} to {@code endpoint} and wires the reply into {@code closure}.
     * Anything thrown synchronously by the RPC client is reported via
     * {@code closure.failure} instead of being propagated.
     */
    private <V> void internalCallAsyncWithRpc(final Endpoint endpoint, BaseRequest request, final FailoverClosure<V> closure) {
        final InvokeContext invokeContext = new InvokeContext();
        invokeContext.put(BOLT_CTX_KEY, ExtSerializerSupports.getInvokeContext());
        final InvokeCallback callback = new InvokeCallback() {
            @Override
            public void complete(Object result, Throwable err) {
                if (err != null) {
                    closure.failure(err);
                    return;
                }
                if (!(result instanceof BaseResponse)) {
                    // A null or foreign result would otherwise throw inside the callback
                    // thread and leave the caller's future uncompleted.
                    closure.failure(new IllegalStateException("Unexpected RPC response from " + endpoint + ": " + result));
                    return;
                }
                final BaseResponse response = (BaseResponse) result;
                if (response.isSuccess()) {
                    closure.setData(response.getValue());
                    closure.run(Status.OK());
                } else {
                    closure.setError(response.getError());
                    closure.run(new Status(-1, "RPC failed with address: %s, response: %s", new Object[] { endpoint, response }));
                }
            }

            @Override
            public Executor executor() {
                // May be null when no dedicated callback pool is configured.
                return DefaultRheaKVRpcService.this.rpcCallbackExecutor;
            }
        };
        try {
            this.rpcClient.invokeAsync(endpoint, request, invokeContext, callback, this.rpcTimeoutMillis);
        } catch (Throwable t) {
            closure.failure(t);
        }
    }

    /**
     * Builds the pool that runs RPC callbacks.
     *
     * @return the pool, or {@code null} when the configured core or maximum pool size is not
     *         positive
     */
    private ThreadPoolExecutor createRpcCallbackExecutor(RpcOptions rpcOptions) {
        final int coreThreads = rpcOptions.getCallbackExecutorCorePoolSize();
        final int maxThreads = rpcOptions.getCallbackExecutorMaximumPoolSize();
        if (coreThreads <= 0 || maxThreads <= 0) {
            return null;
        }
        return ThreadPoolUtil.newBuilder()
            .poolName(CALLBACK_POOL_NAME)
            .enableMetric(true)
            .coreThreads(coreThreads)
            .maximumThreads(maxThreads)
            .keepAliveSeconds(CALLBACK_KEEP_ALIVE_SECONDS)
            .workQueue(new ArrayBlockingQueue<>(rpcOptions.getCallbackExecutorQueueCapacity()))
            .threadFactory(new NamedThreadFactory(CALLBACK_POOL_NAME, true))
            .rejectedHandler(new CallerRunsPolicyWithReport(CALLBACK_POOL_NAME))
            .build();
    }
}
