package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.InputProcessorUtil;
import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessorFactory;
import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;

/**
 * A {@link StreamTask} that runs a {@link MultipleInputStreamOperator}.
 *
 * <p>Besides wiring up the input processor in {@link #init()}, this task keeps track of the
 * futures handed out by {@link #triggerCheckpointAsync} so that they can be completed once the
 * matching checkpoint is either performed ({@link #triggerCheckpointOnBarrier}) or aborted
 * ({@link #abortCheckpointOnBarrier}).
 *
 * @param <OUT> output type of the main operator
 */
@Internal
public class MultipleInputStreamTask<OUT> extends StreamTask<OUT, MultipleInputStreamOperator<OUT>> {

    /** Upper bound on the number of entries kept in {@link #pendingCheckpointCompletedFutures}. */
    private static final int MAX_TRACKED_CHECKPOINTS = 100000;

    /** Result futures of checkpoints that were triggered but not yet performed/aborted, keyed by checkpoint id. */
    private final HashMap<Long, CompletableFuture<Boolean>> pendingCheckpointCompletedFutures = new HashMap<>();

    /** Assigned in {@link #createInputProcessor}; {@code null} until then. */
    @Nullable
    private CheckpointBarrierHandler checkpointBarrierHandler;

    public MultipleInputStreamTask(Environment environment) throws Exception {
        super(environment);
    }

    /**
     * Registers the input watermark gauges, groups the network input gates by the input they
     * belong to and creates the input processor.
     *
     * @throws Exception if reading the configuration or creating the input processor fails
     */
    @Override
    public void init() throws Exception {
        StreamConfig configuration = getConfiguration();
        ClassLoader userCodeClassLoader = getUserCodeClassLoader();
        StreamConfig.InputConfig[] inputs = configuration.getInputs(userCodeClassLoader);

        // One gauge per input (metric names are 1-based), plus a gauge for the minimum over all inputs.
        WatermarkGauge[] watermarkGauges = new WatermarkGauge[inputs.length];
        for (int i = 0; i < inputs.length; i++) {
            watermarkGauges[i] = new WatermarkGauge();
            this.mainOperator.getMetricGroup().gauge(MetricNames.currentInputWatermarkName(i + 1), watermarkGauges[i]);
        }
        MinWatermarkGauge minWatermarkGauge = new MinWatermarkGauge(watermarkGauges);
        this.mainOperator.getMetricGroup().gauge("currentInputWatermark", minWatermarkGauge);

        List<StreamEdge> inPhysicalEdges = configuration.getInPhysicalEdges(userCodeClassLoader);
        int numberOfNetworkInputs = configuration.getNumberOfNetworkInputs();

        // Generic arrays cannot be created directly; every slot is filled with a typed list below.
        @SuppressWarnings("unchecked")
        List<IndexedInputGate>[] gatesPerInput = new List[inputs.length];
        for (int i = 0; i < gatesPerInput.length; i++) {
            gatesPerInput[i] = new ArrayList<>();
        }
        for (int gateIndex = 0; gateIndex < numberOfNetworkInputs; gateIndex++) {
            // The edge's type number is the 1-based index of the input the gate feeds.
            int inputIndex = inPhysicalEdges.get(gateIndex).getTypeNumber() - 1;
            gatesPerInput[inputIndex].add(getEnvironment().getInputGate(gateIndex));
        }

        // Only inputs that actually have network gates are handed to the input processor.
        List<List<IndexedInputGate>> networkInputGates = new ArrayList<>();
        for (List<IndexedInputGate> gates : gatesPerInput) {
            if (!gates.isEmpty()) {
                networkInputGates.add(gates);
            }
        }
        @SuppressWarnings("unchecked")
        List<IndexedInputGate>[] networkInputGateArray = networkInputGates.toArray(new List[0]);

        createInputProcessor(networkInputGateArray, inputs, watermarkGauges);

        // Expose the minimum input watermark on the task-level metric group as well.
        TaskMetricGroup taskMetricGroup = getEnvironment().getMetricGroup();
        taskMetricGroup.gauge("currentInputWatermark", minWatermarkGauge::getValue);
    }

    /**
     * Creates the checkpoint barrier handler and the multiple-input processor of this task.
     *
     * @param inputGates network input gates, one list per input that has network gates
     * @param inputConfigs configuration of all inputs of the operator
     * @param watermarkGauges per-input watermark gauges, indexed like {@code inputConfigs}
     */
    protected void createInputProcessor(
            List<IndexedInputGate>[] inputGates,
            StreamConfig.InputConfig[] inputConfigs,
            WatermarkGauge[] watermarkGauges) {
        this.checkpointBarrierHandler = InputProcessorUtil.createCheckpointBarrierHandler(
                this,
                getConfiguration(),
                getCheckpointCoordinator(),
                getTaskNameWithSubtaskAndId(),
                inputGates,
                this.operatorChain.getSourceTaskInputs());

        this.inputProcessor = StreamMultipleInputProcessorFactory.create(
                this,
                InputProcessorUtil.createCheckpointedMultipleInputGate(
                        this.mainMailboxExecutor,
                        inputGates,
                        getEnvironment().getMetricGroup().getIOMetricGroup(),
                        this.checkpointBarrierHandler,
                        this.configuration),
                inputConfigs,
                getEnvironment().getIOManager(),
                getEnvironment().getMemoryManager(),
                getEnvironment().getMetricGroup().getIOMetricGroup(),
                setupNumRecordsInCounter(this.mainOperator),
                getStreamStatusMaintainer(),
                this.mainOperator,
                watermarkGauges,
                getConfiguration(),
                getTaskConfiguration(),
                getJobConfiguration(),
                getExecutionConfig(),
                getUserCodeClassLoader(),
                this.operatorChain);
    }

