package com.netease.sloth.flink.connector.filesystem.hdfs;

import com.netease.sloth.flink.connector.filesystem.FileSystemWrapper;
import com.netease.sloth.flink.connector.filesystem.hdfs.HadoopRecoverableCompressionOutputStream;
import com.netease.sloth.flink.connector.filesystem.hdfs.HadoopRecoverableFsDataOutputStream;
import com.netease.sloth.flink.connector.filesystem.hdfs.util.CodecUtils;
import com.netease.sloth.flink.connector.filesystem.table.meta.TableMetaStore;
import java.io.IOException;
import java.net.URI;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netease/sloth/flink/connector/filesystem/hdfs/SlothRecoverableWriter.class */
/**
 * {@link RecoverableWriter} that creates Hadoop-backed recoverable output streams, plain or
 * compressed depending on whether a compression name was configured.
 *
 * <p>Stream creation runs inside {@code tableMetaStore.getUGI().doAs(...)}. A single-threaded
 * scheduler additionally calls {@code tableMetaStore.getUGI()} every ten minutes; judging by the
 * pool name this is meant to keep Kerberos credentials fresh (NOTE(review): confirm against
 * {@code TableMetaStore}).
 *
 * <p>Call {@link #close()} to stop the scheduler and close the underlying file system.
 */
public class SlothRecoverableWriter implements RecoverableWriter {
    private static final Logger LOG = LoggerFactory.getLogger(SlothRecoverableWriter.class);

    /** Thread-name prefix handed to the refresh scheduler's thread factory. */
    private static final String REFRESH_POOL_NAME = "sloth-refresh-kerberos";

    /** Period, in minutes, between two scheduled {@code getUGI()} calls. */
    private static final long REFRESH_INTERVAL_MINUTES = 10L;

    /** Compression name that cannot be resumed (see {@link #supportsResume()}). */
    private static final String LZOP = "lzop";

    private final FileSystem fs;
    private final String compression;
    private final TableMetaStore tableMetaStore;
    private final ScheduledExecutorService scheduler;

