package org.apache.flink.streaming.connectors.kafka;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.sources.DefinedFieldMapping;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

@Internal
@Deprecated
/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBase.class */
public abstract class KafkaTableSourceBase implements StreamTableSource<Row>, DefinedProctimeAttribute, DefinedRowtimeAttributes, DefinedFieldMapping {
    private final TableSchema schema;
    private final Optional<String> proctimeAttribute;
    private final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
    private final Optional<Map<String, String>> fieldMapping;
    private final String topic;
    private final Properties properties;
    private final DeserializationSchema<Row> deserializationSchema;
    private final StartupMode startupMode;
    private final Map<KafkaTopicPartition, Long> specificStartupOffsets;
    private final long startupTimestampMillis;
    private static final long DEFAULT_STARTUP_TIMESTAMP_MILLIS = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBase$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode = new int[StartupMode.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[StartupMode.EARLIEST.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[StartupMode.LATEST.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[StartupMode.GROUP_OFFSETS.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[StartupMode.SPECIFIC_OFFSETS.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[StartupMode.TIMESTAMP.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public KafkaTableSourceBase(TableSchema tableSchema, Optional<String> optional, List<RowtimeAttributeDescriptor> list, Optional<Map<String, String>> optional2, String str, Properties properties, DeserializationSchema<Row> deserializationSchema, StartupMode startupMode, Map<KafkaTopicPartition, Long> map, long j) {
        this.schema = TableSchemaUtils.checkOnlyPhysicalColumns(tableSchema);
        this.proctimeAttribute = validateProctimeAttribute(optional);
        this.rowtimeAttributeDescriptors = validateRowtimeAttributeDescriptors(list);
        this.fieldMapping = optional2;
        this.topic = (String) Preconditions.checkNotNull(str, "Topic must not be null.");
        this.properties = (Properties) Preconditions.checkNotNull(properties, "Properties must not be null.");
        this.deserializationSchema = (DeserializationSchema) Preconditions.checkNotNull(deserializationSchema, "Deserialization schema must not be null.");
        this.startupMode = (StartupMode) Preconditions.checkNotNull(startupMode, "Startup mode must not be null.");
        this.specificStartupOffsets = (Map) Preconditions.checkNotNull(map, "Specific offsets must not be null.");
        this.startupTimestampMillis = j;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public KafkaTableSourceBase(TableSchema tableSchema, String str, Properties properties, DeserializationSchema<Row> deserializationSchema) {
        this(tableSchema, Optional.empty(), Collections.emptyList(), Optional.empty(), str, properties, deserializationSchema, StartupMode.GROUP_OFFSETS, Collections.emptyMap(), DEFAULT_STARTUP_TIMESTAMP_MILLIS);
    }

    public DataStream<Row> getDataStream(StreamExecutionEnvironment streamExecutionEnvironment) {
        return streamExecutionEnvironment.addSource(getKafkaConsumer(this.topic, this.properties, getDeserializationSchema())).name(explainSource());
    }

    public TypeInformation<Row> getReturnType() {
        return this.deserializationSchema.getProducedType();
    }

    public TableSchema getTableSchema() {
        return this.schema;
    }

    public String getProctimeAttribute() {
        return this.proctimeAttribute.orElse(null);
    }

    public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
        return this.rowtimeAttributeDescriptors;
    }

    public Map<String, String> getFieldMapping() {
        return this.fieldMapping.orElse(null);
    }

    public String explainSource() {
        return TableConnectorUtils.generateRuntimeName(getClass(), this.schema.getFieldNames());
    }

    public Properties getProperties() {
        return this.properties;
    }

    public DeserializationSchema<Row> getDeserializationSchema() {
        return this.deserializationSchema;
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        KafkaTableSourceBase kafkaTableSourceBase = (KafkaTableSourceBase) obj;
        return Objects.equals(this.schema, kafkaTableSourceBase.schema) && Objects.equals(this.proctimeAttribute, kafkaTableSourceBase.proctimeAttribute) && Objects.equals(this.rowtimeAttributeDescriptors, kafkaTableSourceBase.rowtimeAttributeDescriptors) && Objects.equals(this.fieldMapping, kafkaTableSourceBase.fieldMapping) && Objects.equals(this.topic, kafkaTableSourceBase.topic) && Objects.equals(this.properties, kafkaTableSourceBase.properties) && Objects.equals(this.deserializationSchema, kafkaTableSourceBase.deserializationSchema) && this.startupMode == kafkaTableSourceBase.startupMode && Objects.equals(this.specificStartupOffsets, kafkaTableSourceBase.specificStartupOffsets) && this.startupTimestampMillis == kafkaTableSourceBase.startupTimestampMillis;
    }

    public int hashCode() {
        return Objects.hash(this.schema, this.proctimeAttribute, this.rowtimeAttributeDescriptors, this.fieldMapping, this.topic, this.properties, this.deserializationSchema, this.startupMode, this.specificStartupOffsets, Long.valueOf(this.startupTimestampMillis));
    }

    protected FlinkKafkaConsumerBase<Row> getKafkaConsumer(String str, Properties properties, DeserializationSchema<Row> deserializationSchema) {
        FlinkKafkaConsumerBase<Row> createKafkaConsumer = createKafkaConsumer(str, properties, deserializationSchema);
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[this.startupMode.ordinal()]) {
            case FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP /* 1 */:
                createKafkaConsumer.setStartFromEarliest();
                break;
            case FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK /* 2 */:
                createKafkaConsumer.setStartFromLatest();
                break;
            case 3:
                createKafkaConsumer.setStartFromGroupOffsets();
                break;
            case 4:
                createKafkaConsumer.setStartFromSpecificOffsets(this.specificStartupOffsets);
                break;
            case 5:
                createKafkaConsumer.setStartFromTimestamp(this.startupTimestampMillis);
                break;
        }
        createKafkaConsumer.setCommitOffsetsOnCheckpoints(properties.getProperty("group.id") != null);
        return createKafkaConsumer;
    }

    private Optional<String> validateProctimeAttribute(Optional<String> optional) {
        return optional.map(str -> {
            Optional fieldDataType = this.schema.getFieldDataType(str);
            if (!fieldDataType.isPresent()) {
                throw new ValidationException("Processing time attribute '" + str + "' is not present in TableSchema.");
            }
            if (((DataType) fieldDataType.get()).getLogicalType().getTypeRoot() != LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
                throw new ValidationException("Processing time attribute '" + str + "' is not of type TIMESTAMP.");
            }
            return str;
        });
    }

    private List<RowtimeAttributeDescriptor> validateRowtimeAttributeDescriptors(List<RowtimeAttributeDescriptor> list) {
        Preconditions.checkNotNull(list, "List of rowtime attributes must not be null.");
        Iterator<RowtimeAttributeDescriptor> it = list.iterator();
        while (it.hasNext()) {
            String attributeName = it.next().getAttributeName();
            Optional fieldDataType = this.schema.getFieldDataType(attributeName);
            if (!fieldDataType.isPresent()) {
                throw new ValidationException("Rowtime attribute '" + attributeName + "' is not present in TableSchema.");
            }
            if (((DataType) fieldDataType.get()).getLogicalType().getTypeRoot() != LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
                throw new ValidationException("Rowtime attribute '" + attributeName + "' is not of type TIMESTAMP.");
            }
        }
        return list;
    }

    protected abstract FlinkKafkaConsumerBase<Row> createKafkaConsumer(String str, Properties properties, DeserializationSchema<Row> deserializationSchema);
}
