package com.netease.sloth.flink.sql.transformer;

import com.netease.sloth.common.metahub.MetahubTableDTO;
import com.netease.sloth.flink.sql.transformer.catalog.SlothCatalogTable;
import com.netease.sloth.flink.sql.util.CatalogBridgeUtil;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.types.AtomicDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.TimestampKind;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/sloth/flink/sql/transformer/AbstractCatalogTransformer.class */
public abstract class AbstractCatalogTransformer implements CatalogTransformer<MetahubTableDTO> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractCatalogTransformer.class);

    /**
     * Converts table metadata into a {@link SlothCatalogTable}.
     *
     * <p>The last dot-separated segment of {@code str} is used as the table name. Options
     * extracted from {@code configuration} for that table are merged into {@code map}
     * (which is mutated in place), the merged options are validated, the schema is rebuilt
     * via {@code transform}, and the options are filtered before being handed to the
     * resulting catalog table.
     *
     * @param str table identifier; only the part after the last {@code '.'} is used
     * @param map table properties; modified by this call
     * @param tableSchema source schema, must not be {@code null}
     * @param configuration configuration from which table-specific options are extracted
     * @return a catalog table whose comment is {@code "sloth catalog table"}
     * @throws RuntimeException wrapping any exception raised while transforming (the failure
     *     is logged first); the {@code tableSchema} null check runs outside that wrapping
     */
    @Override // com.netease.sloth.flink.sql.transformer.CatalogTransformer
    public CatalogTable transformToCatalogTable(String str, Map<String, String> map, TableSchema tableSchema, Configuration configuration) throws Exception {
        Preconditions.checkNotNull(tableSchema, "tableSchema can't be null");
        try {
            // Kept as a raw Map: the parameter type of CatalogBridgeUtil.extractConfig is not visible here.
            Map flatConfiguration = configuration.toMap();
            String[] nameSegments = str.split("\\.");
            String tableName = nameSegments[nameSegments.length - 1];
            Map<String, String> tableConfig = CatalogBridgeUtil.extractConfig(flatConfiguration, tableName);

            map.putAll(tableConfig);
            validate(tableName, map);

            TableSchema rebuiltSchema = transform(tableName, map, tableSchema);
            formatOptions(rebuiltSchema, map);
            map.putAll(toDescriptor(map));

            Map<String, String> catalogProperties = filterProperties(map, tableConfig);
            return new SlothCatalogTable(rebuiltSchema, catalogProperties, "sloth catalog table");
        } catch (Exception failure) {
            // The trailing throwable is logged with its stack trace by SLF4J.
            LOGGER.error("transform meta  {} with table schema {} to catalog table failed", map, tableSchema, failure);
            throw new RuntimeException(failure);
        }
    }

    /**
     * Registers a watermark on {@code builder} for every option whose key starts with
     * {@code "schema.watermark."}.
     *
     * <p>The remainder of the key is used as the rowtime attribute, the option value as the
     * watermark expression, and the watermark output type is {@code TIMESTAMP(3)}.
     *
     * <p>Nothing is registered when {@code map} holds a non-null value for
     * {@code "connections.sink"}; that key is removed from {@code map} either way.
     *
     * <p>NOTE(review): {@code transform} invokes {@code configComputedColumn} and
     * {@code configProcTime} before this method, and both also remove
     * {@code "connections.sink"}, so on that path the key is presumably already gone when
     * this check runs. Confirm whether a non-destructive lookup was intended.
     *
     * @param map table options; {@code "connections.sink"} is removed
     * @param builder schema builder that receives the watermark definitions
     */
    private void configWatermark(Map<String, String> map, TableSchema.Builder builder) {
        if (map.remove("connections.sink") != null) {
            return;
        }
        final String watermarkPrefix = "schema.watermark.";
        for (Map.Entry<String, String> option : map.entrySet()) {
            String key = option.getKey();
            if (key.startsWith(watermarkPrefix)) {
                String rowtimeAttribute = key.substring(watermarkPrefix.length());
                builder.watermark(rowtimeAttribute, option.getValue(), DataTypes.TIMESTAMP(3));
            }
        }
    }

    /**
     * Rebuilds the table schema from the source schema and the table options.
     *
     * <p>Steps run in a fixed order: primary key, physical fields, computed columns,
     * processing-time field, watermarks. When a primary key is configured, its columns are
     * passed to the field configuration so they can be declared NOT NULL.
     *
     * @param str table name, forwarded to the primary-key configuration
     * @param map table options; the individual steps remove the keys they consume
     * @param tableSchema source schema providing the physical columns
     * @return the newly built schema
     */
    private TableSchema transform(String str, Map<String, String> map, TableSchema tableSchema) {
        TableSchema.Builder schemaBuilder = TableSchema.builder();

        List<String> primaryKeyColumns = configPrimaryKey(str, map, tableSchema, schemaBuilder).orElse(null);
        if (primaryKeyColumns == null) {
            configFields(tableSchema, schemaBuilder);
        } else {
            configFields(tableSchema, schemaBuilder, primaryKeyColumns);
        }

        configComputedColumn(map, schemaBuilder);
        configProcTime(map, schemaBuilder);
        configWatermark(map, schemaBuilder);
        return schemaBuilder.build();
    }

    /**
     * Copies every column of {@code tableSchema} into {@code builder}, switching columns
     * named in {@code list} to a non-nullable type when their current type is nullable.
     *
     * @param tableSchema source schema whose columns are copied in order
     * @param builder schema builder that receives the fields
     * @param list names of the columns that must not be nullable (the primary-key columns)
     */
    private void configFields(TableSchema tableSchema, TableSchema.Builder builder, List<String> list) {
        tableSchema.getTableColumns().forEach(column -> {
            String columnName = column.getName();
            DataType declaredType = column.getType();
            boolean mustDropNullability = list.contains(columnName) && declaredType.getLogicalType().isNullable();
            DataType effectiveType = mustDropNullability
                    ? TypeConversions.fromLogicalToDataType(declaredType.getLogicalType().copy(false))
                    : declaredType;
            builder.field(columnName, effectiveType);
        });
    }

    /**
     * Copies all field names and data types of {@code tableSchema} into {@code builder}
     * unchanged.
     *
     * @param tableSchema source schema
     * @param builder schema builder that receives the fields
     */
    private void configFields(TableSchema tableSchema, TableSchema.Builder builder) {
        String[] fieldNames = tableSchema.getFieldNames();
        DataType[] fieldTypes = tableSchema.getFieldDataTypes();
        builder.fields(fieldNames, fieldTypes);
    }

    /**
     * Declares the primary key on {@code builder} and reports which columns it covers.
     *
     * <p>If {@code map} contains a non-blank {@code "primary.keys"} option, it is removed
     * from the map, split on commas, and each key is trimmed so that values such as
     * {@code "id, name"} resolve to the real column names. The keys are validated against
     * the field names of {@code tableSchema} and registered under the constraint name
     * {@code PK_<table>}. Otherwise the primary key already present on {@code tableSchema},
     * if any, is copied over with its original name.
     *
     * @param str table name, used for the generated constraint name and for logging
     * @param map table options; {@code "primary.keys"} is removed
     * @param tableSchema source schema
     * @param builder schema builder that receives the primary-key constraint
     * @return the primary-key column names, or empty when no primary key is defined
     * @throws RuntimeException if a configured key is not a field of {@code tableSchema}
     */
    protected Optional<List<String>> configPrimaryKey(String str, Map<String, String> map, TableSchema tableSchema, TableSchema.Builder builder) {
        String configuredKeys = map.remove("primary.keys");
        if (StringUtils.isNotBlank(configuredKeys)) {
            LOGGER.info("table = {}, {} = {}.", str, "primary.keys", configuredKeys);
            // Trim each key: whitespace after a comma must not become part of the column name.
            String[] keyColumns = Arrays.stream(configuredKeys.split(","))
                    .map(String::trim)
                    .toArray(String[]::new);
            validatePrimaryKeys(keyColumns, tableSchema.getFieldNames());
            builder.primaryKey(String.format("PK_%s", str), keyColumns);
            return Optional.of(Arrays.asList(keyColumns));
        }

        Optional<UniqueConstraint> declaredKey = tableSchema.getPrimaryKey();
        if (!declaredKey.isPresent()) {
            return Optional.empty();
        }
        UniqueConstraint constraint = declaredKey.get();
        List<String> constraintColumns = constraint.getColumns();
        builder.primaryKey(constraint.getName(), constraintColumns.toArray(new String[0]));
        return Optional.of(constraintColumns);
    }

    /**
     * Verifies that every primary-key column exists among the schema's field names.
     *
     * @param strArr primary-key column names to check
     * @param strArr2 field names available in the schema
     * @throws RuntimeException naming the first unknown key together with the available
     *     field names
     */
    private void validatePrimaryKeys(String[] strArr, String[] strArr2) {
        List<String> availableFields = Arrays.asList(strArr2);
        for (String keyColumn : strArr) {
            if (!availableFields.contains(keyColumn)) {
                // Report the schema fields (strArr2), not the key list that is being validated.
                throw new RuntimeException(String.format("primary key field name: %s is not excepted, available fields: %s", keyColumn, Arrays.toString(strArr2)));
            }
        }
    }

    /**
     * Adds a computed column to {@code builder} for every option whose key starts with
     * {@code "<str>.computed.field.expression."}.
     *
     * <p>The remainder of the key is the column name and the option value is the
     * expression. The column type is read from the option
     * {@code "<str>.computed.field.type.<column>"} and resolved through
     * {@code ColumnUtil.getDataType}.
     *
     * @param str prefix placed in front of the option keys
     * @param builder schema builder that receives the computed columns
     * @param map options holding the expressions and their types
     * @param dataTypeArr not used by this method
     * @throws UnsupportedOperationException if a computed column has no type option
     */
    private static void addComputedColumn(String str, TableSchema.Builder builder, Map<String, String> map, DataType[] dataTypeArr) {
        String expressionPrefix = String.format("%s.%s", str, "computed.field.expression.");
        for (Map.Entry<String, String> option : map.entrySet()) {
            String optionKey = option.getKey();
            if (!optionKey.startsWith(expressionPrefix)) {
                continue;
            }
            String columnName = optionKey.substring(expressionPrefix.length());
            String expression = option.getValue();
            String typeName = map.get(String.format("%s.%s.%s", str, "computed.field.type", columnName));
            if (StringUtils.isEmpty(typeName)) {
                throw new UnsupportedOperationException(String.format("You should specify the type of the computed column with name  %s", columnName));
            }
            builder.field(columnName, ColumnUtil.getDataType(typeName, null), expression);
        }
    }

    /**
     * Adds a computed column to {@code builder} for every option whose key starts with
     * {@code "computed.field.expression."}.
     *
     * <p>The remainder of the key is the column name and the option value is the
     * expression. The column type is read from the option
     * {@code "computed.field.type.<column>"} and resolved through
     * {@code ColumnUtil.getDataType}. The consumed options are left in {@code map}.
     *
     * <p>Nothing is added when {@code map} holds a non-null value for
     * {@code "connections.sink"}; that key is removed from {@code map} either way.
     *
     * <p>NOTE(review): because the key is removed here, the identical checks in
     * {@code configProcTime} and {@code configWatermark}, which {@code transform} calls
     * afterwards, will find it absent. Confirm whether a non-destructive lookup was
     * intended.
     *
     * @param map table options; {@code "connections.sink"} is removed
     * @param builder schema builder that receives the computed columns
     * @throws UnsupportedOperationException if a computed column has no type option
     */
    private void configComputedColumn(Map<String, String> map, TableSchema.Builder builder) {
        if (map.remove("connections.sink") != null) {
            return;
        }
        final String expressionPrefix = "computed.field.expression.";
        for (Map.Entry<String, String> option : map.entrySet()) {
            String optionKey = option.getKey();
            if (!optionKey.startsWith(expressionPrefix)) {
                continue;
            }
            String columnName = optionKey.substring(expressionPrefix.length());
            String expression = option.getValue();
            String typeName = map.get(String.format("%s.%s", "computed.field.type", columnName));
            if (StringUtils.isEmpty(typeName)) {
                throw new UnsupportedOperationException(String.format("You should specify the type of the computed column with name  %s", columnName));
            }
            builder.field(columnName, ColumnUtil.getDataType(typeName, null), expression);
        }
    }

    /**
     * Adds a processing-time attribute to {@code builder} when the {@code "use.proctime"}
     * option names one.
     *
     * <p>The option value, trimmed, becomes the field name; the field is a nullable
     * {@code TIMESTAMP(3)} of kind {@code PROCTIME} bridged to {@link Timestamp}.
     *
     * <p>Nothing is added when {@code map} holds a non-null value for
     * {@code "connections.sink"}; that key is removed from {@code map} either way, and in
     * that case {@code "use.proctime"} is left in the map. Otherwise
     * {@code "use.proctime"} is removed as well.
     *
     * <p>NOTE(review): {@code transform} invokes {@code configComputedColumn} before this
     * method and it also removes {@code "connections.sink"}, so on that path the key is
     * presumably already gone when this check runs. Confirm whether a non-destructive
     * lookup was intended.
     *
     * @param map table options; {@code "connections.sink"} and possibly
     *     {@code "use.proctime"} are removed
     * @param builder schema builder that receives the processing-time field
     */
    private void configProcTime(Map<String, String> map, TableSchema.Builder builder) {
        if (map.remove("connections.sink") != null) {
            return;
        }
        // No second look at "connections.sink" is needed: the key was removed just above,
        // so a further lookup could only ever yield the "not a sink" default.
        String procTimeField = map.remove("use.proctime");
        if (StringUtils.isBlank(procTimeField)) {
            return;
        }
        DataType procTimeType = new AtomicDataType(new TimestampType(true, TimestampKind.PROCTIME, 3)).bridgedTo(Timestamp.class);
        builder.field(procTimeField.trim(), procTimeType);
    }

    /**
     * Produces the {@code "update-mode"} property for a table.
     *
     * <p>Unless the option {@code "connections.use.update.mode"} parses to {@code false}
     * (it defaults to {@code "true"}), the result contains {@code "update-mode"} set to the
     * value found in {@code map}, or to {@link #getDefaultUpdateMode()} when absent.
     *
     * @param tableSchema not used by this implementation
     * @param map table options, must not be {@code null}
     * @return a new map that is either empty or holds the single {@code "update-mode"} entry
     */
    @Override // com.netease.sloth.flink.sql.transformer.CatalogTransformer
    public Map<String, String> formatProperties(TableSchema tableSchema, Map<String, String> map) {
        Preconditions.checkNotNull(map, "use config can't be null");
        Map<String, String> formatted = Maps.newHashMap();
        boolean useUpdateMode = Boolean.parseBoolean(map.getOrDefault("connections.use.update.mode", "true"));
        if (!useUpdateMode) {
            return formatted;
        }
        String updateMode = map.getOrDefault("update-mode", getDefaultUpdateMode());
        formatted.put("update-mode", updateMode);
        return formatted;
    }

    /**
     * Produces JSON format descriptor properties when {@code "format.type"} is
     * {@code "json"}.
     *
     * <p>If {@code map} has no {@code "format.schema"} key, the descriptor is told to derive
     * its schema. For any other format type the result is empty.
     *
     * @param tableSchema not used by this implementation
     * @param map table options
     * @return a new map with the JSON descriptor properties, or an empty map
     */
    public Map<String, String> forTimeAndFormat(TableSchema tableSchema, Map<String, String> map) {
        Map<String, String> formatProperties = Maps.newHashMap();
        // Matching the literal "json" already rules out a missing or blank format type.
        if (!"json".equals(map.get("format.type"))) {
            return formatProperties;
        }
        Json jsonDescriptor = new Json();
        boolean schemaProvided = map.containsKey("format.schema");
        if (!schemaProvided) {
            jsonDescriptor.deriveSchema();
        }
        formatProperties.putAll(jsonDescriptor.toProperties());
        return formatProperties;
    }

    /**
     * Returns a copy of {@code map} without entries whose value is {@code null} and without
     * entries whose key starts with {@code "connections."}.
     *
     * <p>The key {@code "connector.topic.mode"} is carried over whenever {@code map}
     * contains it, even if its value is {@code null}.
     *
     * @param map table options to filter; not modified
     * @param map2 not used by this implementation
     * @return a new map with the retained entries
     */
    public Map<String, String> filterProperties(Map<String, String> map, Map<String, String> map2) {
        Map<String, String> retained = new HashMap<>();
        for (Map.Entry<String, String> option : map.entrySet()) {
            // Value check first, then key check, matching the original filter order.
            if (option.getValue() == null) {
                continue;
            }
            if (option.getKey().startsWith("connections.")) {
                continue;
            }
            retained.put(option.getKey(), option.getValue());
        }
        final String topicModeKey = "connector.topic.mode";
        if (map.containsKey(topicModeKey)) {
            retained.put(topicModeKey, map.get(topicModeKey));
        }
        return retained;
    }

    /**
     * Returns the update mode that {@code formatProperties} falls back to when the table
     * options do not contain {@code "update-mode"}.
     *
     * @return {@code "append"} in this base implementation
     */
    protected String getDefaultUpdateMode() {
        return "append";
    }
}
