package org.apache.flink.runtime.checkpoint;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.shaded.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.org.apache.curator.framework.api.BackgroundCallback;
import org.apache.flink.shaded.org.apache.curator.framework.api.CuratorEvent;
import org.apache.flink.shaded.org.apache.curator.framework.api.CuratorEventType;
import org.apache.flink.shaded.org.apache.curator.utils.ZKPaths;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.class */
public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class);
    private final CuratorFramework client;
    private final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper;
    private final int maxNumberOfCheckpointsToRetain;
    private final ArrayDeque<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointStateHandles;

    public ZooKeeperCompletedCheckpointStore(int i, CuratorFramework curatorFramework, String str, RetrievableStateStorageHelper<CompletedCheckpoint> retrievableStateStorageHelper, Executor executor) throws Exception {
        Preconditions.checkArgument(i >= 1, "Must retain at least one checkpoint.");
        Preconditions.checkNotNull(retrievableStateStorageHelper, "State storage");
        this.maxNumberOfCheckpointsToRetain = i;
        Preconditions.checkNotNull(curatorFramework, "Curator client");
        Preconditions.checkNotNull(str, "Checkpoints path");
        curatorFramework.newNamespaceAwareEnsurePath(str).ensure(curatorFramework.getZookeeperClient());
        this.client = curatorFramework.usingNamespace(curatorFramework.getNamespace() + str);
        this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, retrievableStateStorageHelper, executor);
        this.checkpointStateHandles = new ArrayDeque<>(i + 1);
        LOG.info("Initialized in '{}'.", str);
    }

    @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
    public void recover() throws Exception {
        List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> allSortedByName;
        LOG.info("Recovering checkpoints from ZooKeeper.");
        this.checkpointStateHandles.clear();
        while (true) {
            try {
                allSortedByName = this.checkpointsInZooKeeper.getAllSortedByName();
                break;
            } catch (ConcurrentModificationException e) {
                LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
            }
        }
        int size = allSortedByName.size();
        LOG.info("Found {} checkpoints in ZooKeeper.", Integer.valueOf(size));
        if (size > 0) {
            Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> tuple2 = allSortedByName.get(size - 1);
            long pathToCheckpointId = pathToCheckpointId((String) tuple2.f1);
            LOG.info("Trying to retrieve checkpoint {}.", Long.valueOf(pathToCheckpointId));
            try {
                CompletedCheckpoint completedCheckpoint = (CompletedCheckpoint) ((RetrievableStateHandle) tuple2.f0).retrieveState();
                this.checkpointStateHandles.add(tuple2);
                LOG.info("Initialized with {}. Removing all older checkpoints.", completedCheckpoint);
                for (int i = 0; i < size - 1; i++) {
                    try {
                        removeSubsumed(allSortedByName.get(i));
                    } catch (Exception e2) {
                        LOG.error("Failed to discard checkpoint", e2);
                    }
                }
            } catch (Exception e3) {
                throw new Exception("Could not retrieve the completed checkpoint " + pathToCheckpointId + " from the state storage.", e3);
            }
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
    public void addCheckpoint(CompletedCheckpoint completedCheckpoint) throws Exception {
        Preconditions.checkNotNull(completedCheckpoint, "Checkpoint");
        String checkpointIdToPath = checkpointIdToPath(completedCheckpoint.getCheckpointID());
        this.checkpointStateHandles.addLast(new Tuple2<>(this.checkpointsInZooKeeper.add(checkpointIdToPath, completedCheckpoint), checkpointIdToPath));
        if (this.checkpointStateHandles.size() > this.maxNumberOfCheckpointsToRetain) {
            removeSubsumed(this.checkpointStateHandles.removeFirst());
        }
        LOG.debug("Added {} to {}.", completedCheckpoint, checkpointIdToPath);
    }

    @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
    public CompletedCheckpoint getLatestCheckpoint() throws Exception {
        if (this.checkpointStateHandles.isEmpty()) {
            return null;
        }
        return (CompletedCheckpoint) ((RetrievableStateHandle) this.checkpointStateHandles.getLast().f0).retrieveState();
    }

    @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
    public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
        ArrayList arrayList = new ArrayList(this.checkpointStateHandles.size());
        Iterator<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> it = this.checkpointStateHandles.iterator();
        while (it.hasNext()) {
            arrayList.add(((RetrievableStateHandle) it.next().f0).retrieveState());
        }
        return arrayList;
    }

    @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
    public int getNumberOfRetainedCheckpoints() {
        return this.checkpointStateHandles.size();
    }

    @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
    public void shutdown(JobStatus jobStatus) throws Exception {
        if (!jobStatus.isGloballyTerminalState()) {
            LOG.info("Suspending");
            this.checkpointStateHandles.clear();
            return;
        }
        LOG.info("Shutting down");
        Iterator<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> it = this.checkpointStateHandles.iterator();
        while (it.hasNext()) {
            try {
                removeShutdown(it.next(), jobStatus);
            } catch (Exception e) {
                LOG.error("Failed to discard checkpoint.", e);
            }
        }
        this.checkpointStateHandles.clear();
        String str = ZKPaths.PATH_SEPARATOR + this.client.getNamespace();
        LOG.info("Removing {} from ZooKeeper", str);
        ZKPaths.deleteChildren(this.client.getZookeeperClient().getZooKeeper(), str, true);
    }

    private void removeSubsumed(final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> tuple2) throws Exception {
        remove(tuple2, new Callable<Void>() { // from class: org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Void call() throws Exception {
                ((CompletedCheckpoint) ((RetrievableStateHandle) tuple2.f0).retrieveState()).subsume();
                return null;
            }
        });
    }

    private void removeShutdown(final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> tuple2, final JobStatus jobStatus) throws Exception {
        remove(tuple2, new Callable<Void>() { // from class: org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Void call() throws Exception {
                ((CompletedCheckpoint) ((RetrievableStateHandle) tuple2.f0).retrieveState()).discard(jobStatus);
                return null;
            }
        });
    }

    private void remove(final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> tuple2, final Callable<Void> callable) throws Exception {
        this.checkpointsInZooKeeper.remove((String) tuple2.f1, new BackgroundCallback() { // from class: org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.3
            @Override // org.apache.flink.shaded.org.apache.curator.framework.api.BackgroundCallback
            public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                long pathToCheckpointId = ZooKeeperCompletedCheckpointStore.pathToCheckpointId((String) tuple2.f1);
                try {
                    if (curatorEvent.getType() != CuratorEventType.DELETE) {
                        throw new IllegalStateException("Unexpected event type " + curatorEvent.getType() + " in '" + curatorEvent + "' callback.");
                    }
                    if (curatorEvent.getResultCode() != 0) {
                        throw new IllegalStateException("Unexpected result code " + curatorEvent.getResultCode() + " in '" + curatorEvent + "' callback.");
                    }
                    Exception exc = null;
                    try {
                        callable.call();
                    } catch (Exception e) {
                        exc = new Exception("Could not execute callable action for checkpoint " + pathToCheckpointId + '.', e);
                    }
                    try {
                        ((RetrievableStateHandle) tuple2.f0).discardState();
                    } catch (Exception e2) {
                        Exception exc2 = new Exception("Could not discard meta data for completed checkpoint " + pathToCheckpointId + '.', e2);
                        if (exc == null) {
                            exc = exc2;
                        } else {
                            exc.addSuppressed(exc2);
                        }
                    }
                    if (exc != null) {
                        throw exc;
                    }
                } catch (Exception e3) {
                    ZooKeeperCompletedCheckpointStore.LOG.warn("Failed to discard checkpoint {}.", Long.valueOf(pathToCheckpointId), e3);
                }
            }
        });
    }

    protected static String checkpointIdToPath(long j) {
        return String.format("/%s", Long.valueOf(j));
    }

    protected static long pathToCheckpointId(String str) {
        try {
            return Long.parseLong('/' == str.charAt(0) ? str.substring(1) : str);
        } catch (NumberFormatException e) {
            LOG.warn("Could not parse checkpoint id from {}. This indicates that the checkpoint id to path conversion has changed.", str);
            return -1L;
        }
    }
}
