package com.netease.sloth.flink.sql.planner;

import com.netease.sloth.flink.sql.api.context.ExecutionContext;
import com.netease.sloth.flink.sql.api.parse.SqlParser;
import com.netease.sloth.flink.sql.api.table.BaseTableEnvironment;
import com.netease.sloth.flink.sql.catalog.CatalogConstant;
import com.netease.sloth.flink.sql.context.SlothContext;
import com.netease.sloth.flink.sql.memory.Constant;
import com.netease.sloth.flink.sql.parse.SqlCommandParser;
import com.netease.sloth.flink.sql.shim.FlinkShim;
import com.netease.sloth.flink.sql.transformer.catalog.SlothCatalogTable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.SqlParserException;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/sloth/flink/sql/planner/SlothJobExecutor.class */
public class SlothJobExecutor {
    private static final Logger log = LoggerFactory.getLogger(SlothJobExecutor.class);
    private static final Logger LOG = LoggerFactory.getLogger(SlothJobExecutor.class);
    public static final String SELECT_TABLE_NAME = "select_result";
    protected final ExecutionContext executionContext;
    private BaseTableEnvironment tableEnvironment;
    protected Listener<ExecutionContext> listener;
    protected SqlParser sqlParser = FlinkShim.getSqlParse();

    public SlothJobExecutor(ExecutionContext executionContext, Listener<ExecutionContext> listener) {
        this.executionContext = executionContext;
        this.listener = listener;
        SlothContext.setContext(this.executionContext);
    }

    public SlothJobExecutor(ExecutionContext executionContext) {
        this.executionContext = executionContext;
        SlothContext.setContext(this.executionContext);
    }

    public Optional<Table> validatorSql() {
        EnvironmentSettings.Builder useBlinkPlanner = EnvironmentSettings.newInstance().useBlinkPlanner();
        if (this.executionContext.isStreamMode()) {
            useBlinkPlanner.inStreamingMode();
        } else {
            useBlinkPlanner.inBatchMode();
        }
        try {
            this.tableEnvironment = this.executionContext.getTableEnvBuilder().withSettings(useBlinkPlanner.build()).withExecutionContext(this.executionContext).withExecutorMode(getExecutorMode()).build();
            this.tableEnvironment.getConfig().getConfiguration().setBoolean(CatalogConstant.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE_KEY, true);
            LOG.info("[CDC-config]set table.exec.source.cdc-events-duplicate  true");
            List<SqlCommandParser.SqlCommandCall> parseStmts = SqlCommandParser.parseStmts(this.executionContext.getSqlText());
            Optional<Table> empty = Optional.empty();
            for (int i = 0; i < parseStmts.size(); i++) {
                SqlCommandParser.SqlCommandCall sqlCommandCall = parseStmts.get(i);
                empty = callCommand(sqlCommandCall);
                if (parseStmts.size() == 1 && sqlCommandCall.command == SqlCommandParser.SqlCommand.CREATE_TABLE && this.executionContext.isDebugV2()) {
                    String str = "select * from " + FlinkShim.getFlinkPlannerFactory().create(SqlDialect.DEFAULT).parser().parse(sqlCommandCall.operands[0]).getTableName().getSimple();
                    addSinkForQuery(callCommand(new SqlCommandParser.SqlCommandCall(SqlCommandParser.SqlCommand.QUERY_SELECT, new String[]{str})), str);
                }
                if (i == parseStmts.size() - 1 && this.executionContext.isDebug() && empty.isPresent() && this.executionContext.isDebugV2()) {
                    addSinkForQuery(empty, sqlCommandCall.operands[0]);
                }
            }
            this.tableEnvironment.setAllSqlUpdated();
            return empty;
        } catch (Exception e) {
            throw new RuntimeException(String.format("create table environment failed:%s", e.getMessage()), e);
        }
    }

    private void addSinkForQuery(Optional<Table> optional, String str) {
        Catalog catalog = (Catalog) this.tableEnvironment.getCatalog(this.tableEnvironment.getCurrentCatalog()).get();
        HashMap hashMap = new HashMap();
        hashMap.put(FlinkShim.getFlinkVersionAdapter().connectorIdentityKey(), "memory");
        hashMap.put(Constant.TABLE_NAME, SELECT_TABLE_NAME);
        try {
            catalog.createTable(new ObjectPath(this.tableEnvironment.getCurrentDatabase(), SELECT_TABLE_NAME), new SlothCatalogTable(optional.get().getSchema(), hashMap, null), false);
        } catch (DatabaseNotExistException e) {
            log.error("", e);
        } catch (TableAlreadyExistException e2) {
            log.error("不能定义名称为{}的表！！！！！！！！！！！！！！！！！！！！！", SELECT_TABLE_NAME);
        }
        callCommand(new SqlCommandParser.SqlCommandCall(SqlCommandParser.SqlCommand.INSERT_INTO, new String[]{"insert into select_result " + str}));
    }

