package com.netease.sloth.flink.connector.hive.adaptor.hive;

import com.netease.sloth.flink.connector.filesystem.EasyStreamingFileSink;
import com.netease.sloth.flink.connector.filesystem.HadoopFileSystemFactory;
import com.netease.sloth.flink.connector.filesystem.stream.StreamingSink;
import com.netease.sloth.flink.connector.filesystem.table.meta.TableMetaStore;
import com.netease.sloth.flink.connector.hive.adaptor.hive.HiveTableSink;
import com.netease.sloth.flink.connector.hive.adaptor.hive.read.HiveCompactReaderFactory;
import com.netease.sloth.flink.connector.hive.adaptor.hive.util.HiveConfUtils;
import com.netease.sloth.flink.connector.hive.adaptor.hive.write.HiveBulkWriterFactory;
import com.netease.sloth.flink.connector.hive.adaptor.hive.write.HiveOutputFormatFactory;
import com.netease.sloth.flink.connector.hive.adaptor.hive.write.HiveWriterFactory;
import com.netease.sloth.flink.connector.hive.table.catalog.hive.client.HiveMetastoreClientFactory;
import com.netease.sloth.flink.connector.hive.table.catalog.hive.client.HiveMetastoreClientWrapper;
import com.netease.sloth.flink.connector.hive.table.catalog.hive.client.HiveShim;
import com.netease.sloth.flink.connector.hive.table.catalog.hive.client.HiveShimLoader;
import com.netease.sloth.flink.connector.hive.table.catalog.hive.util.HiveReflectionUtils;
import com.netease.sloth.flink.connector.hive.table.catalog.hive.util.HiveTableUtil;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.orc.OrcSplitReaderUtil;
import org.apache.flink.orc.writer.ThreadLocalClassLoaderConfiguration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.filesystem.FileSystemOptions;
import org.apache.flink.table.filesystem.FileSystemOutputFormat;
import org.apache.flink.table.filesystem.FileSystemTableSink;
import org.apache.flink.table.filesystem.stream.compact.CompactOperator;
import org.apache.flink.table.filesystem.stream.compact.CompactReader;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/sloth/flink/connector/hive/adaptor/hive/FlinkSink.class */
public class FlinkSink {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class);

    /* loaded from: input_file:com/netease/sloth/flink/connector/hive/adaptor/hive/FlinkSink$Builder.class */
    public static class Builder {
        // Flink-side configuration; read for the HiveOptions fallback-to-mapred reader/writer flags.
        private ReadableConfig flinkConf;
        // Hadoop/Hive job configuration; also the source of the "hive-version" entry (see jobConf(JobConf)).
        private JobConf jobConf;
        // Catalog definition of the target table; supplies options and partition keys.
        private CatalogTable catalogTable;
        // Identifier of the target table; its database and object names are used for metastore lookups.
        private ObjectIdentifier identifier;
        // Physical schema derived from catalogTable in catalogTable(CatalogTable).
        private TableSchema tableSchema;
        // Value of "hive-version" taken from jobConf.
        private String hiveVersion;
        // Shim loaded for hiveVersion in jobConf(JobConf).
        private HiveShim hiveShim;
        // true -> build() creates the batch sink; false -> the streaming sink.
        private boolean isBounded;
        // Converts RowData to Row; only used by the batch sink.
        private DynamicTableSink.DataStructureConverter converter;
        // Static partition spec handed to the batch output format.
        private LinkedHashMap<String, String> staticPartitionSpec;
        // Passed to writer/file-system/metastore factories and used to run metastore client creation via doAs.
        private TableMetaStore tableMetaStore;
        // Input stream of records to write.
        private DataStream<RowData> rowDataInput = null;
        // Overwrite flag; handed to the batch output format and rejected by build() in streaming mode.
        private boolean overwrite = false;
        // Forwarded to FileSystemOutputFormat.Builder#setDynamicGrouped for the batch sink.
        private boolean dynamicGrouping = false;

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Sets the stream of {@link RowData} records that the sink will consume.
         *
         * @param dataStream input records
         * @return this builder, for chaining
         */
        public Builder forRowData(DataStream<RowData> dataStream) {
            rowDataInput = dataStream;
            return this;
        }

        /**
         * Sets the Flink configuration consulted for the Hive fallback reader/writer options.
         *
         * @param readableConfig Flink configuration
         * @return this builder, for chaining
         */
        public Builder flinkConf(ReadableConfig readableConfig) {
            flinkConf = readableConfig;
            return this;
        }

        /**
         * Sets the job configuration and derives the Hive version and shim from it.
         *
         * @param jobConf job configuration; must contain a {@code "hive-version"} entry
         * @return this builder, for chaining
         * @throws NullPointerException if {@code "hive-version"} is not defined in {@code jobConf}
         */
        public Builder jobConf(JobConf jobConf) {
            this.jobConf = jobConf;
            String declaredVersion = jobConf.get("hive-version");
            this.hiveVersion = Preconditions.checkNotNull(declaredVersion, "Hive version is not defined");
            this.hiveShim = HiveShimLoader.loadHiveShim(this.hiveVersion);
            return this;
        }

        /**
         * Sets the catalog table and caches its physical schema.
         *
         * @param catalogTable catalog definition of the target table
         * @return this builder, for chaining
         */
        public Builder catalogTable(CatalogTable catalogTable) {
            this.catalogTable = catalogTable;
            TableSchema declaredSchema = catalogTable.getSchema();
            this.tableSchema = TableSchemaUtils.getPhysicalSchema(declaredSchema);
            return this;
        }

        /**
         * Sets the identifier of the target table.
         *
         * @param objectIdentifier identifier whose database and object names locate the table
         * @return this builder, for chaining
         */
        public Builder identifier(ObjectIdentifier objectIdentifier) {
            identifier = objectIdentifier;
            return this;
        }

        /**
         * Sets the {@link TableMetaStore} handed to the writer, file-system and metastore factories.
         *
         * @param tableMetaStore meta store to use
         * @return this builder, for chaining
         */
        public Builder tableMetaStore(TableMetaStore tableMetaStore) {
            Builder self = this;
            self.tableMetaStore = tableMetaStore;
            return self;
        }

        /**
         * Selects between the batch sink ({@code true}) and the streaming sink ({@code false}).
         *
         * @param bounded whether the input is bounded
         * @return this builder, for chaining
         */
        public Builder isBounded(boolean bounded) {
            isBounded = bounded;
            return this;
        }

        /**
         * Sets the converter used by the batch sink to turn {@link RowData} into {@link Row}.
         *
         * @param dataStructureConverter converter to apply to each record
         * @return this builder, for chaining
         */
        public Builder converter(DynamicTableSink.DataStructureConverter dataStructureConverter) {
            converter = dataStructureConverter;
            return this;
        }

        /**
         * Sets the static partition spec passed to the batch output format.
         *
         * @param linkedHashMap ordered partition column to value mapping
         * @return this builder, for chaining
         */
        public Builder staticPartitionSpec(LinkedHashMap<String, String> linkedHashMap) {
            staticPartitionSpec = linkedHashMap;
            return this;
        }

        /**
         * Sets the overwrite flag. It is forwarded to the batch output format; {@code build()}
         * rejects it when the sink is not bounded.
         *
         * @param enabled whether existing data should be overwritten
         * @return this builder, for chaining
         */
        public Builder overwrite(boolean enabled) {
            overwrite = enabled;
            return this;
        }

        /**
         * Sets the flag forwarded to the batch output format's {@code setDynamicGrouped}.
         *
         * @param enabled dynamic-grouping flag
         * @return this builder, for chaining
         */
        public Builder dynamicGrouping(boolean enabled) {
            dynamicGrouping = enabled;
            return this;
        }

        /**
         * Builds the Hive sink for the configured input stream.
         *
         * <p>Looks the table up in the Hive metastore, prepares a {@link HiveWriterFactory} and an
         * output file naming config, and then creates either the batch sink (when bounded) or the
         * streaming sink. The metastore client is always closed before returning.
         *
         * @return the sink attached to the input stream
         * @throws NullPointerException if a required builder field has not been set
         * @throws IllegalStateException if overwrite is requested in streaming mode
         * @throws FlinkHiveException if the Hive output format class cannot be loaded or instantiated
         * @throws FlinkRuntimeException if an {@link IOException} occurs (e.g. creating the staging dir)
         * @throws CatalogException if the metastore query fails
         */
        public DataStreamSink<?> build() {
            // Fail with a named message instead of an anonymous NPE deep inside the method.
            Preconditions.checkNotNull(this.catalogTable, "catalogTable is not set");
            Preconditions.checkNotNull(this.identifier, "identifier is not set");
            Preconditions.checkNotNull(this.jobConf, "jobConf is not set");
            Preconditions.checkNotNull(this.tableMetaStore, "tableMetaStore is not set");
            Preconditions.checkNotNull(this.rowDataInput, "rowDataInput is not set");

            HiveTableUtil.checkAcidTable(this.catalogTable, this.identifier.toObjectPath());
            // try-with-resources closes the client on every path and keeps the primary exception
            // (a close() failure is attached as suppressed instead of replacing it).
            try (HiveMetastoreClientWrapper client = (HiveMetastoreClientWrapper) this.tableMetaStore.doAs(() -> {
                return HiveMetastoreClientFactory.create(HiveConfUtils.create(this.jobConf), this.hiveVersion);
            })) {
                Table table = client.getTable(this.identifier.getDatabaseName(), this.identifier.getObjectName());
                StorageDescriptor sd = table.getSd();
                Class<?> hiveOutputFormatClass = this.hiveShim.getHiveOutputFormatClass(Class.forName(sd.getOutputFormat()));
                boolean compressed = this.jobConf.getBoolean(HiveConf.ConfVars.COMPRESSRESULT.varname, false);
                HiveWriterFactory hiveWriterFactory = new HiveWriterFactory(this.jobConf, hiveOutputFormatClass, sd.getSerdeInfo(), this.tableSchema, getPartitionKeyArray(), HiveReflectionUtils.getTableMetadata(this.hiveShim, table), this.hiveShim, compressed, this.tableMetaStore);
                String fileExtension = Utilities.getFileExtension(this.jobConf, compressed, (HiveOutputFormat) hiveOutputFormatClass.newInstance());
                // A random UUID in the prefix keeps part files of different jobs apart.
                OutputFileConfig.OutputFileConfigBuilder fileNaming = OutputFileConfig.builder()
                        .withPartPrefix("part-" + UUID.randomUUID().toString())
                        .withPartSuffix(fileExtension == null ? "" : fileExtension);

                if (this.isBounded) {
                    return createBatchSink(this.rowDataInput, this.converter, sd, hiveWriterFactory, fileNaming.build());
                }
                if (this.overwrite) {
                    throw new IllegalStateException("Streaming mode not support overwrite.");
                }
                return createStreamSink(this.rowDataInput, sd, HiveReflectionUtils.getTableMetadata(this.hiveShim, table), hiveWriterFactory, fileNaming);
            } catch (IllegalAccessException | InstantiationException e) {
                throw new FlinkHiveException("Failed to instantiate output format instance", e);
            } catch (IOException e) {
                throw new FlinkRuntimeException("Failed to create staging dir", e);
            } catch (TException e) {
                throw new CatalogException("Failed to query Hive metaStore", e);
            } catch (ClassNotFoundException e) {
                throw new FlinkHiveException("Failed to get output format class", e);
            }
        }

        /**
         * Creates the bounded (batch) sink: records are converted to {@link Row} and written through a
         * {@link FileSystemOutputFormat} that stages output under a freshly created staging directory.
         *
         * @param dataStream input records
         * @param dataStructureConverter converter from {@link RowData} to {@link Row}
         * @param storageDescriptor storage descriptor of the target table; its location hosts the staging dir
         * @param hiveWriterFactory factory backing the Hive output format
         * @param outputFileConfig part-file naming configuration
         * @return the sink, with the same parallelism as {@code dataStream}
         * @throws IOException if the staging directory cannot be prepared
         */
        private DataStreamSink<Row> createBatchSink(DataStream<RowData> dataStream, DynamicTableSink.DataStructureConverter dataStructureConverter, StorageDescriptor storageDescriptor, HiveWriterFactory hiveWriterFactory, OutputFileConfig outputFileConfig) throws IOException {
            FileSystemOutputFormat.Builder formatBuilder = new FileSystemOutputFormat.Builder();

            HiveRowPartitionComputer partitionComputer = new HiveRowPartitionComputer(
                    this.hiveShim,
                    defaultPartName(),
                    this.tableSchema.getFieldNames(),
                    this.tableSchema.getFieldDataTypes(),
                    getPartitionKeyArray());
            formatBuilder.setPartitionComputer(partitionComputer);
            formatBuilder.setDynamicGrouped(this.dynamicGrouping);
            formatBuilder.setPartitionColumns(getPartitionKeyArray());
            formatBuilder.setFileSystemFactory(fsFactory());
            formatBuilder.setFormatFactory(new HiveOutputFormatFactory(hiveWriterFactory));
            formatBuilder.setMetaStoreFactory(msFactory());
            formatBuilder.setOverwrite(this.overwrite);
            formatBuilder.setStaticPartitions(this.staticPartitionSpec);

            String stagingDir = toStagingDir(storageDescriptor.getLocation(), this.jobConf);
            formatBuilder.setTempPath(new Path(stagingDir));
            formatBuilder.setOutputFileConfig(outputFileConfig);

            return dataStream
                    .map(rowData -> (Row) dataStructureConverter.toExternal(rowData))
                    .writeUsingOutputFormat(formatBuilder.build())
                    .setParallelism(dataStream.getParallelism());
        }

        /**
         * Creates the unbounded (streaming) sink.
         *
         * <p>Table options are copied into a Flink {@link Configuration} to read the rolling-policy,
         * auto-compaction and sink-parallelism settings. The writer is either the MapReduce
         * RecordWriter based builder (when the fallback option is set, or no bulk writer factory is
         * available for the table's SerDe) or a native bulk writer. With auto-compaction enabled the
         * part prefix is converted to its "uncompacted" form and a compaction writer is used.
         *
         * @param dataStream input records
         * @param storageDescriptor storage descriptor of the target table
         * @param properties table metadata handed to the compact reader factory
         * @param hiveWriterFactory factory used by the MapReduce writer path
         * @param outputFileConfigBuilder part-file naming builder; its prefix is modified when compaction is on
         * @return the streaming sink
         */
        private DataStreamSink<?> createStreamSink(DataStream<RowData> dataStream, StorageDescriptor storageDescriptor, Properties properties, HiveWriterFactory hiveWriterFactory, OutputFileConfig.OutputFileConfigBuilder outputFileConfigBuilder) {
            Configuration tableOptions = new Configuration();
            // Typed map so that the method reference resolves against (String, String).
            Map<String, String> options = this.catalogTable.getOptions();
            options.forEach(tableOptions::setString);

            String[] partitionColumns = getPartitionKeyArray();
            HiveRowDataPartitionComputer partitionComputer = new HiveRowDataPartitionComputer(this.hiveShim, defaultPartName(), this.tableSchema.getFieldNames(), this.tableSchema.getFieldDataTypes(), partitionColumns);
            FileSystemTableSink.TableBucketAssigner bucketAssigner = new FileSystemTableSink.TableBucketAssigner(partitionComputer);

            MemorySize rollingFileSize = tableOptions.get(FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE);
            Duration rolloverInterval = tableOptions.get(FileSystemOptions.SINK_ROLLING_POLICY_ROLLOVER_INTERVAL);
            HiveTableSink.HiveRollingPolicy rollingPolicy = new HiveTableSink.HiveRollingPolicy(rollingFileSize.getBytes(), rolloverInterval.toMillis());

            boolean autoCompaction = tableOptions.getBoolean(FileSystemOptions.AUTO_COMPACTION);
            if (autoCompaction) {
                // Written files are renamed by the compaction step, so mark them as uncompacted first.
                outputFileConfigBuilder.withPartPrefix(CompactOperator.convertToUncompacted(outputFileConfigBuilder.build().getPartPrefix()));
            }
            OutputFileConfig fileConfig = outputFileConfigBuilder.build();
            Path tableLocation = new Path(storageDescriptor.getLocation());

            EasyStreamingFileSink.BucketsBuilder<RowData, String, ? extends EasyStreamingFileSink.BucketsBuilder<RowData, ?, ?>> bucketsBuilder;
            if (this.flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER)) {
                bucketsBuilder = bucketsBuilderForMRWriter(hiveWriterFactory, storageDescriptor, bucketAssigner, rollingPolicy, fileConfig, this.tableMetaStore);
                FlinkSink.LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
            } else {
                Optional<BulkWriter.Factory<RowData>> bulkFactory = createBulkWriterFactory(getPartitionKeyArray(), storageDescriptor);
                if (bulkFactory.isPresent()) {
                    bucketsBuilder = EasyStreamingFileSink.forBulkFormat(tableLocation, new FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partitionComputer)).withTableMetaStore(this.tableMetaStore).withBucketAssigner(bucketAssigner).withRollingPolicy(rollingPolicy).withOutputFileConfig(fileConfig);
                    FlinkSink.LOG.info("Hive streaming sink: Use native parquet&orc writer.");
                } else {
                    bucketsBuilder = bucketsBuilderForMRWriter(hiveWriterFactory, storageDescriptor, bucketAssigner, rollingPolicy, fileConfig, this.tableMetaStore);
                    FlinkSink.LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because BulkWriter Factory not available.");
                }
            }

            long bucketCheckInterval = tableOptions.get(FileSystemOptions.SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis();
            // Explicit sink parallelism from the table options wins; otherwise follow the input.
            int parallelism = tableOptions.getOptional(FactoryUtil.SINK_PARALLELISM).orElse(dataStream.getParallelism());

            // Element type of the writer stream is declared by StreamingSink and not imported here,
            // so the variable is intentionally left raw.
            DataStream writer;
            if (autoCompaction) {
                // Compaction target size defaults to the rolling file size.
                long compactionSize = tableOptions.getOptional(FileSystemOptions.COMPACTION_FILE_SIZE).orElse(rollingFileSize).getBytes();
                writer = StreamingSink.compactionWriter(dataStream, bucketCheckInterval, bucketsBuilder, fsFactory(), tableLocation, createCompactReaderFactory(storageDescriptor, properties), compactionSize, parallelism);
            } else {
                writer = StreamingSink.writer(dataStream, bucketCheckInterval, bucketsBuilder, parallelism);
            }
            return StreamingSink.sink(writer, tableLocation, this.identifier, getPartitionKeys(), msFactory(), fsFactory(), tableOptions);
        }

        /**
         * Returns the default partition name configured in the job conf, falling back to Hive's
         * built-in default string value.
         */
        private String defaultPartName() {
            HiveConf.ConfVars defaultPartition = HiveConf.ConfVars.DEFAULTPARTITIONNAME;
            return this.jobConf.get(defaultPartition.varname, defaultPartition.defaultStrVal);
        }

        /**
         * Creates the reader factory used by the compaction writer.
         *
         * @param storageDescriptor storage descriptor of the target table
         * @param properties table metadata
         * @return a {@link HiveCompactReaderFactory} configured from this builder
         */
        private CompactReader.Factory<RowData> createCompactReaderFactory(StorageDescriptor storageDescriptor, Properties properties) {
            LogicalType producedType = this.tableSchema.toRowDataType().getLogicalType();
            boolean fallbackToMapredReader = this.flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER);
            return new HiveCompactReaderFactory(
                    storageDescriptor,
                    properties,
                    this.jobConf,
                    this.catalogTable,
                    this.hiveVersion,
                    producedType,
                    fallbackToMapredReader,
                    this.tableMetaStore);
        }

        /**
         * Creates a metastore factory for the target table, logging the configured
         * thrift-SASL setting on the way.
         */
        private HiveTableMetaStoreFactory msFactory() {
            String saslSetting = this.jobConf.get(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname);
            FlinkSink.LOG.info("sasl:{}.", saslSetting);
            String database = this.identifier.getDatabaseName();
            String table = this.identifier.getObjectName();
            return new HiveTableMetaStoreFactory(this.jobConf, this.hiveVersion, database, table, this.tableMetaStore);
        }

        /** Creates a new Hadoop file-system factory bound to this builder's {@link TableMetaStore}. */
        private HadoopFileSystemFactory fsFactory() {
            HadoopFileSystemFactory factory = new HadoopFileSystemFactory(this.tableMetaStore);
            return factory;
        }

        /**
         * Creates the buckets builder that writes through Hive's MapReduce RecordWriter.
         *
         * @param hiveWriterFactory factory wrapped into a {@link HiveBulkWriterFactory}
         * @param storageDescriptor storage descriptor whose location is the base path
         * @param tableBucketAssigner bucket (partition) assigner
         * @param hiveRollingPolicy rolling policy for part files
         * @param outputFileConfig part-file naming configuration
         * @param tableMetaStore meta store handed to the builder
         * @return the configured buckets builder
         */
        private EasyStreamingFileSink.BucketsBuilder<RowData, String, ? extends EasyStreamingFileSink.BucketsBuilder<RowData, ?, ?>> bucketsBuilderForMRWriter(HiveWriterFactory hiveWriterFactory, StorageDescriptor storageDescriptor, FileSystemTableSink.TableBucketAssigner tableBucketAssigner, HiveTableSink.HiveRollingPolicy hiveRollingPolicy, OutputFileConfig outputFileConfig, TableMetaStore tableMetaStore) {
            org.apache.hadoop.fs.Path basePath = new org.apache.hadoop.fs.Path(storageDescriptor.getLocation());
            HiveBulkWriterFactory bulkWriterFactory = new HiveBulkWriterFactory(hiveWriterFactory);
            return new HadoopPathBasedBulkFormatBuilder(basePath, bulkWriterFactory, this.jobConf, tableBucketAssigner, tableMetaStore)
                    .withRollingPolicy(hiveRollingPolicy)
                    .withOutputFileConfig(outputFileConfig);
        }

        /**
         * Creates a native bulk writer factory when the table's SerDe library name contains
         * {@code "parquet"} or {@code "orc"} (case-insensitive; parquet is checked first).
         *
         * <p>The writer's row type consists of the schema fields that are not partition columns,
         * in schema order. SerDe parameters are copied on top of a copy of the job conf.
         *
         * @param strArr partition column names
         * @param storageDescriptor storage descriptor of the target table
         * @return the factory, or {@link Optional#empty()} for any other SerDe
         */
        private Optional<BulkWriter.Factory<RowData>> createBulkWriterFactory(String[] strArr, StorageDescriptor storageDescriptor) {
            String serdeLib = storageDescriptor.getSerdeInfo().getSerializationLib().toLowerCase();
            boolean parquet = serdeLib.contains("parquet");
            if (!parquet && !serdeLib.contains("orc")) {
                // No native writer for this format; skip building the column arrays.
                return Optional.empty();
            }

            List<String> partitionColumns = Lists.newArrayList(strArr);
            int fieldCount = this.tableSchema.getFieldCount();
            int formatFieldCount = fieldCount - strArr.length;
            String[] formatNames = new String[formatFieldCount];
            LogicalType[] formatTypes = new LogicalType[formatFieldCount];
            int next = 0;
            for (int i = 0; i < fieldCount; i++) {
                String fieldName = this.tableSchema.getFieldName(i).get();
                if (!partitionColumns.contains(fieldName)) {
                    formatNames[next] = fieldName;
                    formatTypes[next] = this.tableSchema.getFieldDataType(i).get().getLogicalType();
                    next++;
                }
            }
            RowType formatType = RowType.of(formatTypes, formatNames);

            // Typed map so that the method references below resolve against (String, String).
            Map<String, String> serdeParameters = storageDescriptor.getSerdeInfo().getParameters();
            if (parquet) {
                org.apache.hadoop.conf.Configuration parquetConf = new org.apache.hadoop.conf.Configuration(this.jobConf);
                // Guard against a SerDe without a parameter map.
                if (serdeParameters != null) {
                    serdeParameters.forEach(parquetConf::set);
                }
                return Optional.of(ParquetRowDataBuilder.createWriterFactory(formatType, parquetConf, this.hiveVersion.startsWith("3.")));
            }

            org.apache.hadoop.conf.Configuration orcConf = new ThreadLocalClassLoaderConfiguration(this.jobConf);
            if (serdeParameters != null) {
                serdeParameters.forEach(orcConf::set);
            }
            String orcSchema = OrcSplitReaderUtil.logicalTypeToOrcType(formatType).toString();
            return Optional.of(this.hiveShim.createOrcBulkWriterFactory(orcConf, orcSchema, formatTypes));
        }

        /**
         * Creates (if absent) a {@code .staging_<currentTimeMillis>} directory under the given
         * location and registers it for deletion when the file system is closed.
         *
         * @param str base location; a trailing {@code "/"} is appended when missing
         * @param configuration Hadoop configuration used to resolve the file system
         * @return the staging directory path as a string
         * @throws IOException if the file system cannot be accessed
         * @throws IllegalStateException if the directory neither exists nor can be created
         */
        private String toStagingDir(String str, org.apache.hadoop.conf.Configuration configuration) throws IOException {
            String base = str.endsWith("/") ? str : str + "/";
            String stagingDir = base + ".staging_" + System.currentTimeMillis();

            org.apache.hadoop.fs.Path stagingPath = new org.apache.hadoop.fs.Path(stagingDir);
            FileSystem fs = stagingPath.getFileSystem(configuration);
            boolean available = fs.exists(stagingPath) || fs.mkdirs(stagingPath);
            Preconditions.checkState(available, "Failed to create staging dir " + stagingPath);
            fs.deleteOnExit(stagingPath);
            return stagingDir;
        }

        /** Returns the partition keys declared on the catalog table. */
        private List<String> getPartitionKeys() {
            List<String> partitionKeys = this.catalogTable.getPartitionKeys();
            return partitionKeys;
        }

        /** Returns the partition keys as a freshly allocated array. */
        private String[] getPartitionKeyArray() {
            List<String> partitionKeys = getPartitionKeys();
            return partitionKeys.toArray(new String[0]);
        }

        /**
         * Lambda deserialization hook for the serializable lambda declared in
         * {@code createBatchSink}: it matches the serialized form against the expected
         * implementation method and rebuilds the mapping lambda from the captured converter.
         *
         * <p>NOTE(review): this method is marked synthetic and looks like decompiler output of a
         * compiler-generated method rather than hand-written code. As written it is not valid
         * Java source ({@code boolean z = -1}, {@code switch} on a boolean) — presumably it
         * should be dropped when recompiling, since javac normally regenerates it; confirm
         * before editing.
         *
         * @param serializedLambda serialized form of the lambda
         * @return the re-created lambda
         * @throws IllegalArgumentException if the serialized form does not match the known lambda
         */
        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            // Decompiled string-switch: first stage maps the method name to an index.
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case -167597376:
                    if (implMethodName.equals("lambda$createBatchSink$71aa92e5$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            // Second stage: verify the full lambda signature before re-creating it.
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/netease/sloth/flink/connector/hive/adaptor/hive/FlinkSink$Builder") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/table/connector/sink/DynamicTableSink$DataStructureConverter;Lorg/apache/flink/table/data/RowData;)Lorg/apache/flink/types/Row;")) {
                        // The only captured argument is the RowData -> Row converter.
                        DynamicTableSink.DataStructureConverter dataStructureConverter = (DynamicTableSink.DataStructureConverter) serializedLambda.getCapturedArg(0);
                        return rowData -> {
                            return (Row) dataStructureConverter.toExternal(rowData);
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /**
     * Entry point: creates a new {@link Builder} that will consume the given stream.
     *
     * @param dataStream input records
     * @return a builder initialized with {@code dataStream}
     */
    public static Builder forRowData(DataStream<RowData> dataStream) {
        Builder builder = new Builder();
        return builder.forRowData(dataStream);
    }
}
