package org.apache.hadoop.mapreduce.lib.output.committer.manifest;

import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DiagnosticKeys;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbortTaskStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CleanupJobStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CommitJobStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CommitTaskStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupJobStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupTaskStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageConfig;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageEventCallbacks;
import org.apache.hadoop.util.functional.CloseableTaskPoolSubmitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Path output committer which drives the manifest committer stages:
 * job and task setup, task commit/abort, and job commit/abort/cleanup.
 *
 * <p>The committer is also an {@link IOStatisticsSource} publishing the
 * statistics store it creates, receives stage enter/exit notifications
 * through {@link StageEventCallbacks}, and answers capability probes via
 * {@link StreamCapabilities}.
 */
@InterfaceStability.Stable
@InterfaceAudience.Public
/* loaded from: input_file:org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitter.class */
public class ManifestCommitter extends PathOutputCommitter implements IOStatisticsSource, StageEventCallbacks, StreamCapabilities {
    /** Logger shared by all committer instances. */
    public static final Logger LOG = LoggerFactory.getLogger(ManifestCommitter.class);
    /** Role name handed to the committer configuration when acting for a task attempt. */
    public static final String TASK_COMMITTER = "task committer";
    /** Role name handed to the committer configuration when acting for a job. */
    public static final String JOB_COMMITTER = "job committer";
    /** Configuration built in the constructor; source of the job unique ID, the conf and the destination filesystem. */
    private final ManifestCommitterConfig baseConfig;
    /** Qualified destination directory; this is what {@code getOutputPath()} returns. */
    private final Path destinationDir;
    /** Task attempt directory taken from {@link #baseConfig}; this is what {@code getWorkPath()} returns. */
    private final Path taskAttemptDir;
    /** Statistics store passed to every committer configuration created by this instance. */
    private final IOStatisticsStore iostatistics;
    /** Success/outcome report: created on demand, replaced by the result of a successful job commit. */
    private ManifestSuccessData successReport;
    /** Name of the most recently entered stage, as reported through {@code enterStage()}. */
    private String activeStage;
    /** Manifest returned by the last successful {@code commitTask()} call; exposed for testing. */
    private TaskManifest taskAttemptCommittedManifest;

    public ManifestCommitter(Path path, TaskAttemptContext taskAttemptContext) throws IOException {
        super(path, taskAttemptContext);
        this.destinationDir = resolveDestinationDirectory(path, taskAttemptContext.getConfiguration());
        this.iostatistics = ManifestCommitterSupport.createIOStatisticsStore().build();
        this.baseConfig = enterCommitter(taskAttemptContext.getTaskAttemptID() != null, taskAttemptContext);
        this.taskAttemptDir = this.baseConfig.getTaskAttemptDir();
        LOG.info("Created ManifestCommitter with JobID {}, Task Attempt {} and destination {}", new Object[]{taskAttemptContext.getJobID(), taskAttemptContext.getTaskAttemptID(), path});
    }

    /**
     * Build a committer configuration for an operation which is starting and
     * hand it to {@link AuditingIntegration#updateCommonContextOnCommitterEntry}.
     *
     * @param isTask true to use the task-committer role, false for the job-committer role.
     * @param context job (or task attempt) context of the operation.
     * @return the new committer configuration.
     */
    private ManifestCommitterConfig enterCommitter(boolean isTask, JobContext context) {
        final Path destination = getOutputPath();
        final String role;
        if (isTask) {
            role = TASK_COMMITTER;
        } else {
            role = JOB_COMMITTER;
        }
        final ManifestCommitterConfig committerConfig =
            new ManifestCommitterConfig(destination, role, context, this.iostatistics, this);
        AuditingIntegration.updateCommonContextOnCommitterEntry(committerConfig);
        return committerConfig;
    }

    /**
     * Set up the job by running {@link SetupJobStage}, then log the
     * committer statistics at debug level.
     *
     * @param jobContext job context.
     * @throws IOException failure of the setup stage.
     */
    @Override
    public void setupJob(final JobContext jobContext) throws IOException {
        final ManifestCommitterConfig committerConfig = enterCommitter(false, jobContext);
        final StageConfig stageConfig = committerConfig.createStageConfig()
            .withOperations(createManifestStoreOperations())
            .build();
        // the stage argument is the committer's "create job marker" option
        new SetupJobStage(stageConfig).apply(committerConfig.getCreateJobMarker());
        logCommitterStatisticsAtDebug();
    }

