package com.netease.sloth.flink.sql.planner;

import com.netease.sloth.flink.sql.api.context.ExecutionContext;
import com.netease.sloth.flink.sql.parse.SlothSqlUtil;
import com.netease.sloth.flink.sql.planner.LocalExecutorFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.commons.collections.MapUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/sloth/flink/sql/planner/LocalJobExecutor.class */
public class LocalJobExecutor extends SlothJobExecutor {
    private static final Logger log = LoggerFactory.getLogger(LocalJobExecutor.class);
    private static final Logger LOG = LoggerFactory.getLogger(LocalJobExecutor.class);
    private MiniCluster miniCluster;
    private JobID jobID;
    private int clusterTmNum;
    private int clusterParallelism;
    private int jobParallelism;

    public LocalJobExecutor(ExecutionContext executionContext, Listener<ExecutionContext> listener) {
        super(executionContext, listener);
        this.clusterTmNum = 1;
        this.clusterParallelism = 1;
        this.jobParallelism = 1;
    }

    public LocalJobExecutor(ExecutionContext executionContext) {
        super(executionContext);
        this.clusterTmNum = 1;
        this.clusterParallelism = 1;
        this.jobParallelism = 1;
    }

    public LocalJobExecutor(ExecutionContext executionContext, int i, int i2, int i3) {
        super(executionContext);
        this.clusterTmNum = 1;
        this.clusterParallelism = 1;
        this.jobParallelism = 1;
        this.clusterTmNum = i;
        this.clusterParallelism = i2;
        this.jobParallelism = i3;
    }

    public void deploy(String str) throws Exception {
        deploy();
    }

    public void deploy() throws Exception {
        validatorSql();
        notifyListener();
        execute();
    }

    public void deployDetached() throws Exception {
        validatorSql();
        notifyListener();
        executeDetached();
    }

    @Override // com.netease.sloth.flink.sql.planner.SlothJobExecutor
    public void execute() throws Exception {
        StreamGraph generate = generate(this.executionContext.getJobName());
        LocalExecutorFactory.LocalExecutionEnvironment localExecutionEnvironment = getLocalExecutionEnvironment();
        localExecutionEnvironment.execute(generate);
        this.miniCluster = localExecutionEnvironment.getMiniCluster();
    }

    public void executeDetached() throws Exception {
        StreamGraph generate = generate(this.executionContext.getJobName());
        LocalExecutorFactory.LocalExecutionEnvironment localExecutionEnvironment = getLocalExecutionEnvironment();
        this.jobID = localExecutionEnvironment.executeDetached(generate);
        this.miniCluster = localExecutionEnvironment.getMiniCluster();
    }

    private LocalExecutorFactory.LocalExecutionEnvironment getLocalExecutionEnvironment() {
        LocalExecutorFactory.LocalExecutionEnvironment localExecutionEnvironment = (LocalExecutorFactory.LocalExecutionEnvironment) getExecutionEnvironment();
        if (this.executionContext.isDebug()) {
            localExecutionEnvironment.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
        }
        localExecutionEnvironment.setClusterTmNum(this.clusterTmNum);
        localExecutionEnvironment.setClusterParallelism(this.clusterParallelism);
        localExecutionEnvironment.setParallelism(this.jobParallelism);
        localExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        localExecutionEnvironment.getConfig().setAutoWatermarkInterval(1L);
        initJobName(localExecutionEnvironment);
        Map dependencies = this.executionContext.getDependencies();
        if (MapUtils.isNotEmpty(dependencies)) {
            localExecutionEnvironment.addJarFiles((List) dependencies.values().stream().map(Path::new).collect(Collectors.toList()));
        }
        return localExecutionEnvironment;
    }

    @Override // com.netease.sloth.flink.sql.planner.SlothJobExecutor
    public String getExecutorMode() {
        return LocalExecutorFactory.LOCAL_EXECUTOR;
    }

    protected void initJobName(StreamExecutionEnvironment streamExecutionEnvironment) {
        Map map = streamExecutionEnvironment.getConfig().getGlobalJobParameters().toMap();
        if (map.isEmpty()) {
            map = new HashMap();
        }
        map.put(SlothSqlUtil.JOB_NAME, this.executionContext.getJobName());
        Configuration configuration = new Configuration();
        map.forEach((str, str2) -> {
            configuration.setString(str, str2);
        });
        streamExecutionEnvironment.getConfig().setGlobalJobParameters(configuration);
    }

    public boolean hasCluster() {
        return this.miniCluster != null;
    }

    public boolean isRunning() {
        if (!hasCluster()) {
            return false;
        }
        if (this.jobID == null) {
            return this.miniCluster.isRunning();
        }
        try {
            return !((JobStatus) this.miniCluster.getJobStatus(this.jobID).get()).isGloballyTerminalState();
        } catch (InterruptedException e) {
            return false;
        } catch (ExecutionException e2) {
            return false;
        }
    }

    public void close() throws Exception {
        if (hasCluster()) {
            this.miniCluster.close();
            log.info("miniCluster has closed jobName:{}", this.executionContext.getJobName());
        }
    }
}
