/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.tools.fedbalance;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.OptionsParser;
import org.apache.hadoop.tools.fedbalance.FedBalanceContext;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DistCpProcedure
extends BalanceProcedure {
    public static final Logger LOG = LoggerFactory.getLogger(DistCpProcedure.class);
    private FedBalanceContext context;
    private Path src;
    private Path dst;
    private Configuration conf;
    private int mapNum;
    private int bandWidth;
    private String jobId;
    private Stage stage;
    private boolean forceCloseOpenFiles;
    private boolean useMountReadOnly;
    private int diffThreshold;
    private FsPermission fPerm;
    private AclStatus acl;
    private JobClient client;
    private DistributedFileSystem srcFs;
    private DistributedFileSystem dstFs;
    @VisibleForTesting
    private Job localJob;
    @VisibleForTesting
    static boolean enabledForTest = false;

    public static void enableForTest() {
        enabledForTest = true;
    }

    public static void disableForTest() {
        enabledForTest = false;
    }

    public DistCpProcedure() {
    }

    public DistCpProcedure(String name, String nextProcedure, long delayDuration, FedBalanceContext context) throws IOException {
        super(name, nextProcedure, delayDuration);
        this.context = context;
        this.src = context.getSrc();
        this.dst = context.getDst();
        this.conf = context.getConf();
        this.client = new JobClient(this.conf);
        this.stage = Stage.PRE_CHECK;
        this.mapNum = context.getMapNum();
        this.bandWidth = context.getBandwidthLimit();
        this.forceCloseOpenFiles = context.getForceCloseOpenFiles();
        this.useMountReadOnly = context.getUseMountReadOnly();
        this.diffThreshold = context.getDiffThreshold();
        this.srcFs = (DistributedFileSystem)context.getSrc().getFileSystem(this.conf);
        this.dstFs = (DistributedFileSystem)context.getDst().getFileSystem(this.conf);
    }

    @Override
    public boolean execute() throws BalanceProcedure.RetryException, IOException {
        LOG.info("Stage={}", (Object)this.stage.name());
        switch (this.stage) {
            case PRE_CHECK: {
                this.preCheck();
                return false;
            }
            case INIT_DISTCP: {
                this.initDistCp();
                return false;
            }
            case DIFF_DISTCP: {
                this.diffDistCp();
                return false;
            }
            case DISABLE_WRITE: {
                this.disableWrite(this.context);
                return false;
            }
            case FINAL_DISTCP: {
                this.finalDistCp();
                return false;
            }
            case FINISH: {
                this.finish();
                return true;
            }
        }
        throw new IOException("Unexpected stage=" + this.stage);
    }

    void preCheck() throws IOException {
        FileStatus status = this.srcFs.getFileStatus(this.src);
        if (!status.isDirectory()) {
            throw new IOException(this.src + " should be a directory.");
        }
        if (this.dstFs.exists(this.dst)) {
            throw new IOException(this.dst + " already exists.");
        }
        if (this.srcFs.exists(new Path(this.src, ".snapshot"))) {
            throw new IOException(this.src + " shouldn't enable snapshot.");
        }
        this.updateStage(Stage.INIT_DISTCP);
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    void initDistCp() throws IOException, BalanceProcedure.RetryException {
        RunningJobStatus job = this.getCurrentJob();
        if (job != null) {
            if (!job.isComplete()) throw new BalanceProcedure.RetryException();
            this.jobId = null;
            if (job.isSuccessful()) {
                this.updateStage(Stage.DIFF_DISTCP);
                return;
            }
            LOG.warn("DistCp failed. Failure={}", (Object)job.getFailureInfo());
            return;
        } else {
            this.pathCheckBeforeInitDistcp();
            this.srcFs.createSnapshot(this.src, "DISTCP-BALANCE-NEXT");
            this.jobId = this.submitDistCpJob(this.src.toString() + "/.snapshot/DISTCP-BALANCE-NEXT", this.dst.toString(), false);
        }
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    void diffDistCp() throws IOException, BalanceProcedure.RetryException {
        RunningJobStatus job = this.getCurrentJob();
        if (job != null) {
            if (!job.isComplete()) throw new BalanceProcedure.RetryException();
            this.jobId = null;
            if (!job.isSuccessful()) throw new IOException("DistCp failed. jobId=" + job.getJobID() + " failure=" + job.getFailureInfo());
            LOG.info("DistCp succeeded. jobId={}", (Object)job.getJobID());
            return;
        } else if (this.diffDistCpStageDone()) {
            this.updateStage(Stage.DISABLE_WRITE);
            return;
        } else {
            this.submitDiffDistCp();
        }
    }

    protected void disableWrite(FedBalanceContext fbcontext) throws IOException {
        FileStatus status = this.srcFs.getFileStatus(this.src);
        this.fPerm = status.getPermission();
        this.acl = this.srcFs.getAclStatus(this.src);
        this.srcFs.setPermission(this.src, FsPermission.createImmutable((short)0));
        this.updateStage(Stage.FINAL_DISTCP);
    }

    protected void enableWrite() throws IOException {
        this.restorePermission();
    }

    void restorePermission() throws IOException {
        this.dstFs.removeAcl(this.dst);
        if (this.acl != null) {
            this.dstFs.modifyAclEntries(this.dst, this.acl.getEntries());
        }
        if (this.fPerm != null) {
            this.dstFs.setPermission(this.dst, this.fPerm);
        }
    }

    void finalDistCp() throws IOException, BalanceProcedure.RetryException {
        this.closeAllOpenFiles(this.srcFs, this.src);
        RunningJobStatus job = this.getCurrentJob();
        if (job != null) {
            if (job.isComplete()) {
                this.jobId = null;
                if (job.isSuccessful()) {
                    this.updateStage(Stage.FINISH);
                    return;
                }
                throw new IOException("Final DistCp failed. Failure: " + job.getFailureInfo());
            }
            throw new BalanceProcedure.RetryException();
        }
        this.submitDiffDistCp();
    }

    void finish() throws IOException {
        this.enableWrite();
        if (this.srcFs.exists(this.src)) {
            DistCpProcedure.cleanupSnapshot(this.srcFs, this.src);
        }
        if (this.dstFs.exists(this.dst)) {
            DistCpProcedure.cleanupSnapshot(this.dstFs, this.dst);
        }
    }

    @VisibleForTesting
    Stage getStage() {
        return this.stage;
    }

    @VisibleForTesting
    protected void updateStage(Stage value) {
        String oldStage = this.stage == null ? "null" : this.stage.name();
        String newStage = value == null ? "null" : value.name();
        LOG.info("Stage updated from {} to {}.", (Object)oldStage, (Object)newStage);
        this.stage = value;
    }

    private void submitDiffDistCp() throws IOException {
        DistCpProcedure.enableSnapshot(this.dstFs, this.dst);
        DistCpProcedure.deleteSnapshot(this.srcFs, this.src, "DISTCP-BALANCE-CURRENT");
        DistCpProcedure.deleteSnapshot(this.dstFs, this.dst, "DISTCP-BALANCE-CURRENT");
        this.dstFs.createSnapshot(this.dst, "DISTCP-BALANCE-CURRENT");
        this.srcFs.renameSnapshot(this.src, "DISTCP-BALANCE-NEXT", "DISTCP-BALANCE-CURRENT");
        this.srcFs.createSnapshot(this.src, "DISTCP-BALANCE-NEXT");
        this.jobId = this.submitDistCpJob(this.src.toString(), this.dst.toString(), true);
    }

    private void closeAllOpenFiles(DistributedFileSystem dfs, Path path) throws IOException {
        RemoteIterator iterator;
        String pathStr = path.toUri().getPath();
        while ((iterator = dfs.listOpenFiles(EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES), pathStr)).hasNext()) {
            while (iterator.hasNext()) {
                OpenFileEntry e = (OpenFileEntry)iterator.next();
                try {
                    this.srcFs.recoverLease(new Path(e.getFilePath()));
                }
                catch (IOException iOException) {}
            }
        }
    }

    @VisibleForTesting
    boolean diffDistCpStageDone() throws IOException, BalanceProcedure.RetryException {
        int diffSize = this.getDiffSize();
        if (diffSize <= this.diffThreshold) {
            if (this.forceCloseOpenFiles || !this.verifyOpenFiles()) {
                return true;
            }
            throw new BalanceProcedure.RetryException();
        }
        return false;
    }

    private int getDiffSize() throws IOException {
        SnapshotDiffReport diffReport = this.srcFs.getSnapshotDiffReport(this.src, "DISTCP-BALANCE-NEXT", "");
        return diffReport.getDiffList().size();
    }

    private boolean verifyOpenFiles() throws IOException {
        RemoteIterator iterator = this.srcFs.listOpenFiles(EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES), this.src.toString());
        return iterator.hasNext();
    }

    private RunningJobStatus getCurrentJob() throws IOException {
        if (this.jobId != null) {
            if (enabledForTest) {
                return this.getCurrentLocalJob();
            }
            RunningJob latestJob = this.client.getJob(JobID.forName((String)this.jobId));
            return latestJob == null ? null : new YarnRunningJobStatus(latestJob);
        }
        return null;
    }

    private LocalJobStatus getCurrentLocalJob() throws IOException {
        if (this.localJob != null) {
            Job latestJob;
            try {
                latestJob = this.localJob.getCluster().getJob((org.apache.hadoop.mapreduce.JobID)JobID.forName((String)this.jobId));
            }
            catch (InterruptedException e) {
                throw new IOException(e);
            }
            return latestJob == null ? null : new LocalJobStatus(latestJob);
        }
        return null;
    }

    private void pathCheckBeforeInitDistcp() throws IOException {
        if (this.dstFs.exists(this.dst)) {
            throw new IOException("The dst path=" + this.dst + " already exists. The admin should delete it before submitting the initial distcp job.");
        }
        Path snapshotPath = new Path(this.src, ".snapshot/DISTCP-BALANCE-NEXT");
        if (this.srcFs.exists(snapshotPath)) {
            throw new IOException("The src snapshot=" + snapshotPath + " already exists. The admin should delete the snapshot before submitting the initial distcp.");
        }
        this.srcFs.allowSnapshot(this.src);
    }

    private String submitDistCpJob(String srcParam, String dstParam, boolean useSnapshotDiff) throws IOException {
        ArrayList<Object> command = new ArrayList<Object>();
        command.addAll(Arrays.asList("-async", "-update", "-append", "-pruxgpcab"));
        if (useSnapshotDiff) {
            command.add("-diff");
            command.add("DISTCP-BALANCE-CURRENT");
            command.add("DISTCP-BALANCE-NEXT");
        }
        command.add("-m");
        command.add("" + this.mapNum);
        command.add("-bandwidth");
        command.add("" + this.bandWidth);
        command.add(srcParam);
        command.add(dstParam);
        Configuration config = new Configuration(this.conf);
        try {
            DistCp distCp = new DistCp(config, OptionsParser.parse((String[])command.toArray(new String[0])));
            Job job = distCp.createAndSubmitJob();
            LOG.info("Submit distcp job={}", (Object)job);
            if (enabledForTest) {
                this.localJob = job;
            }
            return job.getJobID().toString();
        }
        catch (Exception e) {
            throw new IOException("Submit job failed.", e);
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        super.write(out);
        this.context.write(out);
        if (this.jobId == null) {
            out.writeBoolean(false);
        } else {
            out.writeBoolean(true);
            Text.writeString((DataOutput)out, (String)this.jobId);
        }
        out.writeInt(this.stage.ordinal());
        if (this.fPerm == null) {
            out.writeBoolean(false);
        } else {
            out.writeBoolean(true);
            out.writeShort(this.fPerm.toShort());
        }
        if (this.acl == null) {
            out.writeBoolean(false);
        } else {
            out.writeBoolean(true);
            ByteArrayOutputStream bout = new ByteArrayOutputStream();
            PBHelperClient.convert((AclStatus)this.acl).writeDelimitedTo((OutputStream)bout);
            byte[] data = bout.toByteArray();
            out.writeInt(data.length);
            out.write(data);
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        super.readFields(in);
        this.context = new FedBalanceContext();
        this.context.readFields(in);
        this.src = this.context.getSrc();
        this.dst = this.context.getDst();
        this.conf = this.context.getConf();
        if (in.readBoolean()) {
            this.jobId = Text.readString((DataInput)in);
        }
        this.stage = Stage.values()[in.readInt()];
        if (in.readBoolean()) {
            this.fPerm = FsPermission.read((DataInput)in);
        }
        if (in.readBoolean()) {
            int len = in.readInt();
            byte[] data = new byte[len];
            in.readFully(data);
            ByteArrayInputStream bin = new ByteArrayInputStream(data);
            AclProtos.GetAclStatusResponseProto proto = AclProtos.GetAclStatusResponseProto.parseDelimitedFrom((InputStream)bin);
            this.acl = PBHelperClient.convert((AclProtos.GetAclStatusResponseProto)proto);
        }
        this.srcFs = (DistributedFileSystem)this.context.getSrc().getFileSystem(this.conf);
        this.dstFs = (DistributedFileSystem)this.context.getDst().getFileSystem(this.conf);
        this.mapNum = this.context.getMapNum();
        this.bandWidth = this.context.getBandwidthLimit();
        this.forceCloseOpenFiles = this.context.getForceCloseOpenFiles();
        this.useMountReadOnly = this.context.getUseMountReadOnly();
        this.client = new JobClient(this.conf);
    }

    private static void enableSnapshot(DistributedFileSystem dfs, Path path) throws IOException {
        if (!dfs.exists(new Path(path, ".snapshot"))) {
            dfs.allowSnapshot(path);
        }
    }

    static void deleteSnapshot(DistributedFileSystem dfs, Path path, String snapshotName) throws IOException {
        Path snapshot = new Path(path, ".snapshot/" + snapshotName);
        if (dfs.exists(snapshot)) {
            dfs.deleteSnapshot(path, snapshotName);
        }
    }

    static void cleanupSnapshot(DistributedFileSystem dfs, Path path) throws IOException {
        if (dfs.exists(new Path(path, ".snapshot"))) {
            FileStatus[] status;
            for (FileStatus s : status = dfs.listStatus(new Path(path, ".snapshot"))) {
                DistCpProcedure.deleteSnapshot(dfs, path, s.getPath().getName());
            }
            dfs.disallowSnapshot(path);
        }
    }

    public static enum Stage {
        PRE_CHECK,
        INIT_DISTCP,
        DIFF_DISTCP,
        DISABLE_WRITE,
        FINAL_DISTCP,
        FINISH;

    }

    static interface RunningJobStatus {
        public String getJobID();

        public boolean isComplete() throws IOException;

        public boolean isSuccessful() throws IOException;

        public String getFailureInfo() throws IOException;
    }

    private static class LocalJobStatus
    implements RunningJobStatus {
        private final Job testJob;

        LocalJobStatus(Job testJob) {
            this.testJob = testJob;
        }

        @Override
        public String getJobID() {
            return this.testJob.getJobID().toString();
        }

        @Override
        public boolean isComplete() throws IOException {
            return this.testJob.isComplete();
        }

        @Override
        public boolean isSuccessful() throws IOException {
            return this.testJob.isSuccessful();
        }

        @Override
        public String getFailureInfo() throws IOException {
            try {
                return this.testJob.getStatus().getFailureInfo();
            }
            catch (InterruptedException e) {
                throw new IOException(e);
            }
        }
    }

    private static class YarnRunningJobStatus
    implements RunningJobStatus {
        private final RunningJob job;

        YarnRunningJobStatus(RunningJob job) {
            this.job = job;
        }

        @Override
        public String getJobID() {
            return this.job.getID().toString();
        }

        @Override
        public boolean isComplete() throws IOException {
            return this.job.isComplete();
        }

        @Override
        public boolean isSuccessful() throws IOException {
            return this.job.isSuccessful();
        }

        @Override
        public String getFailureInfo() throws IOException {
            return this.job.getFailureInfo();
        }
    }
}