    /**
     * Set up a task attempt by running {@link SetupTaskStage}, then log the
     * committer statistics at debug level.
     *
     * @param taskAttemptContext task attempt context.
     * @throws IOException failure of the setup stage.
     */
    @Override
    public void setupTask(final TaskAttemptContext taskAttemptContext) throws IOException {
        final ManifestCommitterConfig committerConfig = enterCommitter(true, taskAttemptContext);
        final StageConfig stageConfig = committerConfig.createStageConfig()
            .withOperations(createManifestStoreOperations())
            .build();
        new SetupTaskStage(stageConfig).apply("");
        logCommitterStatisticsAtDebug();
    }

    /**
     * Probe for whether a task attempt needs committing.
     * The probe is logged and the answer is unconditionally {@code true}.
     *
     * @param taskAttemptContext task attempt context.
     * @return true, always.
     * @throws IOException declared by the overridden method; never raised here.
     */
    @Override
    public boolean needsTaskCommit(final TaskAttemptContext taskAttemptContext) throws IOException {
        LOG.info("Probe for needsTaskCommit({})",
            taskAttemptContext.getTaskAttemptID());
        return true;
    }

    /**
     * Probe for whether job commit can be repeated.
     * The probe is logged and the answer is unconditionally {@code false}.
     *
     * @param jobContext job context.
     * @return false, always.
     * @throws IOException declared by the overridden method; never raised here.
     */
    @Override
    public boolean isCommitJobRepeatable(final JobContext jobContext) throws IOException {
        LOG.info("Probe for isCommitJobRepeatable({}): returning false",
            jobContext.getJobID());
        return false;
    }

    /**
     * Probe for whether task recovery is supported.
     * The probe is logged and the answer is unconditionally {@code false}.
     *
     * @param jobContext job context.
     * @return false, always.
     * @throws IOException declared by the overridden method; never raised here.
     */
    @Override
    public boolean isRecoverySupported(final JobContext jobContext) throws IOException {
        LOG.info("Probe for isRecoverySupported({}): returning false",
            jobContext.getJobID());
        return false;
    }

    /**
     * Reject an attempt to recover a task: the call is logged at warn level
     * and then always fails.
     *
     * @param taskAttemptContext task attempt context.
     * @throws IOException always.
     */
    @Override
    public void recoverTask(final TaskAttemptContext taskAttemptContext) throws IOException {
        LOG.warn("Rejecting recoverTask({}) call",
            taskAttemptContext.getTaskAttemptID());
        final String message = "Cannot recover task " + taskAttemptContext.getTaskAttemptID();
        throw new IOException(message);
    }

    /**
     * Commit a task attempt by running {@link CommitTaskStage}.
     *
     * <p>On success the manifest returned by the stage is cached and the
     * "tasks completed" counter is incremented; an {@link IOException}
     * increments the "tasks failed" counter before being rethrown.
     * Whatever the outcome, the statistics are logged at debug level and the
     * audit context is updated for committer exit, exactly once.
     *
     * @param taskAttemptContext task attempt context.
     * @throws IOException failure of the commit stage.
     */
    @Override
    public void commitTask(final TaskAttemptContext taskAttemptContext) throws IOException {
        try {
            // created inside the try so that the exit handling below also runs
            // if entering the committer fails
            final ManifestCommitterConfig committerConfig = enterCommitter(true, taskAttemptContext);
            final StageConfig stageConfig = committerConfig.createStageConfig()
                .withOperations(createManifestStoreOperations())
                .build();
            this.taskAttemptCommittedManifest =
                new CommitTaskStage(stageConfig).apply(null).getTaskManifest();
            this.iostatistics.incrementCounter(
                ManifestCommitterStatisticNames.COMMITTER_TASKS_COMPLETED_COUNT, 1L);
        } catch (IOException e) {
            this.iostatistics.incrementCounter(
                ManifestCommitterStatisticNames.COMMITTER_TASKS_FAILED_COUNT, 1L);
            throw e;
        } finally {
            // a single finally replaces the duplicated success/failure epilogues,
            // so the exit hooks can no longer run twice
            logCommitterStatisticsAtDebug();
            AuditingIntegration.updateCommonContextOnCommitterExit();
        }
    }

