package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.clock.Clock;
import org.apache.flink.runtime.util.clock.SystemClock;
import org.apache.flink.types.SerializableOptional;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/SlotPool.class */
public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedSlotActions {
    private static final int STATUS_LOG_INTERVAL_MS = 60000;
    private final JobID jobId;
    private final SchedulingStrategy schedulingStrategy;
    private final ProviderAndOwner providerAndOwner;
    private final HashSet<ResourceID> registeredTaskManagers;
    private final AllocatedSlots allocatedSlots;
    private final AvailableSlots availableSlots;
    private final DualKeyMap<SlotRequestId, AllocationID, PendingRequest> pendingRequests;
    private final HashMap<SlotRequestId, PendingRequest> waitingForResourceManager;
    private final Time rpcTimeout;
    private final Time idleSlotTimeout;
    private final Clock clock;
    protected final Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagers;
    private JobMasterId jobMasterId;
    private ResourceManagerGateway resourceManagerGateway;
    private String jobManagerAddress;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/SlotPool$AllocatedSlots.class */
    public static class AllocatedSlots {
        private final Map<ResourceID, Set<AllocatedSlot>> allocatedSlotsByTaskManager = new HashMap(16);
        private final DualKeyMap<AllocationID, SlotRequestId, AllocatedSlot> allocatedSlotsById = new DualKeyMap<>(16);

        AllocatedSlots() {
        }

        void add(SlotRequestId slotRequestId, AllocatedSlot allocatedSlot) {
            this.allocatedSlotsById.put(allocatedSlot.getAllocationId(), slotRequestId, allocatedSlot);
            this.allocatedSlotsByTaskManager.computeIfAbsent(allocatedSlot.getTaskManagerLocation().getResourceID(), resourceID -> {
                return new HashSet(4);
            }).add(allocatedSlot);
        }

        AllocatedSlot get(AllocationID allocationID) {
            return this.allocatedSlotsById.getKeyA(allocationID);
        }

        AllocatedSlot get(SlotRequestId slotRequestId) {
            return this.allocatedSlotsById.getKeyB(slotRequestId);
        }

        boolean contains(AllocationID allocationID) {
            return this.allocatedSlotsById.containsKeyA(allocationID);
        }

        @Nullable
        AllocatedSlot remove(AllocationID allocationID) {
            AllocatedSlot removeKeyA = this.allocatedSlotsById.removeKeyA(allocationID);
            if (removeKeyA != null) {
                removeAllocatedSlot(removeKeyA);
            }
            return removeKeyA;
        }

        @Nullable
        AllocatedSlot remove(SlotRequestId slotRequestId) {
            AllocatedSlot removeKeyB = this.allocatedSlotsById.removeKeyB(slotRequestId);
            if (removeKeyB != null) {
                removeAllocatedSlot(removeKeyB);
            }
            return removeKeyB;
        }

        private void removeAllocatedSlot(AllocatedSlot allocatedSlot) {
            Preconditions.checkNotNull(allocatedSlot);
            ResourceID resourceID = allocatedSlot.getTaskManagerLocation().getResourceID();
            Set<AllocatedSlot> set = this.allocatedSlotsByTaskManager.get(resourceID);
            set.remove(allocatedSlot);
            if (set.isEmpty()) {
                this.allocatedSlotsByTaskManager.remove(resourceID);
            }
        }

        Set<AllocatedSlot> removeSlotsForTaskManager(ResourceID resourceID) {
            Set<AllocatedSlot> remove = this.allocatedSlotsByTaskManager.remove(resourceID);
            if (remove == null) {
                return Collections.emptySet();
            }
            Iterator<AllocatedSlot> it = remove.iterator();
            while (it.hasNext()) {
                this.allocatedSlotsById.removeKeyA(it.next().getAllocationId());
            }
            return remove;
        }

        void clear() {
            this.allocatedSlotsById.clear();
            this.allocatedSlotsByTaskManager.clear();
        }

        String printAllSlots() {
            return this.allocatedSlotsByTaskManager.values().toString();
        }

        @VisibleForTesting
        boolean containResource(ResourceID resourceID) {
            return this.allocatedSlotsByTaskManager.containsKey(resourceID);
        }

        @VisibleForTesting
        int size() {
            return this.allocatedSlotsById.size();
        }

        @VisibleForTesting
        Set<AllocatedSlot> getSlotsForTaskManager(ResourceID resourceID) {
            return this.allocatedSlotsByTaskManager.getOrDefault(resourceID, Collections.emptySet());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/SlotPool$AvailableSlots.class */
    public static class AvailableSlots {
        private final HashMap<ResourceID, Set<AllocatedSlot>> availableSlotsByTaskManager = new HashMap<>();
        private final HashMap<String, Set<AllocatedSlot>> availableSlotsByHost = new HashMap<>();
        private final HashMap<AllocationID, SlotAndTimestamp> availableSlots = new HashMap<>();

        AvailableSlots() {
        }

        void add(AllocatedSlot allocatedSlot, long j) {
            Preconditions.checkNotNull(allocatedSlot);
            if (this.availableSlots.put(allocatedSlot.getAllocationId(), new SlotAndTimestamp(allocatedSlot, j)) != null) {
                throw new IllegalStateException("slot already contained");
            }
            ResourceID resourceID = allocatedSlot.getTaskManagerLocation().getResourceID();
            String fQDNHostname = allocatedSlot.getTaskManagerLocation().getFQDNHostname();
            this.availableSlotsByTaskManager.computeIfAbsent(resourceID, resourceID2 -> {
                return new HashSet();
            }).add(allocatedSlot);
            this.availableSlotsByHost.computeIfAbsent(fQDNHostname, str -> {
                return new HashSet();
            }).add(allocatedSlot);
        }

        boolean contains(AllocationID allocationID) {
            return this.availableSlots.containsKey(allocationID);
        }

        AllocatedSlot get(AllocationID allocationID) {
            SlotAndTimestamp slotAndTimestamp = this.availableSlots.get(allocationID);
            if (slotAndTimestamp != null) {
                return slotAndTimestamp.slot();
            }
            return null;
        }

        SlotAndLocality poll(SchedulingStrategy schedulingStrategy, SlotProfile slotProfile) {
            if (this.availableSlots.isEmpty()) {
                return null;
            }
            Collection<SlotAndTimestamp> values = this.availableSlots.values();
            values.getClass();
            SlotAndLocality slotAndLocality = (SlotAndLocality) schedulingStrategy.findMatchWithLocality(slotProfile, values::stream, (v0) -> {
                return v0.slot();
            }, slotAndTimestamp -> {
                return slotAndTimestamp.slot().getResourceProfile().isMatching(slotProfile.getResourceProfile());
            }, (slotAndTimestamp2, locality) -> {
                return new SlotAndLocality(slotAndTimestamp2.slot(), locality);
            });
            if (slotAndLocality != null) {
                remove(slotAndLocality.getSlot().getAllocationId());
            }
            return slotAndLocality;
        }

        Set<AllocatedSlot> removeAllForTaskManager(ResourceID resourceID) {
            Set<AllocatedSlot> remove = this.availableSlotsByTaskManager.remove(resourceID);
            if (remove == null || remove.size() <= 0) {
                return Collections.emptySet();
            }
            String fQDNHostname = remove.iterator().next().getTaskManagerLocation().getFQDNHostname();
            Set<AllocatedSlot> set = this.availableSlotsByHost.get(fQDNHostname);
            for (AllocatedSlot allocatedSlot : remove) {
                this.availableSlots.remove(allocatedSlot.getAllocationId());
                set.remove(allocatedSlot);
            }
            if (set.isEmpty()) {
                this.availableSlotsByHost.remove(fQDNHostname);
            }
            return remove;
        }

        AllocatedSlot tryRemove(AllocationID allocationID) {
            SlotAndTimestamp remove = this.availableSlots.remove(allocationID);
            if (remove == null) {
                return null;
            }
            AllocatedSlot slot = remove.slot();
            ResourceID resourceID = slot.getTaskManagerLocation().getResourceID();
            String fQDNHostname = slot.getTaskManagerLocation().getFQDNHostname();
            Set<AllocatedSlot> set = this.availableSlotsByTaskManager.get(resourceID);
            Set<AllocatedSlot> set2 = this.availableSlotsByHost.get(fQDNHostname);
            set.remove(slot);
            set2.remove(slot);
            if (set.isEmpty()) {
                this.availableSlotsByTaskManager.remove(resourceID);
            }
            if (set2.isEmpty()) {
                this.availableSlotsByHost.remove(fQDNHostname);
            }
            return slot;
        }

        private void remove(AllocationID allocationID) throws IllegalStateException {
            if (tryRemove(allocationID) == null) {
                throw new IllegalStateException("slot not contained");
            }
        }

        String printAllSlots() {
            return this.availableSlots.values().toString();
        }

        @VisibleForTesting
        boolean containsTaskManager(ResourceID resourceID) {
            return this.availableSlotsByTaskManager.containsKey(resourceID);
        }

        @VisibleForTesting
        public int size() {
            return this.availableSlots.size();
        }

        @VisibleForTesting
        void clear() {
            this.availableSlots.clear();
            this.availableSlotsByTaskManager.clear();
            this.availableSlotsByHost.clear();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/SlotPool$PendingRequest.class */
    public static class PendingRequest {
        private final SlotRequestId slotRequestId;
        private final ResourceProfile resourceProfile;
        private final CompletableFuture<AllocatedSlot> allocatedSlotFuture = new CompletableFuture<>();

        PendingRequest(SlotRequestId slotRequestId, ResourceProfile resourceProfile) {
            this.slotRequestId = (SlotRequestId) Preconditions.checkNotNull(slotRequestId);
            this.resourceProfile = (ResourceProfile) Preconditions.checkNotNull(resourceProfile);
        }

        public SlotRequestId getSlotRequestId() {
            return this.slotRequestId;
        }

        public CompletableFuture<AllocatedSlot> getAllocatedSlotFuture() {
            return this.allocatedSlotFuture;
        }

        public ResourceProfile getResourceProfile() {
            return this.resourceProfile;
        }

        public String toString() {
            return "PendingRequest{slotRequestId=" + this.slotRequestId + ", resourceProfile=" + this.resourceProfile + ", allocatedSlotFuture=" + this.allocatedSlotFuture + '}';
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/SlotPool$ProviderAndOwner.class */
    public static class ProviderAndOwner implements SlotOwner, SlotProvider {
        private final SlotPoolGateway gateway;
        private final boolean requiresPreviousAllocationsForScheduling;

        ProviderAndOwner(SlotPoolGateway slotPoolGateway, boolean z) {
            this.gateway = slotPoolGateway;
            this.requiresPreviousAllocationsForScheduling = z;
        }

        public boolean requiresPreviousAllocationsForScheduling() {
            return this.requiresPreviousAllocationsForScheduling;
        }

        @Override // org.apache.flink.runtime.jobmaster.SlotOwner
        public CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot logicalSlot) {
            return this.gateway.releaseSlot(logicalSlot.getSlotRequestId(), logicalSlot.getSlotSharingGroupId(), new FlinkException("Slot is being returned to the SlotPool.")).thenApply(acknowledge -> {
                return true;
            });
        }

        @Override // org.apache.flink.runtime.jobmaster.slotpool.SlotProvider
        public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, boolean z, SlotProfile slotProfile, Time time) {
            CompletableFuture<LogicalSlot> allocateSlot = this.gateway.allocateSlot(slotRequestId, scheduledUnit, slotProfile, z, time);
            allocateSlot.whenComplete((logicalSlot, th) -> {
                if (th != null) {
                    this.gateway.releaseSlot(slotRequestId, scheduledUnit.getSlotSharingGroupId(), th);
                }
            });
            return allocateSlot;
        }

        @Override // org.apache.flink.runtime.jobmaster.slotpool.SlotProvider
        public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable th) {
            return this.gateway.releaseSlot(slotRequestId, slotSharingGroupId, th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/SlotPool$SlotAndTimestamp.class */
    public static class SlotAndTimestamp {
        private final AllocatedSlot slot;
        private final long timestamp;

        SlotAndTimestamp(AllocatedSlot allocatedSlot, long j) {
            this.slot = allocatedSlot;
            this.timestamp = j;
        }

        public AllocatedSlot slot() {
            return this.slot;
        }

        public long timestamp() {
            return this.timestamp;
        }

        public String toString() {
            return this.slot + " @ " + this.timestamp;
        }
    }

    @VisibleForTesting
    protected SlotPool(RpcService rpcService, JobID jobID, SchedulingStrategy schedulingStrategy) {
        this(rpcService, jobID, schedulingStrategy, SystemClock.getInstance(), AkkaUtils.getDefaultTimeout(), Time.milliseconds(((Long) JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue()).longValue()));
    }

    public SlotPool(RpcService rpcService, JobID jobID, SchedulingStrategy schedulingStrategy, Clock clock, Time time, Time time2) {
        super(rpcService);
        this.jobId = (JobID) Preconditions.checkNotNull(jobID);
        this.schedulingStrategy = (SchedulingStrategy) Preconditions.checkNotNull(schedulingStrategy);
        this.clock = (Clock) Preconditions.checkNotNull(clock);
        this.rpcTimeout = (Time) Preconditions.checkNotNull(time);
        this.idleSlotTimeout = (Time) Preconditions.checkNotNull(time2);
        this.registeredTaskManagers = new HashSet<>(16);
        this.allocatedSlots = new AllocatedSlots();
        this.availableSlots = new AvailableSlots();
        this.pendingRequests = new DualKeyMap<>(16);
        this.waitingForResourceManager = new HashMap<>(16);
        this.providerAndOwner = new ProviderAndOwner((SlotPoolGateway) getSelfGateway(SlotPoolGateway.class), schedulingStrategy instanceof PreviousAllocationSchedulingStrategy);
        this.slotSharingManagers = new HashMap(4);
        this.jobMasterId = null;
        this.resourceManagerGateway = null;
        this.jobManagerAddress = null;
    }

    @Override // org.apache.flink.runtime.rpc.RpcEndpoint
    public void start() {
        throw new UnsupportedOperationException("Should never call start() without leader ID");
    }

    public void start(JobMasterId jobMasterId, String str) throws Exception {
        this.jobMasterId = (JobMasterId) Preconditions.checkNotNull(jobMasterId);
        this.jobManagerAddress = (String) Preconditions.checkNotNull(str);
        try {
            super.start();
            scheduleRunAsync(this::checkIdleSlot, this.idleSlotTimeout);
            if (this.log.isDebugEnabled()) {
                scheduleRunAsync(this::scheduledLogStatus, 60000L, TimeUnit.MILLISECONDS);
            }
        } catch (Exception e) {
            throw new RuntimeException("This should never happen", e);
        }
    }

    @Override // org.apache.flink.runtime.rpc.RpcEndpoint
    public CompletableFuture<Void> postStop() {
        this.log.info("Stopping SlotPool.");
        Iterator<AllocationID> it = this.pendingRequests.keySetB().iterator();
        while (it.hasNext()) {
            this.resourceManagerGateway.cancelSlotRequest(it.next());
        }
        Iterator<ResourceID> it2 = this.registeredTaskManagers.iterator();
        while (it2.hasNext()) {
            ResourceID next = it2.next();
            releaseTaskManagerInternal(next, new FlinkException("Releasing TaskManager " + next + ", because of stopping of SlotPool"));
        }
        clear();
        return CompletableFuture.completedFuture(null);
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway
    public void suspend() {
        this.log.info("Suspending SlotPool.");
        validateRunsInMainThread();
        Iterator<AllocationID> it = this.pendingRequests.keySetB().iterator();
        while (it.hasNext()) {
            this.resourceManagerGateway.cancelSlotRequest(it.next());
        }
        stop();
        this.jobMasterId = null;
        this.resourceManagerGateway = null;
        clear();
    }

    public SlotOwner getSlotOwner() {
        return this.providerAndOwner;
    }

    public SlotProvider getSlotProvider() {
        return this.providerAndOwner;
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway
    public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {
        this.resourceManagerGateway = (ResourceManagerGateway) Preconditions.checkNotNull(resourceManagerGateway);
        Iterator<PendingRequest> it = this.waitingForResourceManager.values().iterator();
        while (it.hasNext()) {
            requestSlotFromResourceManager(resourceManagerGateway, it.next());
        }
        this.waitingForResourceManager.clear();
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway
    public void disconnectResourceManager() {
        this.resourceManagerGateway = null;
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway
    public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean z, Time time) {
        this.log.debug("Received slot request [{}] for task: {}", slotRequestId, scheduledUnit.getTaskToExecute());
        return scheduledUnit.getSlotSharingGroupId() == null ? allocateSingleSlot(slotRequestId, slotProfile, z, time) : allocateSharedSlot(slotRequestId, scheduledUnit, slotProfile, z, time);
    }

    private CompletableFuture<LogicalSlot> allocateSingleSlot(SlotRequestId slotRequestId, SlotProfile slotProfile, boolean z, Time time) {
        return requestAllocatedSlot(slotRequestId, slotProfile, z, time).thenApply(slotAndLocality -> {
            AllocatedSlot slot = slotAndLocality.getSlot();
            SingleLogicalSlot singleLogicalSlot = new SingleLogicalSlot(slotRequestId, slot, null, slotAndLocality.getLocality(), this.providerAndOwner);
            if (slot.tryAssignPayload(singleLogicalSlot)) {
                return singleLogicalSlot;
            }
            FlinkException flinkException = new FlinkException("Could not assign payload to allocated slot " + slot.getAllocationId() + '.');
            releaseSingleSlot(slotRequestId, flinkException);
            throw new CompletionException((Throwable) flinkException);
        });
    }

    private CompletableFuture<LogicalSlot> allocateSharedSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean z, Time time) {
        SlotSharingManager computeIfAbsent = this.slotSharingManagers.computeIfAbsent(scheduledUnit.getSlotSharingGroupId(), slotSharingGroupId -> {
            return new SlotSharingManager(slotSharingGroupId, this, this.providerAndOwner);
        });
        try {
            SlotSharingManager.MultiTaskSlotLocality allocateCoLocatedMultiTaskSlot = scheduledUnit.getCoLocationConstraint() != null ? allocateCoLocatedMultiTaskSlot(scheduledUnit.getCoLocationConstraint(), computeIfAbsent, slotProfile, z, time) : allocateMultiTaskSlot(scheduledUnit.getJobVertexId(), computeIfAbsent, slotProfile, z, time);
            Preconditions.checkState(!allocateCoLocatedMultiTaskSlot.getMultiTaskSlot().contains(scheduledUnit.getJobVertexId()));
            return allocateCoLocatedMultiTaskSlot.getMultiTaskSlot().allocateSingleTaskSlot(slotRequestId, scheduledUnit.getJobVertexId(), allocateCoLocatedMultiTaskSlot.getLocality()).getLogicalSlotFuture();
        } catch (NoResourceAvailableException e) {
            return FutureUtils.completedExceptionally(e);
        }
    }

    private SlotSharingManager.MultiTaskSlotLocality allocateCoLocatedMultiTaskSlot(CoLocationConstraint coLocationConstraint, SlotSharingManager slotSharingManager, SlotProfile slotProfile, boolean z, Time time) throws NoResourceAvailableException {
        SlotRequestId slotRequestId = coLocationConstraint.getSlotRequestId();
        if (slotRequestId != null) {
            SlotSharingManager.TaskSlot taskSlot = slotSharingManager.getTaskSlot(slotRequestId);
            if (taskSlot != null) {
                Preconditions.checkState(taskSlot instanceof SlotSharingManager.MultiTaskSlot);
                return SlotSharingManager.MultiTaskSlotLocality.of((SlotSharingManager.MultiTaskSlot) taskSlot, Locality.LOCAL);
            }
            coLocationConstraint.setSlotRequestId(null);
        }
        if (coLocationConstraint.isAssigned()) {
            slotProfile = new SlotProfile(slotProfile.getResourceProfile(), Collections.singleton(coLocationConstraint.getLocation()), slotProfile.getPreferredAllocations());
        }
        SlotSharingManager.MultiTaskSlotLocality allocateMultiTaskSlot = allocateMultiTaskSlot(coLocationConstraint.getGroupId(), slotSharingManager, slotProfile, z, time);
        if (coLocationConstraint.isAssigned() && allocateMultiTaskSlot.getLocality() != Locality.LOCAL) {
            allocateMultiTaskSlot.getMultiTaskSlot().release(new FlinkException("Multi task slot is not local and, thus, does not fulfill the co-location constraint."));
            throw new NoResourceAvailableException("Could not allocate a local multi task slot for the co location constraint " + coLocationConstraint + '.');
        }
        SlotRequestId slotRequestId2 = new SlotRequestId();
        SlotSharingManager.MultiTaskSlot allocateMultiTaskSlot2 = allocateMultiTaskSlot.getMultiTaskSlot().allocateMultiTaskSlot(slotRequestId2, coLocationConstraint.getGroupId());
        coLocationConstraint.setSlotRequestId(slotRequestId2);
        allocateMultiTaskSlot2.getSlotContextFuture().whenComplete((slotContext, th) -> {
            if (th != null) {
                this.log.debug("Failed to lock colocation constraint {} because the slot allocation for slot request {} failed.", new Object[]{coLocationConstraint.getGroupId(), coLocationConstraint.getSlotRequestId(), th});
            } else if (Objects.equals(coLocationConstraint.getSlotRequestId(), slotRequestId2)) {
                coLocationConstraint.lockLocation(slotContext.getTaskManagerLocation());
            } else {
                this.log.debug("Failed to lock colocation constraint {} because assigned slot request {} differs from fulfilled slot request {}.", new Object[]{coLocationConstraint.getGroupId(), coLocationConstraint.getSlotRequestId(), slotRequestId2});
            }
        });
        return SlotSharingManager.MultiTaskSlotLocality.of(allocateMultiTaskSlot2, allocateMultiTaskSlot.getLocality());
    }

    private SlotSharingManager.MultiTaskSlotLocality allocateMultiTaskSlot(AbstractID abstractID, SlotSharingManager slotSharingManager, SlotProfile slotProfile, boolean z, Time time) throws NoResourceAvailableException {
        SlotSharingManager.MultiTaskSlotLocality resolvedRootSlot = slotSharingManager.getResolvedRootSlot(abstractID, this.schedulingStrategy, slotProfile);
        if (resolvedRootSlot != null && resolvedRootSlot.getLocality() == Locality.LOCAL) {
            return resolvedRootSlot;
        }
        SlotRequestId slotRequestId = new SlotRequestId();
        SlotRequestId slotRequestId2 = new SlotRequestId();
        SlotAndLocality pollAndAllocateSlot = pollAndAllocateSlot(slotRequestId, slotProfile);
        if (pollAndAllocateSlot != null && (pollAndAllocateSlot.getLocality() == Locality.LOCAL || resolvedRootSlot == null)) {
            AllocatedSlot slot = pollAndAllocateSlot.getSlot();
            SlotSharingManager.MultiTaskSlot createRootSlot = slotSharingManager.createRootSlot(slotRequestId2, CompletableFuture.completedFuture(pollAndAllocateSlot.getSlot()), slotRequestId);
            if (slot.tryAssignPayload(createRootSlot)) {
                return SlotSharingManager.MultiTaskSlotLocality.of(createRootSlot, pollAndAllocateSlot.getLocality());
            }
            createRootSlot.release(new FlinkException("Could not assign payload to allocated slot " + slot.getAllocationId() + '.'));
        }
        if (resolvedRootSlot != null) {
            if (pollAndAllocateSlot != null) {
                releaseSingleSlot(slotRequestId, new FlinkException("Locality constraint is not better fulfilled by allocated slot."));
            }
            return resolvedRootSlot;
        }
        if (!z) {
            throw new NoResourceAvailableException("Could not allocate a shared slot for " + abstractID + '.');
        }
        SlotSharingManager.MultiTaskSlot unresolvedRootSlot = slotSharingManager.getUnresolvedRootSlot(abstractID);
        if (unresolvedRootSlot == null) {
            CompletableFuture<AllocatedSlot> requestNewAllocatedSlot = requestNewAllocatedSlot(slotRequestId, slotProfile.getResourceProfile(), time);
            unresolvedRootSlot = slotSharingManager.createRootSlot(slotRequestId2, requestNewAllocatedSlot, slotRequestId);
            requestNewAllocatedSlot.whenComplete((allocatedSlot, th) -> {
                SlotSharingManager.TaskSlot taskSlot = slotSharingManager.getTaskSlot(slotRequestId2);
                if (taskSlot == null) {
                    releaseSingleSlot(slotRequestId, new FlinkException("Could not find task slot with " + slotRequestId2 + '.'));
                    return;
                }
                if (!(taskSlot instanceof SlotSharingManager.MultiTaskSlot) || th != null) {
                    taskSlot.release(th);
                } else {
                    if (allocatedSlot.tryAssignPayload((SlotSharingManager.MultiTaskSlot) taskSlot)) {
                        return;
                    }
                    taskSlot.release(new FlinkException("Could not assign payload to allocated slot " + allocatedSlot.getAllocationId() + '.'));
                }
            });
        }
        return SlotSharingManager.MultiTaskSlotLocality.of(unresolvedRootSlot, Locality.UNKNOWN);
    }

    private CompletableFuture<SlotAndLocality> requestAllocatedSlot(SlotRequestId slotRequestId, SlotProfile slotProfile, boolean z, Time time) {
        SlotAndLocality pollAndAllocateSlot = pollAndAllocateSlot(slotRequestId, slotProfile);
        return pollAndAllocateSlot != null ? CompletableFuture.completedFuture(pollAndAllocateSlot) : z ? requestNewAllocatedSlot(slotRequestId, slotProfile.getResourceProfile(), time).thenApply(allocatedSlot -> {
            return new SlotAndLocality(allocatedSlot, Locality.UNKNOWN);
        }) : FutureUtils.completedExceptionally(new NoResourceAvailableException("Could not allocate a simple slot for " + slotRequestId + '.'));
    }

    private CompletableFuture<AllocatedSlot> requestNewAllocatedSlot(SlotRequestId slotRequestId, ResourceProfile resourceProfile, Time time) {
        PendingRequest pendingRequest = new PendingRequest(slotRequestId, resourceProfile);
        FutureUtils.orTimeout(pendingRequest.getAllocatedSlotFuture(), time.toMilliseconds(), TimeUnit.MILLISECONDS).whenCompleteAsync((allocatedSlot, th) -> {
            if (th instanceof TimeoutException) {
                timeoutPendingSlotRequest(slotRequestId);
            }
        }, (Executor) getMainThreadExecutor());
        if (this.resourceManagerGateway == null) {
            stashRequestWaitingForResourceManager(pendingRequest);
        } else {
            requestSlotFromResourceManager(this.resourceManagerGateway, pendingRequest);
        }
        return pendingRequest.getAllocatedSlotFuture();
    }

    private void requestSlotFromResourceManager(ResourceManagerGateway resourceManagerGateway, PendingRequest pendingRequest) {
        Preconditions.checkNotNull(resourceManagerGateway);
        Preconditions.checkNotNull(pendingRequest);
        this.log.info("Requesting new slot [{}] and profile {} from resource manager.", pendingRequest.getSlotRequestId(), pendingRequest.getResourceProfile());
        AllocationID allocationID = new AllocationID();
        this.pendingRequests.put(pendingRequest.getSlotRequestId(), allocationID, pendingRequest);
        pendingRequest.getAllocatedSlotFuture().whenComplete((allocatedSlot, th) -> {
            if (th == null && allocationID.equals(allocatedSlot.getAllocationId())) {
                return;
            }
            resourceManagerGateway.cancelSlotRequest(allocationID);
        });
        resourceManagerGateway.requestSlot(this.jobMasterId, new SlotRequest(this.jobId, allocationID, pendingRequest.getResourceProfile(), this.jobManagerAddress), this.rpcTimeout).whenCompleteAsync((acknowledge, th2) -> {
            if (th2 != null) {
                slotRequestToResourceManagerFailed(pendingRequest.getSlotRequestId(), th2);
            }
        }, (Executor) getMainThreadExecutor());
    }

    private void slotRequestToResourceManagerFailed(SlotRequestId slotRequestId, Throwable th) {
        PendingRequest removeKeyA = this.pendingRequests.removeKeyA(slotRequestId);
        if (removeKeyA != null) {
            removeKeyA.getAllocatedSlotFuture().completeExceptionally(new NoResourceAvailableException("No pooled slot available and request to ResourceManager for new slot failed", th));
        } else if (this.log.isDebugEnabled()) {
            this.log.debug("Unregistered slot request [{}] failed.", slotRequestId, th);
        }
    }

    private void stashRequestWaitingForResourceManager(PendingRequest pendingRequest) {
        this.log.info("Cannot serve slot request, no ResourceManager connected. Adding as pending request [{}]", pendingRequest.getSlotRequestId());
        this.waitingForResourceManager.put(pendingRequest.getSlotRequestId(), pendingRequest);
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlotActions
    public CompletableFuture<Acknowledge> releaseSlot(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable th) {
        this.log.debug("Releasing slot [{}] because: {}", slotRequestId, th != null ? th.getMessage() : "null");
        if (slotSharingGroupId != null) {
            releaseSharedSlot(slotRequestId, slotSharingGroupId, th);
        } else {
            releaseSingleSlot(slotRequestId, th);
        }
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    private void releaseSharedSlot(SlotRequestId slotRequestId, @Nonnull SlotSharingGroupId slotSharingGroupId, Throwable th) {
        SlotSharingManager slotSharingManager = this.slotSharingManagers.get(slotSharingGroupId);
        if (slotSharingManager == null) {
            this.log.debug("Could not find slot sharing group {}. Ignoring release slot request.", slotSharingGroupId);
            return;
        }
        SlotSharingManager.TaskSlot taskSlot = slotSharingManager.getTaskSlot(slotRequestId);
        if (taskSlot != null) {
            taskSlot.release(th);
        } else {
            this.log.debug("Could not find slot [{}] in slot sharing group {}. Ignoring release slot request.", slotRequestId, slotSharingGroupId);
        }
    }

    private void releaseSingleSlot(SlotRequestId slotRequestId, Throwable th) {
        PendingRequest removePendingRequest = removePendingRequest(slotRequestId);
        if (removePendingRequest != null) {
            failPendingRequest(removePendingRequest, new FlinkException("Pending slot request with " + slotRequestId + " has been released."));
            return;
        }
        AllocatedSlot remove = this.allocatedSlots.remove(slotRequestId);
        if (remove == null) {
            this.log.debug("There is no allocated slot [{}]. Ignoring the release slot request.", slotRequestId);
        } else {
            remove.releasePayload(th);
            tryFulfillSlotRequestOrMakeAvailable(remove);
        }
    }

    @Nullable
    private PendingRequest removePendingRequest(SlotRequestId slotRequestId) {
        PendingRequest remove = this.waitingForResourceManager.remove(slotRequestId);
        if (remove == null) {
            return this.pendingRequests.removeKeyA(slotRequestId);
        }
        if ($assertionsDisabled || !this.pendingRequests.containsKeyA(slotRequestId)) {
            return remove;
        }
        throw new AssertionError("A pending requests should only be part of either the pendingRequests or waitingForResourceManager but not both.");
    }

    private void failPendingRequest(PendingRequest pendingRequest, Exception exc) {
        Preconditions.checkNotNull(pendingRequest);
        Preconditions.checkNotNull(exc);
        if (pendingRequest.getAllocatedSlotFuture().isDone()) {
            return;
        }
        this.log.info("Failing pending slot request [{}]: {}", pendingRequest.getSlotRequestId(), exc.getMessage());
        pendingRequest.getAllocatedSlotFuture().completeExceptionally(exc);
    }

    @Nullable
    private SlotAndLocality pollAndAllocateSlot(SlotRequestId slotRequestId, SlotProfile slotProfile) {
        SlotAndLocality poll = this.availableSlots.poll(this.schedulingStrategy, slotProfile);
        if (poll != null) {
            this.allocatedSlots.add(slotRequestId, poll.getSlot());
        }
        return poll;
    }

    private void tryFulfillSlotRequestOrMakeAvailable(AllocatedSlot allocatedSlot) {
        Preconditions.checkState(!allocatedSlot.isUsed(), "Provided slot is still in use.");
        PendingRequest pollMatchingPendingRequest = pollMatchingPendingRequest(allocatedSlot);
        if (pollMatchingPendingRequest == null) {
            this.log.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
            this.availableSlots.add(allocatedSlot, this.clock.relativeTimeMillis());
        } else {
            this.log.debug("Fulfilling pending slot request [{}] early with returned slot [{}]", pollMatchingPendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());
            this.allocatedSlots.add(pollMatchingPendingRequest.getSlotRequestId(), allocatedSlot);
            pollMatchingPendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
        }
    }

    private PendingRequest pollMatchingPendingRequest(AllocatedSlot allocatedSlot) {
        ResourceProfile resourceProfile = allocatedSlot.getResourceProfile();
        for (PendingRequest pendingRequest : this.pendingRequests.values()) {
            if (resourceProfile.isMatching(pendingRequest.getResourceProfile())) {
                this.pendingRequests.removeKeyA(pendingRequest.getSlotRequestId());
                return pendingRequest;
            }
        }
        for (PendingRequest pendingRequest2 : this.waitingForResourceManager.values()) {
            if (resourceProfile.isMatching(pendingRequest2.getResourceProfile())) {
                this.waitingForResourceManager.remove(pendingRequest2.getSlotRequestId());
                return pendingRequest2;
            }
        }
        return null;
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway
    public CompletableFuture<Collection<SlotOffer>> offerSlots(TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, Collection<SlotOffer> collection) {
        validateRunsInMainThread();
        return FutureUtils.combineAll((List) collection.stream().map(slotOffer -> {
            return offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).thenApply(bool -> {
                return bool.booleanValue() ? Optional.of(slotOffer) : Optional.empty();
            });
        }).collect(Collectors.toList())).thenApply(collection2 -> {
            return (List) collection2.stream().flatMap(optional -> {
                return (Stream) optional.map((v0) -> {
                    return Stream.of(v0);
                }).orElseGet(Stream::empty);
            }).collect(Collectors.toList());
        });
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway
    public CompletableFuture<Boolean> offerSlot(TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, SlotOffer slotOffer) {
        validateRunsInMainThread();
        ResourceID resourceID = taskManagerLocation.getResourceID();
        AllocationID allocationId = slotOffer.getAllocationId();
        if (!this.registeredTaskManagers.contains(resourceID)) {
            this.log.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}", slotOffer.getAllocationId(), taskManagerLocation);
            return CompletableFuture.completedFuture(false);
        }
        AllocatedSlot allocatedSlot = this.allocatedSlots.get(allocationId);
        AllocatedSlot allocatedSlot2 = allocatedSlot;
        if (allocatedSlot == null) {
            AllocatedSlot allocatedSlot3 = this.availableSlots.get(allocationId);
            allocatedSlot2 = allocatedSlot3;
            if (allocatedSlot3 == null) {
                AllocatedSlot allocatedSlot4 = new AllocatedSlot(allocationId, taskManagerLocation, slotOffer.getSlotIndex(), slotOffer.getResourceProfile(), taskManagerGateway);
                PendingRequest removeKeyB = this.pendingRequests.removeKeyB(allocationId);
                if (removeKeyB != null) {
                    this.allocatedSlots.add(removeKeyB.getSlotRequestId(), allocatedSlot4);
                    if (removeKeyB.getAllocatedSlotFuture().complete(allocatedSlot4)) {
                        this.log.debug("Fulfilled slot request [{}] with allocated slot [{}].", removeKeyB.getSlotRequestId(), allocationId);
                    } else {
                        this.allocatedSlots.remove(removeKeyB.getSlotRequestId());
                        tryFulfillSlotRequestOrMakeAvailable(allocatedSlot4);
                    }
                } else {
                    tryFulfillSlotRequestOrMakeAvailable(allocatedSlot4);
                }
                return CompletableFuture.completedFuture(true);
            }
        }
        if (!allocatedSlot2.getSlotId().equals(new SlotID(taskManagerLocation.getResourceID(), slotOffer.getSlotIndex()))) {
            return CompletableFuture.completedFuture(false);
        }
        this.log.info("Received repeated offer for slot [{}]. Ignoring.", allocationId);
        return CompletableFuture.completedFuture(true);
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway
    public CompletableFuture<SerializableOptional<ResourceID>> failAllocation(AllocationID allocationID, Exception exc) {
        PendingRequest removeKeyB = this.pendingRequests.removeKeyB(allocationID);
        if (removeKeyB == null) {
            return tryFailingAllocatedSlot(allocationID, exc);
        }
        failPendingRequest(removeKeyB, exc);
        return CompletableFuture.completedFuture(SerializableOptional.empty());
    }

    private CompletableFuture<SerializableOptional<ResourceID>> tryFailingAllocatedSlot(AllocationID allocationID, Exception exc) {
        AllocatedSlot tryRemove = this.availableSlots.tryRemove(allocationID);
        if (tryRemove == null) {
            tryRemove = this.allocatedSlots.remove(allocationID);
        }
        if (tryRemove != null) {
            this.log.debug("Failed allocated slot [{}]: {}", allocationID, exc.getMessage());
            tryRemove.getTaskManagerGateway().freeSlot(allocationID, exc, this.rpcTimeout);
            tryRemove.releasePayload(exc);
            ResourceID taskManagerId = tryRemove.getTaskManagerId();
            if (!this.availableSlots.containsTaskManager(taskManagerId) && !this.allocatedSlots.containResource(taskManagerId)) {
                return CompletableFuture.completedFuture(SerializableOptional.of(taskManagerId));
            }
        }
        return CompletableFuture.completedFuture(SerializableOptional.empty());
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway
    public CompletableFuture<Acknowledge> registerTaskManager(ResourceID resourceID) {
        this.log.debug("Register new TaskExecutor {}.", resourceID);
        this.registeredTaskManagers.add(resourceID);
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway
    public CompletableFuture<Acknowledge> releaseTaskManager(ResourceID resourceID, Exception exc) {
        if (this.registeredTaskManagers.remove(resourceID)) {
            releaseTaskManagerInternal(resourceID, exc);
        }
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    @VisibleForTesting
    protected void timeoutPendingSlotRequest(SlotRequestId slotRequestId) {
        this.log.info("Pending slot request [{}] timed out.", slotRequestId);
        removePendingRequest(slotRequestId);
    }

    private void releaseTaskManagerInternal(ResourceID resourceID, Exception exc) {
        HashSet<AllocatedSlot> hashSet = new HashSet(this.allocatedSlots.removeSlotsForTaskManager(resourceID));
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            ((AllocatedSlot) it.next()).releasePayload(exc);
        }
        hashSet.addAll(this.availableSlots.removeAllForTaskManager(resourceID));
        for (AllocatedSlot allocatedSlot : hashSet) {
            allocatedSlot.getTaskManagerGateway().freeSlot(allocatedSlot.getAllocationId(), exc, this.rpcTimeout);
        }
    }

    private void checkIdleSlot() {
        long relativeTimeMillis = this.clock.relativeTimeMillis();
        ArrayList<AllocatedSlot> arrayList = new ArrayList(this.availableSlots.size());
        for (SlotAndTimestamp slotAndTimestamp : this.availableSlots.availableSlots.values()) {
            if (relativeTimeMillis - slotAndTimestamp.timestamp > this.idleSlotTimeout.toMilliseconds()) {
                arrayList.add(slotAndTimestamp.slot);
            }
        }
        Throwable flinkException = new FlinkException("Releasing idle slot.");
        for (AllocatedSlot allocatedSlot : arrayList) {
            AllocationID allocationId = allocatedSlot.getAllocationId();
            if (this.availableSlots.tryRemove(allocationId) != null) {
                this.log.info("Releasing idle slot [{}].", allocationId);
                allocatedSlot.getTaskManagerGateway().freeSlot(allocationId, flinkException, this.rpcTimeout).whenCompleteAsync((acknowledge, th) -> {
                    if (th != null) {
                        if (!this.registeredTaskManagers.contains(allocatedSlot.getTaskManagerId())) {
                            this.log.debug("Releasing slot [{}] failed and owning TaskExecutor {} is no longer registered. Discarding slot.", allocationId, allocatedSlot.getTaskManagerId());
                        } else {
                            this.log.debug("Releasing slot [{}] of registered TaskExecutor {} failed. Trying to fulfill a different slot request.", new Object[]{allocationId, allocatedSlot.getTaskManagerId(), th});
                            tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
                        }
                    }
                }, (Executor) getMainThreadExecutor());
            }
        }
        scheduleRunAsync(this::checkIdleSlot, this.idleSlotTimeout);
    }

    private void clear() {
        this.availableSlots.clear();
        this.allocatedSlots.clear();
        this.pendingRequests.clear();
        this.waitingForResourceManager.clear();
        this.registeredTaskManagers.clear();
        this.slotSharingManagers.clear();
    }

    private void scheduledLogStatus() {
        this.log.debug(printStatus());
        scheduleRunAsync(this::scheduledLogStatus, 60000L, TimeUnit.MILLISECONDS);
    }

    private String printStatus() {
        validateRunsInMainThread();
        StringBuilder append = new StringBuilder(1024).append("Slot Pool Status:\n");
        append.append("\tstatus: ");
        if (this.resourceManagerGateway != null) {
            append.append("connected to ").append(this.resourceManagerGateway.getAddress()).append('\n');
        } else {
            append.append("unconnected and waiting for ResourceManager ").append(this.waitingForResourceManager).append('\n');
        }
        append.append("\tregistered TaskManagers: ").append(this.registeredTaskManagers).append('\n');
        append.append("\tavailable slots: ").append(this.availableSlots.printAllSlots()).append('\n');
        append.append("\tallocated slots: ").append(this.allocatedSlots.printAllSlots()).append('\n');
        append.append("\tpending requests: ").append(this.pendingRequests.values()).append('\n');
        append.append("\tsharing groups: {\n");
        for (Map.Entry<SlotSharingGroupId, SlotSharingManager> entry : this.slotSharingManagers.entrySet()) {
            append.append("\t -------- ").append(entry.getKey()).append(" --------\n");
            append.append(entry.getValue());
        }
        append.append("\t}\n");
        return append.toString();
    }

    @VisibleForTesting
    protected AllocatedSlots getAllocatedSlots() {
        return this.allocatedSlots;
    }

    @VisibleForTesting
    protected AvailableSlots getAvailableSlots() {
        return this.availableSlots;
    }

    @VisibleForTesting
    DualKeyMap<SlotRequestId, AllocationID, PendingRequest> getPendingRequests() {
        return this.pendingRequests;
    }

    @VisibleForTesting
    Map<SlotRequestId, PendingRequest> getWaitingForResourceManager() {
        return this.waitingForResourceManager;
    }

    @VisibleForTesting
    void triggerCheckIdleSlot() {
        runAsync(this::checkIdleSlot);
    }

    static {
        $assertionsDisabled = !SlotPool.class.desiredAssertionStatus();
    }
}
