package com.netease.sloth.flink.connector.filesystem.stream;

import com.netease.sloth.flink.connector.filesystem.EasyStreamingFileSink;
import com.netease.sloth.flink.connector.filesystem.stream.compact.CompactFileWriter;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.List;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.filesystem.FileSystemFactory;
import org.apache.flink.table.filesystem.FileSystemOptions;
import org.apache.flink.table.filesystem.TableMetaStoreFactory;
import org.apache.flink.table.filesystem.stream.PartitionCommitInfo;
import org.apache.flink.table.filesystem.stream.PartitionCommitter;
import org.apache.flink.table.filesystem.stream.compact.CompactBucketWriter;
import org.apache.flink.table.filesystem.stream.compact.CompactCoordinator;
import org.apache.flink.table.filesystem.stream.compact.CompactMessages;
import org.apache.flink.table.filesystem.stream.compact.CompactOperator;
import org.apache.flink.table.filesystem.stream.compact.CompactReader;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/sloth/flink/connector/filesystem/stream/StreamingSink.class */
public class StreamingSink {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingSink.class);

    private StreamingSink() {
    }

    public static <T> DataStream<PartitionCommitInfo> writer(DataStream<T> dataStream, long j, EasyStreamingFileSink.BucketsBuilder<T, String, ? extends EasyStreamingFileSink.BucketsBuilder<T, String, ?>> bucketsBuilder, int i) {
        return dataStream.transform(StreamingFileWriter.class.getSimpleName(), TypeInformation.of(PartitionCommitInfo.class), new StreamingFileWriter(j, bucketsBuilder)).setParallelism(i);
    }

    public static <T> DataStream<PartitionCommitInfo> compactionWriter(DataStream<T> dataStream, long j, EasyStreamingFileSink.BucketsBuilder<T, String, ? extends EasyStreamingFileSink.BucketsBuilder<T, String, ?>> bucketsBuilder, FileSystemFactory fileSystemFactory, Path path, CompactReader.Factory<T> factory, long j2, int i) {
        CompactFileWriter compactFileWriter = new CompactFileWriter(j, bucketsBuilder);
        SupplierWithException supplierWithException = (Serializable) () -> {
            return fileSystemFactory.create(path.toUri());
        };
        SingleOutputStreamOperator maxParallelism = dataStream.transform("streaming-writer", TypeInformation.of(CompactMessages.CoordinatorInput.class), compactFileWriter).setParallelism(i).transform("compact-coordinator", TypeInformation.of(CompactMessages.CoordinatorOutput.class), new CompactCoordinator(supplierWithException, j2)).setParallelism(1).setMaxParallelism(1);
        bucketsBuilder.getClass();
        return maxParallelism.broadcast().transform("compact-operator", TypeInformation.of(PartitionCommitInfo.class), new CompactOperator(supplierWithException, factory, CompactBucketWriter.factory((Serializable) bucketsBuilder::createBucketWriter))).setParallelism(i);
    }

    public static DataStreamSink<?> sink(DataStream<PartitionCommitInfo> dataStream, Path path, ObjectIdentifier objectIdentifier, List<String> list, TableMetaStoreFactory tableMetaStoreFactory, FileSystemFactory fileSystemFactory, Configuration configuration) {
        DataStream<PartitionCommitInfo> dataStream2 = dataStream;
        if (list.size() > 0 && configuration.contains(FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND)) {
            dataStream2 = dataStream.transform(PartitionCommitter.class.getSimpleName(), Types.VOID, new PartitionCommitter(path, objectIdentifier, list, tableMetaStoreFactory, fileSystemFactory, configuration)).setParallelism(1).setMaxParallelism(1);
        }
        return dataStream2.addSink(new DiscardingSink()).name("end").setParallelism(1);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -595219675:
                if (implMethodName.equals("lambda$compactionWriter$554efc5$1")) {
                    z = false;
                    break;
                }
                break;
            case 1968872985:
                if (implMethodName.equals("createBucketWriter")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/util/function/SupplierWithException") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/netease/sloth/flink/connector/filesystem/stream/StreamingSink") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/table/filesystem/FileSystemFactory;Lorg/apache/flink/core/fs/Path;)Lorg/apache/flink/core/fs/FileSystem;")) {
                    FileSystemFactory fileSystemFactory = (FileSystemFactory) serializedLambda.getCapturedArg(0);
                    Path path = (Path) serializedLambda.getCapturedArg(1);
                    return () -> {
                        return fileSystemFactory.create(path.toUri());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/util/function/SupplierWithException") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/netease/sloth/flink/connector/filesystem/EasyStreamingFileSink$BucketsBuilder") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter;")) {
                    EasyStreamingFileSink.BucketsBuilder bucketsBuilder = (EasyStreamingFileSink.BucketsBuilder) serializedLambda.getCapturedArg(0);
                    return bucketsBuilder::createBucketWriter;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