    /**
     * Abort a task attempt by running {@link AbortTaskStage}.
     * The statistics are logged at debug level and the audit context is
     * updated for committer exit whether or not the stage succeeds.
     *
     * @param taskAttemptContext task attempt context.
     * @throws IOException failure of the abort stage.
     */
    @Override
    public void abortTask(final TaskAttemptContext taskAttemptContext) throws IOException {
        try {
            final StageConfig stageConfig = enterCommitter(true, taskAttemptContext)
                .createStageConfig()
                .withOperations(createManifestStoreOperations())
                .build();
            new AbortTaskStage(stageConfig).apply(false);
        } finally {
            logCommitterStatisticsAtDebug();
            AuditingIntegration.updateCommonContextOnCommitterExit();
        }
    }

    /**
     * Return the cached success report, creating and caching one from the
     * supplied configuration and the currently active stage if none exists.
     *
     * @param manifestCommitterConfig configuration used to build a new report.
     * @return the cached (possibly just created) report.
     */
    private ManifestSuccessData getOrCreateSuccessData(ManifestCommitterConfig manifestCommitterConfig) {
        if (this.successReport != null) {
            return this.successReport;
        }
        final StageConfig stageConfig = manifestCommitterConfig.createStageConfig();
        this.successReport = ManifestCommitterSupport.createManifestOutcome(stageConfig, this.activeStage);
        return this.successReport;
    }

    /**
     * Commit the job by running {@link CommitJobStage}.
     *
     * <p>An initial report is obtained before the commit starts so that there
     * is something to save if the stage fails; on success it is replaced by
     * the report the stage returns, which also becomes the cached success
     * report. Whatever the outcome, a summary is saved (best effort, with
     * overwrite), the statistics are logged, recovered rename failures are
     * warned about and the audit context is updated for committer exit.
     *
     * @param jobContext job context.
     * @throws IOException failure of the commit stage, or of closing its resources.
     */
    @Override
    public void commitJob(final JobContext jobContext) throws IOException {
        final ManifestCommitterConfig committerConfig = enterCommitter(false, jobContext);
        // fallback report: replaced below once the commit stage has produced one
        ManifestSuccessData report = getOrCreateSuccessData(committerConfig);
        Throwable failure = null;
        try (CloseableTaskPoolSubmitter ioProcessors = committerConfig.createSubmitter();
             ManifestStoreOperations storeOperations = createManifestStoreOperations()) {
            final StageConfig stageConfig = committerConfig.createStageConfig()
                .withOperations(storeOperations)
                .withIOProcessors(ioProcessors)
                .build();
            final Configuration conf = jobContext.getConfiguration();
            final CommitJobStage.Arguments arguments = new CommitJobStage.Arguments(
                committerConfig.getCreateJobMarker(),
                committerConfig.getValidateOutput(),
                conf.getTrimmed(ManifestCommitterConstants.OPT_DIAGNOSTICS_MANIFEST_DIR, ""),
                CleanupJobStage.cleanupStageOptionsFromConfig(
                    ManifestCommitterStatisticNames.OP_STAGE_JOB_CLEANUP, conf));
            report = new CommitJobStage(stageConfig).apply(arguments).getJobSuccessData();
            setSuccessReport(report);
            // record the configured IO processor count in the report;
            // 32 is the fallback used when the option is unset
            report.putDiagnostic(ManifestCommitterConstants.OPT_IO_PROCESSORS,
                conf.get(ManifestCommitterConstants.OPT_IO_PROCESSORS, Long.toString(32L)));
        } catch (IOException | RuntimeException e) {
            // remember the cause so that the saved summary records why the commit failed
            failure = e;
            throw e;
        } finally {
            // save the summary even on failure; quiet=true means a failure to save
            // the report does not mask the outcome of the commit itself
            maybeSaveSummary(this.activeStage, committerConfig, report, failure, true, true);
            LOG.info("{}: Job Commit statistics {}",
                committerConfig.getName(),
                IOStatisticsLogging.ioStatisticsToPrettyString(this.iostatistics));
            final Long recoveries = this.iostatistics.counters()
                .get(ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME_RECOVERED);
            if (recoveries != null && recoveries.longValue() > 0) {
                LOG.warn("{}: rename failures were recovered from. Number of recoveries: {}",
                    committerConfig.getName(), recoveries);
            }
            AuditingIntegration.updateCommonContextOnCommitterExit();
        }
    }