    public String getExecutorMode() {
        return SlothExecutorFactory.SLOTH_EXECUTOR;
    }

    public StreamGraph generate(String str) {
        return this.tableEnvironment.generateStreamGraph(str);
    }

    public void execute() throws Exception {
        validatorSql();
        notifyListener();
        this.tableEnvironment.execute(this.executionContext.getJobName());
    }

    private List<String> getJobTables() {
        try {
            Optional catalog = this.tableEnvironment.getCatalog(this.tableEnvironment.getCurrentCatalog());
            if (catalog.isPresent()) {
                return ((Catalog) catalog.get()).listTables(((Catalog) catalog.get()).getDefaultDatabase());
            }
        } catch (DatabaseNotExistException e) {
            LOG.error("", e);
        }
        return Collections.emptyList();
    }

    private Optional<Table> callCommand(SqlCommandParser.SqlCommandCall sqlCommandCall) {
        switch (sqlCommandCall.command) {
            case SET:
                callSet(sqlCommandCall);
                break;
            case QUERY_SELECT:
                return callSelect(sqlCommandCall);
            case CREATE_TABLE:
                callCreateTable(sqlCommandCall);
                break;
            case INSERT_INTO:
            case INSERT_OVERWRITE:
                callInsertInto(sqlCommandCall);
                break;
            case CREATE_VIEW:
                callCreateView(sqlCommandCall);
                break;
            case CREATE_CATALOG:
                callCreateCatalog(sqlCommandCall);
                break;
            case CREATE_FUNCTION:
                callCreateFunction(sqlCommandCall);
                break;
            default:
                throw new RuntimeException("Unsupported command: " + sqlCommandCall.command);
        }
        return Optional.empty();
    }

    private void callCreateCatalog(SqlCommandParser.SqlCommandCall sqlCommandCall) {
        String str = sqlCommandCall.operands[0];
        try {
            this.tableEnvironment.sqlUpdate(str);
        } catch (SqlParserException e) {
            throw new SqlParserException("SQL parse failed:\n" + str + "\n", e);
        }
    }

    private void callCreateView(SqlCommandParser.SqlCommandCall sqlCommandCall) {
        String str = sqlCommandCall.operands[0];
        try {
            this.tableEnvironment.registerTable(sqlCommandCall.operands[1], this.tableEnvironment.sqlQuery(sqlCommandCall.operands[2]));
        } catch (SqlParserException e) {
            throw new RuntimeException("SQL parse failed:\n" + str + "\n", e);
        }
    }

    private void callCreateFunction(SqlCommandParser.SqlCommandCall sqlCommandCall) {
        String str = sqlCommandCall.operands[0];
        try {
            this.tableEnvironment.sqlUpdate(str);
        } catch (SqlParserException e) {
            throw new RuntimeException("SQL parse failed:\n" + str + "\n", e);
        }
    }

    private void callSet(SqlCommandParser.SqlCommandCall sqlCommandCall) {
        this.tableEnvironment.callSet((String) Preconditions.checkNotNull(sqlCommandCall.operands[0], "call set key is null."), (String) Preconditions.checkNotNull(sqlCommandCall.operands[1], "call set value is null"));
    }

    private void callCreateTable(SqlCommandParser.SqlCommandCall sqlCommandCall) {
        String str = sqlCommandCall.operands[0];
        try {
            this.tableEnvironment.sqlUpdate(str);
        } catch (SqlParserException e) {
            throw new RuntimeException("SQL parse failed:\n" + str + "\n", e);
        }
    }

    private void callInsertInto(SqlCommandParser.SqlCommandCall sqlCommandCall) {
        String str = sqlCommandCall.operands[0];
        try {
            this.tableEnvironment.sqlUpdate(str);
        } catch (SqlParserException e) {
            throw new RuntimeException("SQL parse failed:\n" + str + "\n", e);
        }
    }

    private Optional<Table> callSelect(SqlCommandParser.SqlCommandCall sqlCommandCall) {
        String str = sqlCommandCall.operands[0];
        try {
            return Optional.of(this.tableEnvironment.sqlQuery(str));
        } catch (SqlParserException e) {
            throw new RuntimeException("SQL parse failed:\n" + str + "\n", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void notifyListener() {
        if (this.listener == null || this.executionContext == null) {
            return;
        }
        this.listener.notify(this.executionContext);
    }

    @VisibleForTesting
    public BaseTableEnvironment getTableEnvironment() {
        return this.tableEnvironment;
    }

    @VisibleForTesting
    public void executeWithoutValidator() throws Exception {
        this.tableEnvironment.execute(this.executionContext.getJobName());
    }

    public StreamExecutionEnvironment getExecutionEnvironment() {
        return this.tableEnvironment.getExecutionEnvironment();
    }
}