    /**
     * Schedules a checkpoint trigger on the main mailbox executor: the result future is recorded
     * as pending and a {@link CheckpointBarrier} is pushed through all source inputs of the
     * operator chain.
     *
     * @return a future that is completed with {@code true} once the checkpoint has been performed
     *     on barrier, or completed exceptionally if triggering fails, the checkpoint is aborted
     *     or the pending entry is evicted
     */
    @Override
    public Future<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
        CompletableFuture<Boolean> resultFuture = new CompletableFuture<>();
        this.mainMailboxExecutor.execute(() -> {
            long checkpointId = checkpointMetaData.getCheckpointId();
            try {
                this.pendingCheckpointCompletedFutures.put(checkpointId, resultFuture);
                checkPendingCheckpointCompletedFuturesSize();
                triggerSourcesCheckpoint(new CheckpointBarrier(checkpointId, checkpointMetaData.getTimestamp(), checkpointOptions));
            } catch (Exception e) {
                // Report the failure through the future and rethrow it to the mailbox.
                this.pendingCheckpointCompletedFutures.remove(checkpointId);
                resultFuture.completeExceptionally(e);
                throw e;
            }
        }, "checkpoint %s with %s", checkpointMetaData, checkpointOptions);
        return resultFuture;
    }

    /**
     * Keeps the pending-future map bounded: once it holds more than
     * {@link #MAX_TRACKED_CHECKPOINTS} entries, the entries with the smallest checkpoint ids are
     * removed and their futures are failed with an {@link IllegalStateException}.
     */
    private void checkPendingCheckpointCompletedFuturesSize() {
        int excess = this.pendingCheckpointCompletedFutures.size() - MAX_TRACKED_CHECKPOINTS;
        if (excess <= 0) {
            return;
        }
        List<Long> checkpointIds = new ArrayList<>(this.pendingCheckpointCompletedFutures.keySet());
        checkpointIds.sort(Long::compareTo);
        for (Long evictedId : checkpointIds.subList(0, excess)) {
            this.pendingCheckpointCompletedFutures
                    .remove(evictedId)
                    .completeExceptionally(new IllegalStateException("Too many pending checkpoints"));
        }
    }

    /**
     * Feeds the given barrier to the checkpoint barrier handler once for every channel of every
     * source input of the operator chain.
     */
    private void triggerSourcesCheckpoint(CheckpointBarrier checkpointBarrier) throws IOException {
        for (StreamTaskSourceInput<?> sourceInput : this.operatorChain.getSourceTaskInputs()) {
            for (InputChannelInfo channelInfo : sourceInput.getChannelInfos()) {
                this.checkpointBarrierHandler.processBarrier(checkpointBarrier, channelInfo);
            }
        }
    }

    /**
     * Performs the checkpoint via the super class and completes the matching pending future, if
     * one was registered by {@link #triggerCheckpointAsync}.
     *
     * @throws IOException if the super class fails to perform the checkpoint
     */
    @Override
    public void triggerCheckpointOnBarrier(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetricsBuilder checkpointMetricsBuilder) throws IOException {
        CompletableFuture<Boolean> resultFuture =
                this.pendingCheckpointCompletedFutures.remove(checkpointMetaData.getCheckpointId());
        try {
            super.triggerCheckpointOnBarrier(checkpointMetaData, checkpointOptions, checkpointMetricsBuilder);
            if (resultFuture != null) {
                resultFuture.complete(true);
            }
        } catch (IOException | RuntimeException e) {
            // The future has already been removed from the map, so it must be completed here;
            // otherwise an unchecked failure would leave it pending forever.
            if (resultFuture != null) {
                resultFuture.completeExceptionally(e);
            }
            throw e;
        }
    }

    /**
     * Fails the matching pending future (if any) with the abort cause and then delegates the
     * abort to the super class.
     */
    @Override
    public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws IOException {
        CompletableFuture<Boolean> resultFuture = this.pendingCheckpointCompletedFutures.remove(checkpointId);
        if (resultFuture != null) {
            resultFuture.completeExceptionally(cause);
        }
        super.abortCheckpointOnBarrier(checkpointId, cause);
    }
}