    /**
     * Abort the job: run the cleanup stage under the job-abort statistic,
     * mark the report as unsuccessful and save a summary.
     *
     * <p>A cleanup failure does not fail the abort; it is logged and recorded
     * in the summary. The summary is saved without overwrite, so an existing
     * report is left in place. The audit context is updated for committer
     * exit even if saving the summary or logging fails unexpectedly.
     *
     * @param jobContext job context.
     * @param state final state of the job.
     * @throws IOException declared by the overridden method; cleanup and
     *         summary-save failures are not propagated.
     */
    @Override
    public void abortJob(final JobContext jobContext, final JobStatus.State state) throws IOException {
        LOG.info("Aborting Job {} in state {}", jobContext.getJobID(), state);
        final ManifestCommitterConfig committerConfig = enterCommitter(false, jobContext);
        try {
            final ManifestSuccessData report = getOrCreateSuccessData(committerConfig);
            IOException failure = null;
            try {
                executeCleanup(ManifestCommitterStatisticNames.OP_STAGE_JOB_ABORT, jobContext, committerConfig);
            } catch (IOException e) {
                // best effort: keep aborting, but make the failure visible even
                // when no summary directory is configured
                LOG.warn("Cleanup during abort of job {} failed: {}",
                    jobContext.getJobID(), e.toString());
                LOG.debug("Cleanup failure during job abort", e);
                failure = e;
            }
            report.setSuccess(false);
            // overwrite=false: never replace a report which already exists
            maybeSaveSummary(this.activeStage, committerConfig, report, failure, true, false);
            LOG.info("Job Abort statistics {}",
                IOStatisticsLogging.ioStatisticsToPrettyString(this.iostatistics));
        } finally {
            AuditingIntegration.updateCommonContextOnCommitterExit();
        }
    }

    /**
     * Clean up the job by running the cleanup stage under the job-cleanup
     * statistic. The statistics are logged at debug level and the audit
     * context is updated for committer exit whether or not cleanup succeeds.
     *
     * @param jobContext job context.
     * @throws IOException failure of the cleanup stage.
     */
    @Override
    public void cleanupJob(final JobContext jobContext) throws IOException {
        try {
            final ManifestCommitterConfig committerConfig = enterCommitter(false, jobContext);
            executeCleanup(ManifestCommitterStatisticNames.OP_STAGE_JOB_CLEANUP, jobContext, committerConfig);
        } finally {
            logCommitterStatisticsAtDebug();
            AuditingIntegration.updateCommonContextOnCommitterExit();
        }
    }

    /**
     * Run {@link CleanupJobStage} with options read from the job configuration.
     * The IO processor pool created for the stage is closed on every exit path.
     *
     * @param statisticName name of the statistic under which the cleanup options are resolved.
     * @param jobContext job context supplying the configuration.
     * @param committerConfig committer configuration to build the stage from.
     * @return the result of the cleanup stage.
     * @throws IOException failure of the stage or of closing the pool.
     */
    private CleanupJobStage.Result executeCleanup(String statisticName, JobContext jobContext, ManifestCommitterConfig committerConfig) throws IOException {
        try (CloseableTaskPoolSubmitter ioProcessors = committerConfig.createSubmitter()) {
            // NOTE(review): the store operations created here are never closed,
            // whereas commitJob() closes its instance - confirm this is intentional.
            final StageConfig stageConfig = committerConfig.createStageConfig()
                .withOperations(createManifestStoreOperations())
                .withIOProcessors(ioProcessors)
                .build();
            return new CleanupJobStage(stageConfig).apply(
                CleanupJobStage.cleanupStageOptionsFromConfig(statisticName, jobContext.getConfiguration()));
        }
    }

    /**
     * Get the output path of the job, which is the destination directory.
     *
     * @return the destination directory.
     */
    @Override
    public Path getOutputPath() {
        final Path destination = getDestinationDir();
        return destination;
    }

    /**
     * Get the work path, which is the task attempt directory.
     *
     * @return the task attempt directory.
     */
    @Override
    public Path getWorkPath() {
        final Path workPath = getTaskAttemptDir();
        return workPath;
    }

    /**
     * Get the destination directory resolved in the constructor.
     *
     * @return the destination directory.
     */
    private Path getDestinationDir() {
        return destinationDir;
    }

