package org.apache.flink.table.api.internal;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.CatalogNotExistException;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ConnectorCatalogTable;
import org.apache.flink.table.catalog.ExternalCatalog;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.QueryOperationCatalogView;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.descriptors.ConnectTableDescriptor;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.descriptors.StreamTableDescriptor;
import org.apache.flink.table.expressions.TableReferenceExpression;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.operations.CatalogQueryOperation;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.TableSourceQueryOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.operations.ddl.DropTableOperation;
import org.apache.flink.table.operations.utils.OperationTreeBuilder;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.sources.TableSourceValidation;
import org.apache.flink.util.StringUtils;

@Internal
/* loaded from: input_file:org/apache/flink/table/api/internal/TableEnvironmentImpl.class */
public class TableEnvironmentImpl implements TableEnvironment {
    private static final boolean IS_STREAM_TABLE = true;
    private final CatalogManager catalogManager;
    private final OperationTreeBuilder operationTreeBuilder;
    private final List<ModifyOperation> bufferedModifyOperations = new ArrayList();
    protected final TableConfig tableConfig;
    protected final Executor execEnv;
    protected final FunctionCatalog functionCatalog;
    protected final Planner planner;

    protected TableEnvironmentImpl(CatalogManager catalogManager, TableConfig tableConfig, Executor executor, FunctionCatalog functionCatalog, Planner planner, boolean z) {
        this.catalogManager = catalogManager;
        this.execEnv = executor;
        this.tableConfig = tableConfig;
        this.functionCatalog = functionCatalog;
        this.planner = planner;
        this.operationTreeBuilder = OperationTreeBuilder.create(functionCatalog, str -> {
            return scanInternal(str).map(catalogQueryOperation -> {
                return new TableReferenceExpression(str, catalogQueryOperation);
            });
        }, z);
    }

