package org.apache.flink.yarn;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.net.ConnectionUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.yarn.YarnMessages;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/yarn/FlinkYarnCluster.class */
public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
    /** Logger shared by the cluster client and its nested helper threads. */
    private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnCluster.class);

    /** Pause between two application-report polls, in milliseconds. */
    private static final int POLLING_THREAD_INTERVAL_MS = 1000;

    /** YARN client handed in by the creator; stopped and cleared in {@code shutdown}. */
    private YarnClient yarnClient;

    /** Thread that waits for the actor system to terminate and then logs the final report. */
    private Thread actorRunner;

    /** Thread that periodically refreshes the last known application report. */
    private PollingThread pollingRunner;

    /** Hadoop configuration used to obtain the file system that holds the session files. */
    private final Configuration hadoopConfig;

    /** Directory with the session files; deleted recursively in {@code shutdown}. */
    private final Path sessionFilesDir;

    /** Host and RPC port taken from the initial application report. */
    private final InetSocketAddress jobManagerAddress;

    /** Local actor system, created in {@code connectToCluster}. */
    private ActorSystem actorSystem;

    /** Actor through which all requests to the application master are sent. */
    private ActorRef applicationClient;

    /** Application report fetched once in the constructor. */
    private ApplicationReport intialAppReport;

    /** Ask timeout as a duration (used for {@code Await}). */
    private final FiniteDuration akkaDuration;

    /** Ask timeout as an Akka {@code Timeout}, derived from {@link #akkaDuration}. */
    private final Timeout akkaTimeout;

    /** Id of the YARN application; same value as {@link #appId}. */
    private final ApplicationId applicationId;

    /** Flag passed to the constructor and returned by {@code isDetached}. */
    private final boolean detached;

    /** Flink configuration; {@code connectToCluster} writes the job manager address into it. */
    private final org.apache.flink.configuration.Configuration flinkConfig;

    /** Id of the YARN application; same value as {@link #applicationId}. */
    private final ApplicationId appId;

    /** Hook registered with the runtime while connected; never reassigned, hence final. */
    private final Thread clientShutdownHook = new ClientShutdownHook();

    /** True between a successful {@code connectToCluster} and {@code disconnect}. */
    private boolean isConnected = false;

    /** Set to true by the first call to {@code shutdown}; never reassigned, hence final. */
    private final AtomicBoolean hasBeenShutDown = new AtomicBoolean(false);

    /* loaded from: input_file:org/apache/flink/yarn/FlinkYarnCluster$ClientShutdownHook.class */
    /**
     * Thread used as a JVM shutdown hook: when run, it shuts the enclosing
     * cluster down and reports the application as not failed.
     */
    public class ClientShutdownHook extends Thread {

        /** Creates the (not yet registered) hook thread. */
        public ClientShutdownHook() {
            super();
        }

        /** Logs the reason for the shutdown and delegates to {@code shutdown(false)}. */
        @Override
        public void run() {
            FlinkYarnCluster.LOG.info("Shutting down FlinkYarnCluster from the client shutdown hook");
            final boolean failed = false;
            FlinkYarnCluster.this.shutdown(failed);
        }
    }

    /* loaded from: input_file:org/apache/flink/yarn/FlinkYarnCluster$PollingThread.class */
    /**
     * Background thread that repeatedly asks the YARN client for the report of
     * one application and keeps the most recent report available through
     * {@link #getLastReport()}.
     *
     * <p>The loop ends when {@link #stopRunner()} is called, when the YARN client
     * leaves the {@code STARTED} state, or when the thread is interrupted.
     */
    public static class PollingThread extends Thread {
        private final YarnClient yarnClient;
        private final ApplicationId appId;
        private ApplicationReport lastReport;
        AtomicBoolean running = new AtomicBoolean(true);
        /** Guards reads and writes of {@link #lastReport}. */
        private final Object lock = new Object();

        /**
         * @param yarnClient client used to fetch the application reports
         * @param applicationId id of the application to poll
         */
        public PollingThread(YarnClient yarnClient, ApplicationId applicationId) {
            this.yarnClient = yarnClient;
            this.appId = applicationId;
        }

        /**
         * Asks the polling loop to stop after its current iteration. Logs a
         * warning when the thread had already been asked to stop.
         */
        public void stopRunner() {
            // getAndSet makes the "already stopped" check and the update one atomic step.
            if (!this.running.getAndSet(false)) {
                FlinkYarnCluster.LOG.warn("Polling thread was already stopped");
            }
        }

        /**
         * @return the most recently fetched report, or {@code null} if no report
         *         has been fetched successfully so far
         */
        public ApplicationReport getLastReport() {
            synchronized (this.lock) {
                return this.lastReport;
            }
        }

        /** Polls the application report until stopped, interrupted, or the client is no longer started. */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.running.get() && this.yarnClient.isInState(Service.STATE.STARTED)) {
                try {
                    ApplicationReport applicationReport = this.yarnClient.getApplicationReport(this.appId);
                    synchronized (this.lock) {
                        this.lastReport = applicationReport;
                    }
                } catch (Exception e) {
                    // A failed poll is not fatal; the previous report stays in place.
                    FlinkYarnCluster.LOG.warn("Error while getting application report", e);
                }
                try {
                    Thread.sleep(FlinkYarnCluster.POLLING_THREAD_INTERVAL_MS);
                } catch (InterruptedException e2) {
                    FlinkYarnCluster.LOG.error("Polling thread got interrupted", e2);
                    Thread.currentThread().interrupt();
                    // With the interrupt flag restored every further sleep() would fail
                    // immediately, so stop polling instead of spinning in a tight loop.
                    return;
                }
            }
            if (this.running.get() && !this.yarnClient.isInState(Service.STATE.STARTED)) {
                FlinkYarnCluster.LOG.warn("YARN client is unexpected in state " + this.yarnClient.getServiceState());
            }
        }
    }

    public FlinkYarnCluster(YarnClient yarnClient, ApplicationId applicationId, Configuration configuration, org.apache.flink.configuration.Configuration configuration2, Path path, boolean z) throws IOException, YarnException {
        this.akkaDuration = AkkaUtils.getTimeout(configuration2);
        this.akkaTimeout = Timeout.durationToTimeout(this.akkaDuration);
        this.yarnClient = yarnClient;
        this.hadoopConfig = configuration;
        this.sessionFilesDir = path;
        this.applicationId = applicationId;
        this.detached = z;
        this.flinkConfig = configuration2;
        this.appId = applicationId;
        this.intialAppReport = yarnClient.getApplicationReport(applicationId);
        this.jobManagerAddress = new InetSocketAddress(this.intialAppReport.getHost(), this.intialAppReport.getRpcPort());
    }

    /**
     * Connects this client to the application master: starts a local actor
     * system, creates the application client actor, starts the two daemon
     * threads (termination watcher and report poller) and registers the JVM
     * shutdown hook.
     *
     * @throws IllegalStateException if the client is already connected
     * @throws IOException if any step from creating the leader retrieval service
     *         onwards fails (the cause is wrapped)
     */
    public void connectToCluster() throws IOException {
        if (this.isConnected) {
            throw new IllegalStateException("Can not connect to the cluster again");
        }

        LOG.info("Start actor system.");
        final String localHostName = ConnectionUtils.findConnectingAddress(this.jobManagerAddress, 2000L, 400L).getCanonicalHostName();
        // Port 0 is passed together with the local host name for the actor system.
        this.actorSystem = AkkaUtils.createActorSystem(this.flinkConfig, new Some(new Tuple2(localHostName, 0)));

        this.flinkConfig.setString("jobmanager.rpc.address", this.jobManagerAddress.getHostName());
        this.flinkConfig.setInteger("jobmanager.rpc.port", this.jobManagerAddress.getPort());

        try {
            final LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(this.flinkConfig);

            LOG.info("Start application client.");
            final Object[] actorArguments = {this.flinkConfig, leaderRetrievalService};
            this.applicationClient = this.actorSystem.actorOf(Props.create(ApplicationClient.class, actorArguments), "applicationClient");

            this.actorRunner = new Thread(new Runnable() {
                @Override
                public void run() {
                    // Block until the actor system is gone, then report how the application ended.
                    FlinkYarnCluster.this.actorSystem.awaitTermination();
                    FlinkYarnCluster.this.logFinalApplicationReport();
                }
            });
            this.actorRunner.setDaemon(true);
            this.actorRunner.start();

            this.pollingRunner = new PollingThread(this.yarnClient, this.appId);
            this.pollingRunner.setDaemon(true);
            this.pollingRunner.start();

            Runtime.getRuntime().addShutdownHook(this.clientShutdownHook);
            this.isConnected = true;
        } catch (Exception e) {
            throw new IOException("Could not create the leader retrieval service.", e);
        }
    }

    /** Fetches the current application report and logs its final state; failures are only logged. */
    private void logFinalApplicationReport() {
        try {
            final ApplicationReport report = this.yarnClient.getApplicationReport(this.appId);
            LOG.info("Application " + this.appId + " finished with state " + report.getYarnApplicationState() + " and final state " + report.getFinalApplicationStatus() + " at " + report.getFinishTime());

            final YarnApplicationState state = report.getYarnApplicationState();
            final boolean endedBadly = state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED;
            if (endedBadly) {
                LOG.warn("Application failed. Diagnostics " + report.getDiagnostics());
                LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retrieve the full application log using this command:\n\tyarn logs -applicationId " + report.getApplicationId() + "\n(It sometimes takes a few seconds until the logs are aggregated)");
            }
        } catch (Exception e) {
            LOG.warn("Error while getting final application report", e);
        }
    }

    /**
     * Detaches this client from the application master without stopping the
     * YARN session: removes the shutdown hook, stops the application client
     * actor and gives both helper threads up to one second each to finish.
     *
     * @throws IllegalStateException if the client is not connected
     */
    public void disconnect() {
        if (!this.isConnected) {
            throw new IllegalStateException("Can not disconnect from an unconnected cluster.");
        }
        LOG.info("Disconnecting FlinkYarnCluster from ApplicationMaster");

        final boolean hookRemoved = Runtime.getRuntime().removeShutdownHook(this.clientShutdownHook);
        if (!hookRemoved) {
            LOG.warn("Error while removing the shutdown hook. The YARN session might be killed unintentionally");
        }

        // Stop the local application client actor.
        this.applicationClient.tell(PoisonPill.getInstance(), this.applicationClient);

        try {
            this.actorRunner.join(1000L);
        } catch (InterruptedException interrupted) {
            LOG.warn("Shutdown of the actor runner was interrupted", interrupted);
            Thread.currentThread().interrupt();
        }

        this.pollingRunner.stopRunner();
        try {
            this.pollingRunner.join(1000L);
        } catch (InterruptedException interrupted) {
            LOG.warn("Shutdown of the polling runner was interrupted", interrupted);
            Thread.currentThread().interrupt();
        }

        this.isConnected = false;
    }

    /**
     * Asks the application master (through the application client actor) to
     * stop once the given job has finished, and waits for the reply.
     *
     * @param jobID id of the job after which the application master should stop
     * @throws NullPointerException if {@code jobID} is null
     * @throws RuntimeException if the request fails or times out
     */
    public void stopAfterJob(JobID jobID) {
        // checkNotNull takes the reference first and the message second; with the
        // arguments swapped the (never null) message was checked instead of the id.
        Preconditions.checkNotNull(jobID, "The job id must not be null");
        try {
            Await.result(Patterns.ask(this.applicationClient, new YarnMessages.LocalStopAMAfterJob(jobID), this.akkaTimeout), this.akkaDuration);
        } catch (Exception e) {
            throw new RuntimeException("Unable to tell application master to stop once the specified job has been finised", e);
        }
    }

    /**
     * @return the Flink configuration instance held by this client (the same
     *         object, not a copy)
     */
    public org.apache.flink.configuration.Configuration getFlinkConfiguration() {
        return flinkConfig;
    }

    /**
     * @return the job manager address built in the constructor from the host and
     *         RPC port of the initial application report
     */
    public InetSocketAddress getJobManagerAddress() {
        return jobManagerAddress;
    }

    /**
     * Returns the tracking URL from the initial application report, making sure
     * it carries a scheme.
     *
     * @return the tracking URL; {@code http://} is prepended only when the URL
     *         has neither an {@code http://} nor an {@code https://} prefix
     */
    public String getWebInterfaceURL() {
        String trackingUrl = this.intialAppReport.getTrackingUrl();
        // An https URL already has a scheme; prefixing it would yield "http://https://...".
        if (!trackingUrl.startsWith("http://") && !trackingUrl.startsWith("https://")) {
            trackingUrl = "http://" + trackingUrl;
        }
        return trackingUrl;
    }

    /**
     * @return the string form of the YARN application id
     */
    public String getApplicationId() {
        final String idAsText = applicationId.toString();
        return idAsText;
    }

    /**
     * @return the detached flag that was passed to the constructor
     */
    public boolean isDetached() {
        return detached;
    }

    /**
     * Asks the application client actor for the current cluster status and
     * waits for the answer.
     *
     * @return the reported status, or {@code null} when the actor answered with
     *         an empty option
     * @throws IllegalStateException if the client is not connected
     * @throws RuntimeException if the cluster was already stopped, the request
     *         failed, or the reply had an unexpected type
     */
    public FlinkYarnClusterStatus getClusterStatus() {
        if (!this.isConnected) {
            throw new IllegalStateException("The cluster is not connected to the ApplicationMaster.");
        }
        if (hasBeenStopped()) {
            throw new RuntimeException("The FlinkYarnCluster has already been stopped");
        }

        try {
            final Object reply = Await.result(Patterns.ask(this.applicationClient, YarnMessages.getLocalGetyarnClusterStatus(), this.akkaTimeout), this.akkaDuration);

            if (reply instanceof Some) {
                final Object status = ((Some) reply).get();
                return (FlinkYarnClusterStatus) status;
            } else if (reply instanceof None$) {
                return null;
            } else {
                throw new RuntimeException("Unexpected type: " + reply.getClass().getCanonicalName());
            }
        } catch (Exception e) {
            // Also wraps the "Unexpected type" error raised above.
            throw new RuntimeException("Unable to get Cluster status from Application Client", e);
        }
    }

    /**
     * Tells whether the last polled application report shows the application as
     * {@code FAILED} or {@code KILLED}. In that case the state and the
     * diagnostics are logged as warnings.
     *
     * @return {@code true} if the last report is in state FAILED or KILLED;
     *         {@code false} otherwise, including when no report is available yet
     * @throws IllegalStateException if the client is not connected
     */
    public boolean hasFailed() {
        if (!this.isConnected) {
            throw new IllegalStateException("The cluster has not been connected to the ApplicationMaster.");
        }
        if (this.pollingRunner == null) {
            LOG.warn("FlinkYarnCluster.hasFailed() has been called on an uninitialized cluster.The system might be in an erroneous state");
            // Without a polling thread there is no report to inspect; falling
            // through would only end in a NullPointerException.
            return false;
        }
        ApplicationReport lastReport = this.pollingRunner.getLastReport();
        if (lastReport == null) {
            LOG.warn("FlinkYarnCluster.hasFailed() has been called on a cluster that didn't receive a status so far.The system might be in an erroneous state");
            return false;
        }
        YarnApplicationState yarnApplicationState = lastReport.getYarnApplicationState();
        boolean failed = yarnApplicationState == YarnApplicationState.FAILED || yarnApplicationState == YarnApplicationState.KILLED;
        if (failed) {
            LOG.warn("YARN reported application state {}", yarnApplicationState);
            LOG.warn("Diagnostics: {}", lastReport.getDiagnostics());
        }
        return failed;
    }

    /**
     * Returns the diagnostics text of the last polled application report. A
     * warning is logged when the cluster is not in a failed state.
     *
     * @return the diagnostics of the last report, or {@code null} when no report
     *         has been received yet
     * @throws IllegalStateException if the client is not connected
     */
    public String getDiagnostics() {
        if (!this.isConnected) {
            throw new IllegalStateException("The cluster has not been connected to the ApplicationMaster.");
        }
        if (!hasFailed()) {
            LOG.warn("getDiagnostics() called for cluster which is not in failed state");
        }
        ApplicationReport lastReport = this.pollingRunner.getLastReport();
        if (lastReport == null) {
            LOG.warn("Last report is null");
            return null;
        }
        return lastReport.getDiagnostics();
    }

    /**
     * Drains the messages queued at the application client actor. The actor is
     * asked repeatedly until it answers with an empty option or a request fails.
     *
     * @return the messages received so far, each formatted as
     *         {@code "[date] message"}; possibly empty, never {@code null}
     * @throws IllegalStateException if the client is not connected
     * @throws RuntimeException if the cluster was already stopped or the actor
     *         replied with something that is not an {@code Option}
     */
    public List<String> getNewMessages() {
        if (!this.isConnected) {
            throw new IllegalStateException("The cluster has not been connected to the ApplicationMaster.");
        }
        if (hasBeenStopped()) {
            throw new RuntimeException("The FlinkYarnCluster has already been stopped");
        }
        List<String> messages = new ArrayList<String>();
        while (true) {
            Object result;
            try {
                result = Await.result(Patterns.ask(this.applicationClient, YarnMessages.getLocalGetYarnMessage(), new Timeout(this.akkaDuration)), this.akkaDuration);
            } catch (Exception e) {
                LOG.warn("Error retrieving the YARN messages locally", e);
                // Retrying a failed or timed-out ask in this loop could spin forever;
                // hand back what was collected so far.
                break;
            }
            if (!(result instanceof Option)) {
                throw new RuntimeException("LocalGetYarnMessage requires a response of type Option. Instead the response is of type " + (result == null ? null : result.getClass()) + ".");
            }
            Option<?> option = (Option<?>) result;
            LOG.debug("Received message option {}", option);
            if (option.isEmpty()) {
                // Nothing left to fetch.
                break;
            }
            Object payload = option.get();
            if (payload instanceof YarnMessages.YarnMessage) {
                YarnMessages.YarnMessage yarnMessage = (YarnMessages.YarnMessage) payload;
                messages.add("[" + yarnMessage.date() + "] " + yarnMessage.message());
            } else {
                LOG.warn("LocalGetYarnMessage returned unexpected type: " + option);
            }
        }
        return messages;
    }

    /**
     * Stops the YARN session and releases all client-side resources: asks the
     * application master to stop, shuts the actor system down, deletes the
     * session files, waits briefly for the helper threads and stops the YARN
     * client. Only the first call has an effect; later calls return immediately.
     *
     * @param z {@code true} to report the final status {@code FAILED},
     *          {@code false} to report {@code SUCCEEDED}
     * @throws IllegalStateException if the client is not connected
     */
    public void shutdown(boolean z) {
        if (!this.isConnected) {
            throw new IllegalStateException("The cluster has not been connected to the ApplicationMaster.");
        }
        if (this.hasBeenShutDown.getAndSet(true)) {
            return;
        }
        try {
            Runtime.getRuntime().removeShutdownHook(this.clientShutdownHook);
        } catch (IllegalStateException ignored) {
            // The JVM is already shutting down (for example when called from the
            // hook itself); hooks cannot be removed any more and need not be.
        }
        if (this.actorSystem != null) {
            LOG.info("Sending shutdown request to the Application Master");
            if (this.applicationClient != ActorRef.noSender()) {
                try {
                    Await.ready(Patterns.ask(this.applicationClient, new YarnMessages.LocalStopYarnSession(z ? FinalApplicationStatus.FAILED : FinalApplicationStatus.SUCCEEDED, "Flink YARN Client requested shutdown"), new Timeout(this.akkaDuration)), this.akkaDuration);
                } catch (Exception e) {
                    // Best effort: continue with the local clean-up even if the request failed.
                    LOG.warn("Error while stopping YARN Application Client", e);
                }
            }
            this.actorSystem.shutdown();
            this.actorSystem.awaitTermination();
            this.actorSystem = null;
        }
        LOG.info("Deleting files in " + this.sessionFilesDir);
        try {
            FileSystem fileSystem = FileSystem.get(this.hadoopConfig);
            try {
                fileSystem.delete(this.sessionFilesDir, true);
            } finally {
                // Close the file system even when the delete fails.
                fileSystem.close();
            }
        } catch (IOException e) {
            LOG.error("Could not delete the Flink jar and configuration files in HDFS..", e);
        }
        try {
            this.actorRunner.join(1000L);
        } catch (InterruptedException e) {
            LOG.warn("Shutdown of the actor runner was interrupted", e);
            Thread.currentThread().interrupt();
        }
        try {
            this.pollingRunner.stopRunner();
            this.pollingRunner.join(1000L);
        } catch (InterruptedException e) {
            LOG.warn("Shutdown of the polling runner was interrupted", e);
            Thread.currentThread().interrupt();
        }
        LOG.info("YARN Client is shutting down");
        this.yarnClient.stop();
        this.yarnClient = null;
    }

    /**
     * @return {@code true} once {@code shutdown} has been entered for the first time
     */
    public boolean hasBeenStopped() {
        final boolean stopped = hasBeenShutDown.get();
        return stopped;
    }
}