    /**
     * Get the task attempt directory taken from the base configuration.
     *
     * @return the task attempt directory.
     */
    private Path getTaskAttemptDir() {
        return taskAttemptDir;
    }

    /**
     * Callback on stage entry: remember the stage as the active one, then
     * forward the event to {@link AuditingIntegration}.
     *
     * @param str name of the stage being entered.
     */
    @Override
    public void enterStage(String str) {
        activeStage = str;
        AuditingIntegration.enterStage(str);
    }

    /**
     * Callback on stage exit: forward the event to {@link AuditingIntegration}.
     * The active stage name is left unchanged.
     *
     * @param str name of the stage being exited; unused.
     */
    @Override
    public void exitStage(String str) {
        AuditingIntegration.exitStage();
    }

    /**
     * Get the unique ID of the job, as held by the base configuration.
     *
     * @return the job's unique ID.
     */
    public String getJobUniqueId() {
        return baseConfig.getJobUniqueId();
    }

    /**
     * Get the configuration held by the base committer configuration.
     *
     * @return the configuration.
     */
    public Configuration getConf() {
        return baseConfig.getConf();
    }

    /**
     * Get the cached success report.
     *
     * @return the report, or null if none has been created or set yet.
     */
    public ManifestSuccessData getSuccessReport() {
        return successReport;
    }

    /**
     * Replace the cached success report.
     *
     * @param manifestSuccessData the new report.
     */
    private void setSuccessReport(ManifestSuccessData manifestSuccessData) {
        successReport = manifestSuccessData;
    }

    /**
     * Get the manifest cached by the last successful task commit.
     *
     * @return the manifest, or null if no task has been committed by this instance.
     */
    @VisibleForTesting
    TaskManifest getTaskAttemptCommittedManifest() {
        return taskAttemptCommittedManifest;
    }

    /**
     * Compute the task attempt directory for a context.
     * A new committer configuration (in the job-committer role) is created
     * for the context on every call.
     *
     * @param taskAttemptContext task attempt context.
     * @return the task attempt directory of that configuration.
     */
    @VisibleForTesting
    public Path getTaskAttemptPath(TaskAttemptContext taskAttemptContext) {
        final ManifestCommitterConfig committerConfig = enterCommitter(false, taskAttemptContext);
        return committerConfig.getTaskAttemptDir();
    }

    /**
     * Compute the path of the manifest of the task to which an attempt belongs.
     * A new committer configuration (in the job-committer role) is created
     * for the context on every call.
     *
     * @param taskAttemptContext task attempt context; its task ID names the manifest.
     * @return the manifest path under the task manifest directory.
     */
    @VisibleForTesting
    public Path getTaskManifestPath(TaskAttemptContext taskAttemptContext) {
        final Path manifestDir = enterCommitter(false, taskAttemptContext).getTaskManifestDir();
        final String taskId = taskAttemptContext.getTaskAttemptID().getTaskID().toString();
        return ManifestCommitterSupport.manifestPathForTask(manifestDir, taskId);
    }

    /**
     * Compute the job attempt directory for a context.
     * A new committer configuration (in the job-committer role) is created
     * for the context on every call.
     *
     * @param jobContext job context.
     * @return the job attempt directory of that configuration.
     */
    @VisibleForTesting
    public Path getJobAttemptPath(JobContext jobContext) {
        final ManifestCommitterConfig committerConfig = enterCommitter(false, jobContext);
        return committerConfig.getJobAttemptDir();
    }

    /**
     * Qualify the destination path against the filesystem of its URI.
     *
     * @param path destination path.
     * @param configuration configuration used to look up the filesystem.
     * @return the qualified path.
     * @throws IOException failure to obtain the filesystem.
     */
    private Path resolveDestinationDirectory(Path path, Configuration configuration) throws IOException {
        final FileSystem destinationFs = FileSystem.get(path.toUri(), configuration);
        return destinationFs.makeQualified(path);
    }

    /**
     * Create store operations for the destination, using the configuration,
     * destination filesystem and destination directory of the base
     * committer configuration. A new request is made to
     * {@link ManifestCommitterSupport} on every call.
     *
     * @return the store operations.
     * @throws IOException failure to create the operations.
     */
    protected ManifestStoreOperations createManifestStoreOperations() throws IOException {
        final ManifestCommitterConfig config = this.baseConfig;
        return ManifestCommitterSupport.createManifestStoreOperations(
            config.getConf(),
            config.getDestinationFileSystem(),
            config.getDestinationDir());
    }

