package com.netease.sloth.flink.sql.planner;

import com.netease.sloth.flink.sql.api.classload.ClassLoadWrap;
import com.netease.sloth.flink.sql.api.classload.ClassLoaderException;
import com.netease.sloth.flink.sql.api.classload.FlinkUserCodeClassLoadersWrap;
import com.netease.sloth.flink.sql.config.SqlConf;
import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.planner.delegation.BatchExecutor;
import org.apache.flink.table.planner.delegation.BlinkExecutorFactory;
import org.apache.flink.table.planner.delegation.StreamExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/sloth/flink/sql/planner/LocalExecutorFactory.class */
public class LocalExecutorFactory extends BlinkExecutorFactory {
    /** Logger shared by this factory and its nested {@link LocalExecutionEnvironment}. */
    private static final Logger LOG = LoggerFactory.getLogger(LocalExecutorFactory.class);
    /** Property key that {@link #requiredContext()} adds to the context inherited from the parent factory. */
    public static final String EXECUTOR_MODE = "executor-mode";
    /** Value that {@link #requiredContext()} associates with the executor-mode key for this factory. */
    public static final String LOCAL_EXECUTOR = "local-executor";

    /* loaded from: input_file:com/netease/sloth/flink/sql/planner/LocalExecutorFactory$LocalExecutionEnvironment.class */
    public static class LocalExecutionEnvironment extends StreamExecutionEnvironment {
        private List<Path> jarFiles;
        private MiniCluster miniCluster;
        private int clusterParallelism = 1;
        private int clusterTmNum = 1;

        public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
            JobGraph jobGraph = (JobGraph) new ClassLoadWrap(getFlinkUserCodeClassLoadersWrap(), this.jarFiles).execute(() -> {
                return streamGraph.getJobGraph();
            }).orElse(null);
            if (CollectionUtils.isNotEmpty(this.jarFiles)) {
                Iterator<Path> it = this.jarFiles.iterator();
                while (it.hasNext()) {
                    jobGraph.addJar(it.next());
                }
            }
            MiniCluster prepareMiniCluster = prepareMiniCluster(jobGraph);
            try {
                JobExecutionResult executeJobBlocking = prepareMiniCluster.executeJobBlocking(jobGraph);
                this.transformations.clear();
                prepareMiniCluster.close();
                return executeJobBlocking;
            } catch (Throwable th) {
                this.transformations.clear();
                prepareMiniCluster.close();
                throw th;
            }
        }

        public JobID executeDetached(StreamGraph streamGraph) throws Exception {
            JobGraph jobGraph = (JobGraph) new ClassLoadWrap(getFlinkUserCodeClassLoadersWrap(), this.jarFiles).execute(() -> {
                return streamGraph.getJobGraph();
            }).orElse(null);
            if (CollectionUtils.isNotEmpty(this.jarFiles)) {
                Iterator<Path> it = this.jarFiles.iterator();
                while (it.hasNext()) {
                    jobGraph.addJar(it.next());
                }
            }
            MiniCluster prepareMiniCluster = prepareMiniCluster(jobGraph);
            this.miniCluster = prepareMiniCluster;
            prepareMiniCluster.runDetached(jobGraph);
            return jobGraph.getJobID();
        }

        public MiniCluster getMiniCluster() {
            return this.miniCluster;
        }

        protected FlinkUserCodeClassLoadersWrap getFlinkUserCodeClassLoadersWrap() {
            throw new ClassLoaderException("have not impl this abstract method.");
        }

        public void addJarFiles(List<Path> list) {
            this.jarFiles = list;
        }

