/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.tosfs.commit;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.tosfs.commit.CommitContext;
import org.apache.hadoop.fs.tosfs.commit.CommitUtils;
import org.apache.hadoop.fs.tosfs.commit.Pending;
import org.apache.hadoop.fs.tosfs.commit.PendingSet;
import org.apache.hadoop.fs.tosfs.commit.SuccessData;
import org.apache.hadoop.fs.tosfs.commit.ops.PendingOps;
import org.apache.hadoop.fs.tosfs.commit.ops.PendingOpsFactory;
import org.apache.hadoop.fs.tosfs.common.Tasks;
import org.apache.hadoop.fs.tosfs.common.ThreadPools;
import org.apache.hadoop.fs.tosfs.object.MultipartUpload;
import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory;
import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
import org.apache.hadoop.fs.tosfs.util.CommonUtils;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Committer
extends PathOutputCommitter {
    private static final Logger LOG = LoggerFactory.getLogger(Committer.class);
    public static final String COMMITTER_THREADS = "fs.job.committer.threads";
    public static final String COMMITTER_SUMMARY_REPORT_DIR = "fs.job.committer.summary.report.directory";
    public static final int DEFAULT_COMMITTER_THREADS = Runtime.getRuntime().availableProcessors();
    public static final String THREADS_PREFIX = "job-committer-thread-pool";
    private final String jobId;
    private final Path outputPath;
    private Path workPath;
    private final String role;
    private final Configuration conf;
    private final FileSystem destFs;
    private final ObjectStorage storage;
    private final PendingOps ops;

    public Committer(Path outputPath, TaskAttemptContext context) throws IOException {
        this(outputPath, (JobContext)context, String.format("Task committer %s", context.getTaskAttemptID()));
        this.workPath = CommitUtils.magicTaskAttemptBasePath(context, outputPath);
        LOG.info("Task attempt {} has work path {}", (Object)context.getTaskAttemptID(), (Object)this.getWorkPath());
    }

    public Committer(Path outputPath, JobContext context) throws IOException {
        this(outputPath, context, String.format("Job committer %s", context.getJobID()));
    }

    private Committer(Path outputPath, JobContext context, String role) throws IOException {
        super(outputPath, context);
        this.jobId = CommitUtils.buildJobId(context);
        this.outputPath = outputPath;
        this.role = role;
        this.conf = context.getConfiguration();
        this.destFs = outputPath.getFileSystem(this.conf);
        LOG.info("{} instantiated for job '{}' ID {} with destination {}", new Object[]{role, CommitUtils.jobName(context), this.jobId, outputPath});
        this.storage = ObjectStorageFactory.create(outputPath.toUri().getScheme(), outputPath.toUri().getAuthority(), this.conf);
        this.ops = PendingOpsFactory.create(this.destFs, this.storage);
    }

    public Path getOutputPath() {
        return this.outputPath;
    }

    public Path getWorkPath() {
        return this.workPath;
    }

    public void setupJob(JobContext context) throws IOException {
        this.checkJobId(context);
        LOG.info("Setup Job {}", (Object)this.jobId);
        Path jobOutput = this.getOutputPath();
        this.destFs.delete(CommitUtils.successMarker(jobOutput), false);
        this.destFs.mkdirs(jobOutput);
        this.logUncompletedMPUIfPresent(jobOutput);
        Path jobPath = CommitUtils.magicJobPath(this.jobId, this.outputPath);
        Path jobAttemptPath = CommitUtils.magicJobAttemptPath(context, this.outputPath);
        this.destFs.delete(jobPath, true);
        this.destFs.mkdirs(jobAttemptPath);
    }

    private void logUncompletedMPUIfPresent(Path jobOutput) {
        int nums = 0;
        for (MultipartUpload upload : this.storage.listUploads(ObjectUtils.pathToKey(jobOutput, true))) {
            if (nums++ > 10) {
                LOG.warn("There are more than 10 uncompleted multipart uploads under path {}.", (Object)jobOutput);
                break;
            }
            LOG.warn("Uncompleted multipart upload {} is under path {}, either jobs are running concurrently or failed jobs are not being cleaned up.", (Object)upload, (Object)jobOutput);
        }
    }

    public void commitJob(JobContext context) throws IOException {
        this.checkJobId(context);
        LOG.info("{}: committing job {}", (Object)this.role, (Object)this.jobId);
        String stage = null;
        Exception failure = null;
        SuccessData successData = null;
        ExecutorService threadPool = ThreadPools.newWorkerPool(THREADS_PREFIX, this.commitThreads());
        ArrayList pendingSets = Lists.newArrayList();
        try {
            stage = "preparing";
            CommitUtils.listFiles(this.destFs, CommitUtils.magicJobAttemptPath(context, this.outputPath), true, f -> {
                if (f.getPath().toString().endsWith(".pendingset")) {
                    pendingSets.add(f);
                }
            });
            stage = "commit";
            CommitContext commitCtxt = new CommitContext(pendingSets);
            this.loadAndCommitPendingSets(threadPool, commitCtxt);
            stage = "marker";
            successData = this.createSuccessData(commitCtxt.destKeys());
            CommitUtils.triggerError(() -> new IOException("Mock error of success marker."), stage);
            CommitUtils.save(this.destFs, CommitUtils.successMarker(this.outputPath), successData);
            stage = "clean";
            this.cleanup(threadPool, true);
            this.saveSummaryReportQuietly(stage, context, successData, failure);
        }
        catch (Exception e) {
            try {
                failure = e;
                LOG.warn("Commit failure for job {} stage {}", new Object[]{CommitUtils.buildJobId(context), stage, e});
                if (stage.equals("marker")) {
                    CommonUtils.runQuietly(() -> this.loadAndRevertPendingSets(threadPool, new CommitContext(pendingSets)));
                }
                CommonUtils.runQuietly(() -> this.cleanup(threadPool, true));
                throw e;
            }
            catch (Throwable throwable) {
                this.saveSummaryReportQuietly(stage, context, successData, failure);
                CommonUtils.runQuietly(threadPool::shutdown);
                this.cleanupResources();
                throw throwable;
            }
        }
        CommonUtils.runQuietly(threadPool::shutdown);
        this.cleanupResources();
    }

    private SuccessData createSuccessData(Iterable<String> filenames) {
        SuccessData data = SuccessData.builder().setName(SuccessData.class.getName()).setCommitter(CommitUtils.COMMITTER_NAME).setTimestamp(System.currentTimeMillis()).setHostname(NetUtils.getHostname()).setDescription(this.role).setJobId(this.jobId).addFileNames(filenames).build();
        data.addDiagnosticInfo(COMMITTER_THREADS, Integer.toString(this.commitThreads()));
        return data;
    }

    private void saveSummaryReportQuietly(String activeStage, JobContext context, SuccessData report, Throwable thrown) {
        Configuration jobConf = context.getConfiguration();
        String reportDir = jobConf.get(COMMITTER_SUMMARY_REPORT_DIR, "");
        if (reportDir.isEmpty()) {
            LOG.debug("Summary directory conf: {} is not set", (Object)COMMITTER_SUMMARY_REPORT_DIR);
            return;
        }
        Path path = CommitUtils.summaryReport(new Path(reportDir), this.jobId);
        LOG.debug("Summary report path is {}", (Object)path);
        try {
            if (report == null) {
                report = this.createSuccessData(null);
            }
            if (thrown != null) {
                report.recordJobFailure(thrown);
            }
            report.addDiagnosticInfo("stage", activeStage);
            CommitUtils.save(path.getFileSystem(jobConf), path, report);
            LOG.info("Job summary saved to {}", (Object)path);
        }
        catch (Exception e) {
            LOG.warn("Failed to save summary to {}", (Object)path, (Object)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void loadAndCommitPendingSets(ExecutorService outerPool, CommitContext commitContext) {
        ExecutorService innerPool = ThreadPools.newWorkerPool("commit-pending-files-pool", this.commitThreads());
        try {
            Tasks.foreach(commitContext.pendingSets()).stopOnFailure().throwFailureWhenFinished().executeWith(outerPool).abortWith(pendingSet -> this.loadAndAbort(innerPool, (FileStatus)pendingSet)).revertWith(pendingSet -> this.loadAndRevert(innerPool, (FileStatus)pendingSet)).run(pendingSet -> this.loadAndCommit(commitContext, innerPool, (FileStatus)pendingSet));
        }
        finally {
            CommonUtils.runQuietly(innerPool::shutdown);
        }
    }

    private void loadAndRevertPendingSets(ExecutorService outerPool, CommitContext commitContext) {
        Tasks.foreach(commitContext.pendingSets()).throwFailureWhenFinished().executeWith(outerPool).run(pendingSet -> this.loadAndRevert(outerPool, (FileStatus)pendingSet));
    }

    private void loadAndAbort(ExecutorService pool, FileStatus pendingSetFile) {
        PendingSet pendingSet = PendingSet.deserialize(this.destFs, pendingSetFile);
        Tasks.foreach(pendingSet.commits()).suppressFailureWhenFinished().executeWith(pool).run(this.ops::abort);
    }

    private void loadAndRevert(ExecutorService pool, FileStatus pendingSetFile) {
        PendingSet pendingSet = PendingSet.deserialize(this.destFs, pendingSetFile);
        Tasks.foreach(pendingSet.commits()).suppressFailureWhenFinished().executeWith(pool).run(this.ops::revert);
    }

    private void loadAndCommit(CommitContext commitCtxt, ExecutorService pool, FileStatus pendingSetFile) {
        PendingSet pendingSet = PendingSet.deserialize(this.destFs, pendingSetFile);
        String jobID = pendingSet.jobId();
        if (!StringUtils.isNoneEmpty((CharSequence[])new CharSequence[]{jobID}) && !Objects.equals(jobID, this.jobId())) {
            throw new IllegalStateException(String.format("Mismatch in Job ID (%s) and commit job ID (%s)", this.jobId(), jobID));
        }
        Tasks.foreach(pendingSet.commits()).stopOnFailure().throwFailureWhenFinished().executeWith(pool).onFailure((pending, exception) -> this.ops.abort((Pending)pending)).abortWith(this.ops::abort).revertWith(this.ops::revert).run(pending -> {
            this.ops.commit((Pending)pending);
            commitCtxt.addDestKey(pending.destKey());
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void abortJob(JobContext context, JobStatus.State state) {
        this.checkJobId(context);
        LOG.info("{}: aborting job {} in state {}", new Object[]{this.role, this.jobId, state});
        ExecutorService service = ThreadPools.newWorkerPool(THREADS_PREFIX, this.commitThreads());
        try {
            this.cleanup(service, false);
        }
        finally {
            service.shutdown();
            this.cleanupResources();
        }
    }

    public void setupTask(TaskAttemptContext context) throws IOException {
        this.checkJobId((JobContext)context);
        LOG.info("Setup Task {}", (Object)context.getTaskAttemptID());
        Path taskAttemptBasePath = CommitUtils.magicTaskAttemptBasePath(context, this.outputPath);
        this.destFs.delete(taskAttemptBasePath, true);
        this.destFs.mkdirs(taskAttemptBasePath);
    }

    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
        return true;
    }

    public void commitTask(TaskAttemptContext context) throws IOException {
        this.checkJobId((JobContext)context);
        LOG.info("Commit task {}", (Object)context);
        ExecutorService pool = ThreadPools.newWorkerPool(THREADS_PREFIX, this.commitThreads());
        try {
            PendingSet commits = this.innerCommitTask(pool, context);
            LOG.info("Task {} committed {} files", (Object)context.getTaskAttemptID(), (Object)commits.size());
        }
        catch (IOException e) {
            LOG.error("Failed to commit task {}", (Object)context.getTaskAttemptID(), (Object)e);
            throw e;
        }
        finally {
            CommonUtils.runQuietly(pool::shutdown);
            Path taskAttemptPath = CommitUtils.magicTaskAttemptPath(context, this.outputPath);
            LOG.info("Delete task attempt path {}", (Object)taskAttemptPath);
            CommonUtils.runQuietly(() -> this.destFs.delete(taskAttemptPath, true));
        }
    }

    private PendingSet innerCommitTask(ExecutorService pool, TaskAttemptContext context) throws IOException {
        Path taskAttemptBasePath = CommitUtils.magicTaskAttemptBasePath(context, this.outputPath);
        PendingSet pendingSet = new PendingSet(this.jobId);
        try {
            List<FileStatus> pendingFiles = CommitUtils.listPendingFiles(this.destFs, taskAttemptBasePath);
            List<Pending> pendings = Collections.synchronizedList(Lists.newArrayList());
            Tasks.foreach(pendingFiles).throwFailureWhenFinished().executeWith(pool).run(f -> {
                try {
                    byte[] data = CommitUtils.load(this.destFs, f.getPath());
                    pendings.add(Pending.deserialize(data));
                }
                catch (IOException e) {
                    LOG.warn("Failed to load .pending file {}", (Object)f.getPath(), (Object)e);
                    throw new UncheckedIOException(e);
                }
            });
            pendingSet.addAll(pendings);
            String taskId = String.valueOf(context.getTaskAttemptID());
            pendingSet.addExtraData("task.attempt.id", taskId);
            Path taskOutput = CommitUtils.magicTaskPendingSetPath(context, this.outputPath);
            LOG.info("Saving work of {} to {}", (Object)taskId, (Object)taskOutput);
            CommitUtils.save(this.destFs, taskOutput, pendingSet.serialize());
        }
        catch (Exception e) {
            LOG.error("Encounter error when loading pending set from {}", (Object)taskAttemptBasePath, (Object)e);
            if (!pendingSet.commits().isEmpty()) {
                Tasks.foreach(pendingSet.commits()).executeWith(pool).suppressFailureWhenFinished().run(this.ops::abort);
            }
            throw e;
        }
        return pendingSet;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void abortTask(TaskAttemptContext context) throws IOException {
        this.checkJobId((JobContext)context);
        Path taskAttemptBasePath = CommitUtils.magicTaskAttemptBasePath(context, this.outputPath);
        try {
            List<FileStatus> pendingFiles = CommitUtils.listPendingFiles(this.destFs, taskAttemptBasePath);
            Tasks.foreach(pendingFiles).throwFailureWhenFinished().run(f -> {
                try {
                    byte[] serializedData = CommitUtils.load(this.destFs, f.getPath());
                    this.ops.abort(Pending.deserialize(serializedData));
                }
                catch (FileNotFoundException e) {
                    LOG.debug("Listed file already deleted: {}", f);
                }
                catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                finally {
                    FileStatus pendingFile = f;
                    CommonUtils.runQuietly(() -> this.destFs.delete(pendingFile.getPath(), false));
                }
            });
        }
        finally {
            CommonUtils.runQuietly(() -> this.destFs.delete(taskAttemptBasePath, true));
        }
    }

    public void recoverTask(TaskAttemptContext context) {
        this.checkJobId((JobContext)context);
        String taskId = context.getTaskAttemptID().toString();
        throw new UnsupportedOperationException(String.format("Unable to recover task %s, output: %s", taskId, this.outputPath));
    }

    private int commitThreads() {
        return this.conf.getInt(COMMITTER_THREADS, DEFAULT_COMMITTER_THREADS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void cleanup(ExecutorService pool, boolean suppress) {
        block6: {
            LOG.info("Cleanup the job by abort the multipart uploads and clean staging dir, suppress {}", (Object)suppress);
            try {
                Path jobOutput = this.getOutputPath();
                Iterable<MultipartUpload> pending = this.storage.listUploads(ObjectUtils.pathToKey(CommitUtils.magicJobPath(this.jobId, jobOutput), true));
                Tasks.foreach(pending).executeWith(pool).suppressFailureWhenFinished().run(u -> this.storage.abortMultipartUpload(u.key(), u.uploadId()));
            }
            catch (Exception e) {
                if (suppress) {
                    LOG.error("The following exception has been suppressed when cleanup job", (Throwable)e);
                    break block6;
                }
                throw e;
            }
            finally {
                CommonUtils.runQuietly(this::cleanupStagingDir);
            }
        }
    }

    private void cleanupStagingDir() throws IOException {
        this.destFs.delete(CommitUtils.magicJobPath(this.jobId, this.outputPath), true);
        Path magicPath = CommitUtils.magicPath(this.outputPath);
        if (this.destFs.listStatus(magicPath).length == 0) {
            this.destFs.delete(magicPath, true);
        }
    }

    public String jobId() {
        return this.jobId;
    }

    private void checkJobId(JobContext context) {
        String jobIdInContext = CommitUtils.buildJobId(context);
        Preconditions.checkArgument((boolean)Objects.equals(this.jobId, jobIdInContext), (Object)String.format("JobId set in the context: %s is not consistent with the initial jobId of the committer: %s, please check you settings in your taskAttemptContext.", jobIdInContext, this.jobId));
    }

    private void cleanupResources() {
        CommonUtils.runQuietly(this.storage::close);
    }

    public String toString() {
        return MoreObjects.toStringHelper((Object)((Object)this)).add("role", (Object)this.role).add("jobId", (Object)this.jobId).add("outputPath", (Object)this.outputPath).toString();
    }
}