    /**
     * Log this committer's IO statistics through
     * {@link IOStatisticsLogging#logIOStatisticsAtDebug}.
     */
    private void logCommitterStatisticsAtDebug() {
        final String message = "Committer Statistics";
        IOStatisticsLogging.logIOStatisticsAtDebug(LOG, message, this);
    }

    /**
     * Describe the committer: the base configuration followed by a
     * pretty-printed dump of the IO statistics.
     *
     * @return a string for logging and diagnostics.
     */
    @Override
    public String toString() {
        // the configuration is converted first, matching the append order
        final String config = String.valueOf(this.baseConfig);
        final String statistics = IOStatisticsLogging.ioStatisticsToPrettyString(this.iostatistics);
        return "ManifestCommitter{" + config + ", iostatistics=" + statistics + '}';
    }

    /**
     * Save a job summary report if a summary directory is configured.
     *
     * <p>Before saving, the report is updated with a snapshot of the
     * configuration's IO statistics, the failure (if any) and the name of the
     * active stage. The report is written to a file in the summary directory
     * whose name is derived from the job's unique ID.
     *
     * @param activeStage name of the stage to record in the report diagnostics.
     * @param config committer configuration.
     * @param report report to update and save.
     * @param thrown failure to record in the report; may be null.
     * @param quiet if true, an IOException while saving is logged at debug and swallowed.
     * @param overwrite if false, an existing report is left untouched and nothing is saved.
     * @return the path of the saved report; null if no directory is configured,
     *         a report already exists and overwrite is false, or the save
     *         failed quietly.
     * @throws IOException failure to save the report when not quiet.
     */
    private static Path maybeSaveSummary(String activeStage, ManifestCommitterConfig config, ManifestSuccessData report, Throwable thrown, boolean quiet, boolean overwrite) throws IOException {
        final Configuration conf = config.getConf();
        final String reportDir = conf.getTrimmed(ManifestCommitterConstants.OPT_SUMMARY_REPORT_DIR, "");
        if (reportDir.isEmpty()) {
            LOG.debug("No summary directory set in {}", ManifestCommitterConstants.OPT_SUMMARY_REPORT_DIR);
            return null;
        }
        LOG.debug("Summary directory set in {} to {}",
            ManifestCommitterConstants.OPT_SUMMARY_REPORT_DIR, reportDir);

        // bring the report up to date before it is written
        report.snapshotIOStatistics(config.getIOStatistics());
        final Path path = new Path(new Path(reportDir),
            ManifestCommitterSupport.createJobSummaryFilename(config.getJobUniqueId()));
        if (thrown != null) {
            report.recordJobFailure(thrown);
        }
        report.putDiagnostic(DiagnosticKeys.STAGE, activeStage);

        // the operations are bound to the filesystem of the report path and are
        // closed on every exit path, including when the probe or save fails
        try (ManifestStoreOperationsThroughFileSystem operations =
                 new ManifestStoreOperationsThroughFileSystem(path.getFileSystem(conf))) {
            if (!overwrite) {
                // probe for the file rather than rely on whichever exception a
                // non-overwriting save would raise when it already exists
                try {
                    LOG.debug("Report already exists: {}", operations.getFileStatus(path));
                    return null;
                } catch (FileNotFoundException ignored) {
                    // no existing report: continue to the save
                }
            }
            operations.save(report, path, overwrite);
            LOG.info("Job summary saved to {}", path);
            return path;
        } catch (IOException e) {
            LOG.debug("Failed to save summary to {}", path, e);
            if (quiet) {
                return null;
            }
            throw e;
        }
    }

    /**
     * Get the statistics store created in the constructor.
     *
     * @return the committer's IO statistics.
     */
    @Override
    public IOStatisticsStore getIOStatistics() {
        return iostatistics;
    }

    /**
     * Probe for a capability. The only capability declared is
     * {@code ManifestCommitterConstants.CAPABILITY_DYNAMIC_PARTITIONING}.
     *
     * @param str capability name; null yields false.
     * @return true if and only if the name equals the dynamic partitioning capability.
     */
    @Override
    public boolean hasCapability(String str) {
        final String supported = ManifestCommitterConstants.CAPABILITY_DYNAMIC_PARTITIONING;
        return supported.equals(str);
    }
}