        private MiniCluster prepareMiniCluster(JobGraph jobGraph) throws Exception {
            Configuration configuration = new Configuration();
            configuration.addAll(jobGraph.getJobConfiguration());
            configuration.setString(TaskManagerOptions.TOTAL_FLINK_MEMORY.key(), String.valueOf(Integer.MAX_VALUE));
            configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 536870911);
            configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), String.valueOf(524287));
            if (!configuration.contains(RestOptions.PORT)) {
                configuration.setInteger(RestOptions.PORT, 0);
            }
            MiniClusterConfiguration build = new MiniClusterConfiguration.Builder().setConfiguration(configuration).setNumSlotsPerTaskManager(this.clusterParallelism).setNumTaskManagers(this.clusterTmNum).build();
            if (LocalExecutorFactory.LOG.isInfoEnabled()) {
                LocalExecutorFactory.LOG.info("Running job on local embedded Flink mini cluster");
            }
            MiniCluster miniCluster = new MiniCluster(build);
            miniCluster.start();
            configuration.setInteger(RestOptions.PORT, ((URI) miniCluster.getRestAddress().get()).getPort());
            return miniCluster;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void setClusterParallelism(int i) {
            this.clusterParallelism = i;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void setClusterTmNum(int i) {
            this.clusterTmNum = i;
        }
    }

    /** Optional property naming the directory URI used for the file-system state backend. */
    private static final String STATE_CHECKPOINTS_DIR = "state.checkpoints.dir";

    /**
     * Directory used when {@link #STATE_CHECKPOINTS_DIR} is absent. Kept only for backward
     * compatibility. NOTE(review): this is a developer-machine path and should be replaced by a
     * portable default once callers supply the property.
     */
    private static final String LEGACY_CHECKPOINT_DIR = "file:///Users/shidayang/Desktop/tmp";

    /**
     * Creates an executor backed by the environment from {@link #getExecutionEnvironment()}.
     *
     * <p>When the {@code streaming-mode} property is present and not {@code true}, a
     * {@link BatchExecutor} is returned without further configuration. Otherwise checkpointing is
     * configured from the properties and a {@link StreamExecutor} is returned:
     * <ul>
     *   <li>checkpoint interval and timeout are read under the lower-cased
     *       {@link SqlConf#CHECKPOINT_INTERVAL} / {@link SqlConf#CHECKPOINT_TIMEOUT} keys and
     *       multiplied by 1000 before being handed to the checkpoint configuration; blank values
     *       fall back to the {@link SqlConf} defaults;</li>
     *   <li>the checkpointing mode is read under the lower-cased {@link SqlConf#CHECKPOINT_MODE}
     *       key, matched case-insensitively, and defaults to {@code EXACTLY_ONCE};</li>
     *   <li>the state backend is an {@link FsStateBackend} on the URI given by
     *       {@code state.checkpoints.dir}, or on the legacy hard-coded directory when that
     *       property is blank.</li>
     * </ul>
     *
     * @param map executor properties; must not be null
     * @param streamExecutionEnvironment ignored; the environment is always obtained from
     *     {@link #getExecutionEnvironment()}
     * @return a batch or stream executor wrapping the local environment
     * @throws NumberFormatException if the interval or timeout is not a valid long
     * @throws IllegalArgumentException if the checkpointing mode is not a known
     *     {@link CheckpointingMode}
     */
    @Override
    public Executor create(Map<String, String> map, StreamExecutionEnvironment streamExecutionEnvironment) {
        StreamExecutionEnvironment executionEnvironment = getExecutionEnvironment();
        if (!Boolean.parseBoolean(map.getOrDefault("streaming-mode", "true"))) {
            return new BatchExecutor(executionEnvironment);
        }

        long intervalSeconds = secondsOrDefault(
                map.get(SqlConf.CHECKPOINT_INTERVAL.toLowerCase()), SqlConf.DEFAULT_CHECKPOINT_INTERVAL);
        executionEnvironment.getCheckpointConfig().setCheckpointInterval(intervalSeconds * 1000);

        long timeoutSeconds = secondsOrDefault(
                map.get(SqlConf.CHECKPOINT_TIMEOUT.toLowerCase()), SqlConf.DEFAULT_CHECKPOINT_TIMEOUT);
        executionEnvironment.getCheckpointConfig().setCheckpointTimeout(timeoutSeconds * 1000);

        String modeName = map.get(SqlConf.CHECKPOINT_MODE.toLowerCase());
        CheckpointingMode mode = StringUtils.isBlank(modeName)
                ? CheckpointingMode.EXACTLY_ONCE
                // Locale.ROOT keeps the upper-casing independent of the JVM's default locale.
                : CheckpointingMode.valueOf(modeName.trim().toUpperCase(Locale.ROOT));
        executionEnvironment.getCheckpointConfig().setCheckpointingMode(mode);

        String checkpointDir = map.get(STATE_CHECKPOINTS_DIR);
        executionEnvironment.setStateBackend(
                new FsStateBackend(StringUtils.isBlank(checkpointDir) ? LEGACY_CHECKPOINT_DIR : checkpointDir.trim()));
        return new StreamExecutor(executionEnvironment);
    }

    /** Parses a trimmed long from {@code raw}, or returns {@code fallback} when {@code raw} is blank. */
    private static long secondsOrDefault(String raw, long fallback) {
        return StringUtils.isBlank(raw) ? fallback : Long.parseLong(raw.trim());
    }

    /**
     * Factory hook for the environment that {@link #create(Map, StreamExecutionEnvironment)}
     * wraps in an executor.
     *
     * @return a new {@link LocalExecutionEnvironment} on every call
     */
    protected StreamExecutionEnvironment getExecutionEnvironment() {
        final LocalExecutionEnvironment localEnvironment = new LocalExecutionEnvironment();
        return localEnvironment;
    }

    /**
     * Returns the parent factory's required context extended with the
     * {@code executor-mode = local-executor} entry.
     *
     * @return a new mutable map; the map returned by the parent is copied, not modified
     */
    @Override
    public Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>(super.requiredContext());
        context.put(EXECUTOR_MODE, LOCAL_EXECUTOR);
        return context;
    }
}