    /**
     * Creates the writer, opens the file system for {@code uri} and starts the periodic refresh.
     *
     * @param uri            URI the Hadoop file system is created for
     * @param str            compression name; blank or {@code null} selects uncompressed streams
     * @param tableMetaStore source of the Hadoop configuration and UGI; must not be {@code null}
     * @throws NullPointerException if {@code tableMetaStore} is {@code null}
     * @throws RuntimeException     if the file system cannot be created
     */
    public SlothRecoverableWriter(URI uri, String str, TableMetaStore tableMetaStore) {
        this.compression = str;
        this.tableMetaStore = (TableMetaStore) Preconditions.checkNotNull(tableMetaStore, "tableMetaStore is null.");
        this.fs = getFileSystem(uri);
        // The pool is created only after everything that can throw has succeeded, so a failed
        // construction does not leave an orphaned scheduler thread behind.
        this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory(REFRESH_POOL_NAME));
        this.scheduler.scheduleAtFixedRate(() -> {
            try {
                tableMetaStore.getUGI();
            } catch (Exception e) {
                // Must not escape: an exception thrown by a periodic task cancels all later runs.
                LOG.info("sloth refresh kerberos error:{}", ExceptionUtils.getStackTrace(e));
            }
        }, 0L, REFRESH_INTERVAL_MINUTES, TimeUnit.MINUTES);
    }

    /**
     * Opens a new recoverable stream that writes to a hidden in-progress file next to {@code path}.
     *
     * @param path final target file
     * @return a plain or compressing recoverable stream wrapper
     * @throws IOException      if the staging path cannot be determined or the stream cannot be created
     * @throws RuntimeException wrapping an {@link InterruptedException} raised while running as the UGI
     */
    public RecoverableFsDataOutputStream open(Path path) throws IOException {
        final org.apache.hadoop.fs.Path targetFile = HadoopFileSystem.toHadoopPath(path);
        final org.apache.hadoop.fs.Path tempFile = generateStagingTempFilePath(this.fs, targetFile);
        LOG.info("open FsDataOutputStream, fs: {}, tempFile: {}.", this.fs, tempFile);
        try {
            if (!isCompressionConfigured()) {
                return (RecoverableFsDataOutputStream) this.tableMetaStore.getUGI().doAs(() -> {
                    return new HadoopRecoverableFsDataOutputStreamWrapper(new HadoopRecoverableFsDataOutputStream(this.fs, targetFile, tempFile), this.tableMetaStore);
                });
            }
            final CompressionCodec codec = CodecUtils.getCodecByName(this.compression, this.tableMetaStore.getConfiguration());
            return (RecoverableFsDataOutputStream) this.tableMetaStore.getUGI().doAs(() -> {
                return new HadoopRecoverableCompressionOutputStreamWrapper(new HadoopRecoverableCompressionOutputStream(this.fs, targetFile, tempFile, codec), this.tableMetaStore);
            });
        } catch (InterruptedException e) {
            throw interruptedFailure(e);
        }
    }

    /**
     * Re-opens a stream from a previously persisted resumable state.
     *
     * @param resumeRecoverable state to resume from; must be a {@code HadoopFsRecoverable}
     * @return a plain or compressing recoverable stream wrapper
     * @throws IllegalArgumentException if {@code resumeRecoverable} is not a {@code HadoopFsRecoverable}
     * @throws IOException              if the stream cannot be created
     * @throws RuntimeException         wrapping an {@link InterruptedException} raised while running as the UGI
     */
    public RecoverableFsDataOutputStream recover(RecoverableWriter.ResumeRecoverable resumeRecoverable) throws IOException {
        LOG.info("recover FsDataOutputStream, fs: {}.", this.fs);
        if (!(resumeRecoverable instanceof HadoopFsRecoverable)) {
            throw new IllegalArgumentException("Hadoop File System cannot recover a recoverable for another file system: " + resumeRecoverable);
        }
        final HadoopFsRecoverable recoverable = (HadoopFsRecoverable) resumeRecoverable;
        try {
            if (!isCompressionConfigured()) {
                return (RecoverableFsDataOutputStream) this.tableMetaStore.getUGI().doAs(() -> {
                    return new HadoopRecoverableFsDataOutputStreamWrapper(new HadoopRecoverableFsDataOutputStream(this.fs, recoverable), this.tableMetaStore);
                });
            }
            final CompressionCodec codec = CodecUtils.getCodecByName(this.compression, this.tableMetaStore.getConfiguration());
            return (RecoverableFsDataOutputStream) this.tableMetaStore.getUGI().doAs(() -> {
                return new HadoopRecoverableCompressionOutputStreamWrapper(new HadoopRecoverableCompressionOutputStream(this.fs, recoverable, codec), this.tableMetaStore);
            });
        } catch (InterruptedException e) {
            throw interruptedFailure(e);
        }
    }

    /** @return always {@code false}; this writer reports no recoverable state to clean up */
    public boolean requiresCleanupOfRecoverableState() {
        return false;
    }

    /**
     * No-op cleanup.
     *
     * @param resumeRecoverable ignored
     * @return always {@code false}
     */
    public boolean cleanupRecoverableState(RecoverableWriter.ResumeRecoverable resumeRecoverable) throws IOException {
        return false;
    }

    /**
     * Builds a committer for a previously persisted commit state.
     *
     * @param commitRecoverable state to commit; must be a {@code HadoopFsRecoverable}
     * @return the committer matching the configured (compressed or plain) stream type
     * @throws IllegalArgumentException if {@code commitRecoverable} is not a {@code HadoopFsRecoverable}
     */
    public RecoverableFsDataOutputStream.Committer recoverForCommit(RecoverableWriter.CommitRecoverable commitRecoverable) throws IOException {
        if (!(commitRecoverable instanceof HadoopFsRecoverable)) {
            throw new IllegalArgumentException("Hadoop File System  cannot recover a recoverable for another file system: " + commitRecoverable);
        }
        final HadoopFsRecoverable recoverable = (HadoopFsRecoverable) commitRecoverable;
        if (isCompressionConfigured()) {
            return new HadoopRecoverableCompressionOutputStream.HadoopFsCommitter(this.fs, recoverable);
        }
        return new HadoopRecoverableFsDataOutputStream.HadoopFsCommitter(this.fs, recoverable);
    }

    /** @return the shared {@code HadoopRecoverableSerializer} instance */
    public SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> getCommitRecoverableSerializer() {
        return HadoopRecoverableSerializer.INSTANCE;
    }

    /** @return the shared {@code HadoopRecoverableSerializer} instance */
    public SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> getResumeRecoverableSerializer() {
        return HadoopRecoverableSerializer.INSTANCE;
    }

    /**
     * Reports whether streams of this writer can be resumed.
     *
     * @return {@code false} when the configured compression is {@code lzop} (case-insensitive),
     *         {@code true} otherwise
     */
    public boolean supportsResume() {
        LOG.info("judge resume compression:{}.", this.compression);
        // equalsIgnoreCase is null-safe here and can only match a non-blank value.
        if (LZOP.equalsIgnoreCase(this.compression)) {
            LOG.info("don't support resume, it couldn't read success after lzop compress output stream closed once");
            return false;
        }
        LOG.info("support resume.");
        return true;
    }

    /**
     * Picks a not-yet-existing sibling path of the form {@code .<name>.inprogress.<uuid>}.
     *
     * @param fileSystem file system used for the existence check
     * @param path       absolute target file; must have a parent
     * @return a staging path that did not exist at the time of the check
     * @throws IllegalArgumentException if {@code path} is not absolute or is the root directory
     * @throws IOException              if the current user cannot be determined
     * @throws RuntimeException         wrapping an {@link IOException} raised by the existence check
     */
    @VisibleForTesting
    org.apache.hadoop.fs.Path generateStagingTempFilePath(FileSystem fileSystem, org.apache.hadoop.fs.Path path) throws IOException {
        Preconditions.checkArgument(path.isAbsolute(), "targetFile must be absolute");
        final org.apache.hadoop.fs.Path parent = path.getParent();
        final String name = path.getName();
        Preconditions.checkArgument(parent != null, "targetFile must not be the root directory");
        while (true) {
            // A fresh final local per iteration: the lambda below may only capture
            // (effectively) final variables.
            final org.apache.hadoop.fs.Path candidate = new org.apache.hadoop.fs.Path(parent, "." + name + ".inprogress." + UUID.randomUUID().toString());
            LOG.info("fs={}", fileSystem.toString());
            LOG.info("current user name={}, targetFile={}.", UserGroupInformation.getCurrentUser().getUserName(), candidate);
            LOG.info("hadoop.security.authentication={}.", fileSystem.getConf().get("hadoop.security.authentication"));
            final boolean alreadyExists = ((Boolean) this.tableMetaStore.getUGI().doAs(() -> {
                try {
                    return Boolean.valueOf(fileSystem.exists(candidate));
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            })).booleanValue();
            if (!alreadyExists) {
                LOG.info("candidate={} is not exists.", candidate);
                return candidate;
            }
        }
    }

    /**
     * Creates a new Hadoop file system for {@code uri}, wrapped in a {@code FileSystemWrapper}.
     *
     * @param uri URI the file system is created for
     * @return the wrapped file system
     * @throws RuntimeException if creation fails; the original failure is attached as the cause
     */
    public FileSystem getFileSystem(URI uri) {
        try {
            Configuration configuration = this.tableMetaStore.getConfiguration();
            return (FileSystem) this.tableMetaStore.doAs(() -> {
                return new FileSystemWrapper(FileSystem.newInstance(uri, configuration), this.tableMetaStore);
            });
        } catch (Exception e) {
            LOG.error("get FileSystem failed:", e);
            throw new RuntimeException("get fs failed.", e);
        }
    }

    /**
     * Stops the refresh scheduler and closes the file system. A failure to close the file system
     * is logged and not rethrown.
     */
    public void close() {
        if (!this.scheduler.isShutdown()) {
            this.scheduler.shutdown();
        }
        LOG.info("scheduler poll: {} has been closed.", REFRESH_POOL_NAME);
        try {
            this.fs.close();
            LOG.info("the filesystem:{} has been closed", this.fs);
        } catch (IOException e) {
            // Trailing throwable argument makes SLF4J log the stack trace as well.
            LOG.error("Could not close filesystem successfully.{}", this.fs.toString(), e);
        }
    }

    /** Returns whether a non-blank compression name was configured. */
    private boolean isCompressionConfigured() {
        return StringUtils.isNotBlank(this.compression);
    }

    /**
     * Restores the calling thread's interrupt flag and wraps the interruption so it can be
     * rethrown from methods that do not declare {@link InterruptedException}.
     */
    private static RuntimeException interruptedFailure(InterruptedException e) {
        Thread.currentThread().interrupt();
        return new RuntimeException(e);
    }
}