    public static TableEnvironmentImpl create(EnvironmentSettings environmentSettings) {
        CatalogManager catalogManager = new CatalogManager(environmentSettings.getBuiltInCatalogName(), new GenericInMemoryCatalog(environmentSettings.getBuiltInCatalogName(), environmentSettings.getBuiltInDatabaseName()));
        FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);
        Map<String, String> executorProperties = environmentSettings.toExecutorProperties();
        Executor create = ((ExecutorFactory) ComponentFactoryService.find(ExecutorFactory.class, executorProperties)).create(executorProperties);
        TableConfig tableConfig = new TableConfig();
        Map<String, String> plannerProperties = environmentSettings.toPlannerProperties();
        return new TableEnvironmentImpl(catalogManager, tableConfig, create, functionCatalog, ((PlannerFactory) ComponentFactoryService.find(PlannerFactory.class, plannerProperties)).create(plannerProperties, create, tableConfig, functionCatalog, catalogManager), environmentSettings.isStreamingMode());
    }

    @VisibleForTesting
    public Planner getPlanner() {
        return this.planner;
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public Table fromTableSource(TableSource<?> tableSource) {
        return createTable(new TableSourceQueryOperation(tableSource, false));
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public void registerExternalCatalog(String str, ExternalCatalog externalCatalog) {
        this.catalogManager.registerExternalCatalog(str, externalCatalog);
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public ExternalCatalog getRegisteredExternalCatalog(String str) {
        return this.catalogManager.getExternalCatalog(str).orElseThrow(() -> {
            return new CatalogNotExistException(str);
        });
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public void registerCatalog(String str, Catalog catalog) {
        this.catalogManager.registerCatalog(str, catalog);
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public Optional<Catalog> getCatalog(String str) {
        return this.catalogManager.getCatalog(str);
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public void registerFunction(String str, ScalarFunction scalarFunction) {
        this.functionCatalog.registerScalarFunction(str, scalarFunction);
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public void registerTable(String str, Table table) {
        if (((TableImpl) table).getTableEnvironment() != this) {
            throw new TableException("Only tables that belong to this TableEnvironment can be registered.");
        }
        registerTableInternal(str, new QueryOperationCatalogView(table.getQueryOperation()));
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public void registerTableSource(String str, TableSource<?> tableSource) {
        registerTableSourceInternal(str, tableSource);
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public void registerTableSink(String str, String[] strArr, TypeInformation<?>[] typeInformationArr, TableSink<?> tableSink) {
        registerTableSink(str, tableSink.configure(strArr, typeInformationArr));
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public void registerTableSink(String str, TableSink<?> tableSink) {
        if (tableSink.getTableSchema().getFieldCount() == 0) {
            throw new TableException("Table schema cannot be empty.");
        }
        checkValidTableName(str);
        registerTableSinkInternal(str, tableSink);
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public Table scan(String... strArr) {
        return (Table) scanInternal(strArr).map((v1) -> {
            return createTable(v1);
        }).orElseThrow(() -> {
            return new ValidationException(String.format("Table '%s' was not found.", String.join(".", strArr)));
        });
    }

    private Optional<CatalogQueryOperation> scanInternal(String... strArr) {
        return this.catalogManager.resolveTable(strArr).map(resolvedTable -> {
            return new CatalogQueryOperation(resolvedTable.getTablePath(), resolvedTable.getTableSchema());
        });
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public ConnectTableDescriptor connect(ConnectorDescriptor connectorDescriptor) {
        return new StreamTableDescriptor(this, connectorDescriptor);
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public String[] listCatalogs() {
        return (String[]) this.catalogManager.getCatalogs().toArray(new String[0]);
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public String[] listDatabases() {
        return (String[]) this.catalogManager.getCatalog(this.catalogManager.getCurrentCatalog()).get().listDatabases().toArray(new String[0]);
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public String[] listTables() {
        String currentCatalog = this.catalogManager.getCurrentCatalog();
        return (String[]) this.catalogManager.getCatalog(currentCatalog).map(catalog -> {
            try {
                return (String[]) catalog.listTables(this.catalogManager.getCurrentDatabase()).toArray(new String[0]);
            } catch (DatabaseNotExistException e) {
                throw new ValidationException("Current database does not exist", e);
            }
        }).orElseThrow(() -> {
            return new TableException(String.format("The current catalog %s does not exist.", currentCatalog));
        });
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public String[] listUserDefinedFunctions() {
        return this.functionCatalog.getUserDefinedFunctions();
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public String explain(Table table) {
        return explain(table, false);
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public String explain(Table table, boolean z) {
        return this.planner.explain(Collections.singletonList(table.getQueryOperation()), z);
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public String explain(boolean z) {
        return this.planner.explain((List) this.bufferedModifyOperations.stream().map(modifyOperation -> {
            return modifyOperation;
        }).collect(Collectors.toList()), z);
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public String[] getCompletionHints(String str, int i) {
        return this.planner.getCompletionHints(str, i);
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public Table sqlQuery(String str) {
        List<Operation> parse = this.planner.parse(str);
        if (parse.size() != IS_STREAM_TABLE) {
            throw new ValidationException("Unsupported SQL query! sqlQuery() only accepts a single SQL query.");
        }
        Operation operation = parse.get(0);
        if (!(operation instanceof QueryOperation) || (operation instanceof ModifyOperation)) {
            throw new ValidationException("Unsupported SQL query! sqlQuery() only accepts a single SQL query of type SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.");
        }
        return createTable((QueryOperation) operation);
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public void insertInto(Table table, String str, String... strArr) {
        ArrayList arrayList = new ArrayList(Arrays.asList(strArr));
        arrayList.add(0, str);
        List<ModifyOperation> singletonList = Collections.singletonList(new CatalogSinkModifyOperation(arrayList, table.getQueryOperation()));
        if (isEagerOperationTranslation()) {
            translate(singletonList);
        } else {
            buffer(singletonList);
        }
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public void sqlUpdate(String str) {
        List<Operation> parse = this.planner.parse(str);
        if (parse.size() != IS_STREAM_TABLE) {
            throw new TableException("Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type INSERT, CREATE TABLE, DROP TABLE");
        }
        Operation operation = parse.get(0);
        if (operation instanceof ModifyOperation) {
            List<ModifyOperation> singletonList = Collections.singletonList((ModifyOperation) operation);
            if (isEagerOperationTranslation()) {
                translate(singletonList);
                return;
            } else {
                buffer(singletonList);
                return;
            }
        }
        if (operation instanceof CreateTableOperation) {
            CreateTableOperation createTableOperation = (CreateTableOperation) operation;
            registerCatalogTableInternal(createTableOperation.getTablePath(), createTableOperation.getCatalogTable(), createTableOperation.isIgnoreIfExists());
            return;
        }
        if (!(operation instanceof DropTableOperation)) {
            throw new TableException("Unsupported SQL query! sqlUpdate() only accepts a single SQL statements of type INSERT, CREATE TABLE, DROP TABLE");
        }
        String[] tableName = ((DropTableOperation) operation).getTableName();
        boolean isIfExists = ((DropTableOperation) operation).isIfExists();
        String[] fullTablePath = this.catalogManager.getFullTablePath(Arrays.asList(tableName));
        Optional<Catalog> catalog = getCatalog(fullTablePath[0]);
        if (!catalog.isPresent()) {
            if (!isIfExists) {
                throw new TableException("Catalog " + fullTablePath[0] + " does not exist.");
            }
        } else {
            try {
                catalog.get().dropTable(new ObjectPath(fullTablePath[IS_STREAM_TABLE], fullTablePath[2]), isIfExists);
            } catch (TableNotExistException e) {
                throw new TableException(e.getMessage());
            }
        }
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public String getCurrentCatalog() {
        return this.catalogManager.getCurrentCatalog();
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public void useCatalog(String str) {
        this.catalogManager.setCurrentCatalog(str);
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public String getCurrentDatabase() {
        return this.catalogManager.getCurrentDatabase();
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public void useDatabase(String str) {
        this.catalogManager.setCurrentDatabase(str);
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public TableConfig getConfig() {
        return this.tableConfig;
    }

    @Override // org.apache.flink.table.api.TableEnvironment
    public JobExecutionResult execute(String str) throws Exception {
        translate(this.bufferedModifyOperations);
        this.bufferedModifyOperations.clear();
        return this.execEnv.execute(str);
    }

    protected boolean isEagerOperationTranslation() {
        return false;
    }

    protected void validateTableSource(TableSource<?> tableSource) {
        TableSourceValidation.validateTableSource(tableSource);
    }

    private void translate(List<ModifyOperation> list) {
        this.execEnv.apply(this.planner.translate(list));
    }

    private void buffer(List<ModifyOperation> list) {
        this.bufferedModifyOperations.addAll(list);
    }

    private void registerCatalogTableInternal(String[] strArr, CatalogBaseTable catalogBaseTable, boolean z) {
        String[] fullTablePath = this.catalogManager.getFullTablePath(Arrays.asList(strArr));
        try {
            getCatalog(fullTablePath[0]).orElseThrow(() -> {
                return new TableException("Catalog " + fullTablePath[0] + " does not exist");
            }).createTable(new ObjectPath(fullTablePath[IS_STREAM_TABLE], fullTablePath[2]), catalogBaseTable, z);
        } catch (Exception e) {
            throw new TableException("Could not register table", e);
        }
    }

    protected void registerTableInternal(String str, CatalogBaseTable catalogBaseTable) {
        try {
            checkValidTableName(str);
            ObjectPath objectPath = new ObjectPath(this.catalogManager.getBuiltInDatabaseName(), str);
            Optional<Catalog> catalog = this.catalogManager.getCatalog(this.catalogManager.getBuiltInCatalogName());
            if (catalog.isPresent()) {
                catalog.get().createTable(objectPath, catalogBaseTable, false);
            }
        } catch (Exception e) {
            throw new TableException("Could not register table", e);
        }
    }

    private void replaceTableInternal(String str, CatalogBaseTable catalogBaseTable) {
        try {
            ObjectPath objectPath = new ObjectPath(this.catalogManager.getBuiltInDatabaseName(), str);
            Optional<Catalog> catalog = this.catalogManager.getCatalog(this.catalogManager.getBuiltInCatalogName());
            if (catalog.isPresent()) {
                catalog.get().alterTable(objectPath, catalogBaseTable, false);
            }
        } catch (Exception e) {
            throw new TableException("Could not register table", e);
        }
    }

    private void checkValidTableName(String str) {
        if (StringUtils.isNullOrWhitespaceOnly(str)) {
            throw new ValidationException("A table name cannot be null or consist of only whitespaces.");
        }
    }

    private void registerTableSourceInternal(String str, TableSource<?> tableSource) {
        validateTableSource(tableSource);
        Optional<CatalogBaseTable> catalogTable = getCatalogTable(this.catalogManager.getBuiltInCatalogName(), this.catalogManager.getBuiltInDatabaseName(), str);
        if (!catalogTable.isPresent()) {
            registerTableInternal(str, ConnectorCatalogTable.source(tableSource, false));
        } else {
            if (!(catalogTable.get() instanceof ConnectorCatalogTable)) {
                throw new ValidationException(String.format("Table '%s' already exists. Please choose a different name.", str));
            }
            ConnectorCatalogTable connectorCatalogTable = catalogTable.get();
            if (connectorCatalogTable.getTableSource().isPresent()) {
                throw new ValidationException(String.format("Table '%s' already exists. Please choose a different name.", str));
            }
            replaceTableInternal(str, ConnectorCatalogTable.sourceAndSink(tableSource, (TableSink) connectorCatalogTable.getTableSink().get(), false));
        }
    }

    private void registerTableSinkInternal(String str, TableSink<?> tableSink) {
        Optional<CatalogBaseTable> catalogTable = getCatalogTable(this.catalogManager.getBuiltInCatalogName(), this.catalogManager.getBuiltInDatabaseName(), str);
        if (!catalogTable.isPresent()) {
            registerTableInternal(str, ConnectorCatalogTable.sink(tableSink, false));
        } else {
            if (!(catalogTable.get() instanceof ConnectorCatalogTable)) {
                throw new ValidationException(String.format("Table '%s' already exists. Please choose a different name.", str));
            }
            ConnectorCatalogTable connectorCatalogTable = catalogTable.get();
            if (connectorCatalogTable.getTableSink().isPresent()) {
                throw new ValidationException(String.format("Table '%s' already exists. Please choose a different name.", str));
            }
            replaceTableInternal(str, ConnectorCatalogTable.sourceAndSink((TableSource) connectorCatalogTable.getTableSource().get(), tableSink, false));
        }
    }

    private Optional<CatalogBaseTable> getCatalogTable(String... strArr) {
        return this.catalogManager.resolveTable(strArr).flatMap((v0) -> {
            return v0.getCatalogTable();
        });
    }

    protected TableImpl createTable(QueryOperation queryOperation) {
        return TableImpl.createTable(this, queryOperation, this.operationTreeBuilder, this.functionCatalog);
    }
}
