package org.apache.flink.table.filesystem.stream;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.functions.sink.filesystem.Bucket;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketLifeCycleListener;
import org.apache.flink.streaming.api.functions.sink.filesystem.Buckets;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.filesystem.stream.StreamingFileCommitter;

/* loaded from: input_file:org/apache/flink/table/filesystem/stream/StreamingFileWriter.class */
public class StreamingFileWriter extends AbstractStreamOperator<StreamingFileCommitter.CommitMessage> implements OneInputStreamOperator<RowData, StreamingFileCommitter.CommitMessage>, BoundedOneInput {
    private static final long serialVersionUID = 1;
    private final long bucketCheckInterval;
    private final StreamingFileSink.BucketsBuilder<RowData, String, ? extends StreamingFileSink.BucketsBuilder<RowData, String, ?>> bucketsBuilder;
    private transient Buckets<RowData, String> buckets;
    private transient StreamingFileSinkHelper<RowData> helper;
    private transient long currentWatermark;
    private transient Set<String> inactivePartitions;

    public StreamingFileWriter(long j, StreamingFileSink.BucketsBuilder<RowData, String, ? extends StreamingFileSink.BucketsBuilder<RowData, String, ?>> bucketsBuilder) {
        this.bucketCheckInterval = j;
        this.bucketsBuilder = bucketsBuilder;
        setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        this.buckets = this.bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask());
        this.inactivePartitions = new HashSet();
        this.buckets.setBucketLifeCycleListener(new BucketLifeCycleListener<RowData, String>() { // from class: org.apache.flink.table.filesystem.stream.StreamingFileWriter.1
            public void bucketCreated(Bucket<RowData, String> bucket) {
            }

            public void bucketInactive(Bucket<RowData, String> bucket) {
                StreamingFileWriter.this.inactivePartitions.add(bucket.getBucketId());
            }
        });
        this.helper = new StreamingFileSinkHelper<>(this.buckets, stateInitializationContext.isRestored(), stateInitializationContext.getOperatorStateStore(), getRuntimeContext().getProcessingTimeService(), this.bucketCheckInterval);
        this.currentWatermark = Long.MIN_VALUE;
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        this.helper.snapshotState(stateSnapshotContext.getCheckpointId());
    }

    public void processWatermark(Watermark watermark) throws Exception {
        super.processWatermark(watermark);
        this.currentWatermark = watermark.getTimestamp();
    }

    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        this.helper.onElement(streamRecord.getValue(), getProcessingTimeService().getCurrentProcessingTime(), streamRecord.hasTimestamp() ? Long.valueOf(streamRecord.getTimestamp()) : null, this.currentWatermark);
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        super.notifyCheckpointComplete(j);
        commitUpToCheckpoint(j);
    }

    private void commitUpToCheckpoint(long j) throws Exception {
        this.helper.commitUpToCheckpoint(j);
        this.output.collect(new StreamRecord(new StreamingFileCommitter.CommitMessage(j, getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks(), new ArrayList(this.inactivePartitions))));
        this.inactivePartitions.clear();
    }

    public void endInput() throws Exception {
        this.buckets.onProcessingTime(Long.MAX_VALUE);
        this.helper.snapshotState(Long.MAX_VALUE);
        this.output.emitWatermark(new Watermark(Long.MAX_VALUE));
        commitUpToCheckpoint(Long.MAX_VALUE);
    }

    public void dispose() throws Exception {
        super.dispose();
        if (this.helper != null) {
            this.helper.close();
        }
    }
}
