/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.mr.hive;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.JobContextImpl;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.mr.TestHelper;
import org.apache.iceberg.mr.hive.HiveIcebergOutputCommitter;
import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;
import org.apache.iceberg.mr.hive.HiveIcebergTestUtils;
import org.apache.iceberg.mr.hive.TezUtil;
import org.apache.iceberg.mr.hive.writer.HiveIcebergWriter;
import org.apache.iceberg.mr.hive.writer.WriterBuilder;
import org.apache.iceberg.mr.hive.writer.WriterRegistry;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SerializationUtil;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

public class TestHiveIcebergOutputCommitter {
    private static final int RECORD_NUM = 5;
    private static final String QUERY_ID = "query_id";
    private static final JobID JOB_ID = new JobID("test", 0);
    private static final TaskAttemptID MAP_TASK_ID = new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.MAP, 0, 0);
    private static final TaskAttemptID REDUCE_TASK_ID = new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.REDUCE, 0, 0);
    private static final Schema CUSTOMER_SCHEMA = new Schema(new Types.NestedField[]{Types.NestedField.required((int)1, (String)"customer_id", (Type)Types.LongType.get()), Types.NestedField.required((int)2, (String)"first_name", (Type)Types.StringType.get())});
    private static final PartitionSpec PARTITIONED_SPEC = PartitionSpec.builderFor((Schema)CUSTOMER_SCHEMA).identity("customer_id").build();
    @Rule
    public TemporaryFolder temp = new TemporaryFolder();

    @Test
    public void testNeedsTaskCommit() {
        HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
        JobConf mapOnlyJobConf = new JobConf();
        mapOnlyJobConf.setNumMapTasks(10);
        mapOnlyJobConf.setNumReduceTasks(0);
        Assert.assertTrue((boolean)committer.needsTaskCommit((TaskAttemptContext)new TaskAttemptContextImpl(mapOnlyJobConf, MAP_TASK_ID)));
        JobConf mapReduceJobConf = new JobConf();
        mapReduceJobConf.setNumMapTasks(10);
        mapReduceJobConf.setNumReduceTasks(10);
        Assert.assertFalse((boolean)committer.needsTaskCommit((TaskAttemptContext)new TaskAttemptContextImpl(mapReduceJobConf, MAP_TASK_ID)));
        Assert.assertTrue((boolean)committer.needsTaskCommit((TaskAttemptContext)new TaskAttemptContextImpl(mapReduceJobConf, REDUCE_TASK_ID)));
    }

    @Test
    public void testSuccessfulUnpartitionedWrite() throws IOException {
        HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
        Table table = this.table(this.temp.getRoot().getPath(), false);
        JobConf conf = this.jobConf(table, 1);
        List<Record> expected = this.writeRecords(table.name(), 1, 0, true, false, conf);
        committer.commitJob((JobContext)new JobContextImpl(conf, (org.apache.hadoop.mapreduce.JobID)JOB_ID));
        HiveIcebergTestUtils.validateFiles(table, (Configuration)conf, JOB_ID, 1);
        HiveIcebergTestUtils.validateData(table, expected, 0);
    }

    @Test
    public void testSuccessfulPartitionedWrite() throws IOException {
        HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
        Table table = this.table(this.temp.getRoot().getPath(), true);
        JobConf conf = this.jobConf(table, 1);
        List<Record> expected = this.writeRecords(table.name(), 1, 0, true, false, conf);
        committer.commitJob((JobContext)new JobContextImpl(conf, (org.apache.hadoop.mapreduce.JobID)JOB_ID));
        HiveIcebergTestUtils.validateFiles(table, (Configuration)conf, JOB_ID, 2);
        HiveIcebergTestUtils.validateData(table, expected, 0);
    }

    @Test
    public void testSuccessfulMultipleTasksUnpartitionedWrite() throws IOException {
        HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
        Table table = this.table(this.temp.getRoot().getPath(), false);
        JobConf conf = this.jobConf(table, 2);
        List<Record> expected = this.writeRecords(table.name(), 2, 0, true, false, conf);
        committer.commitJob((JobContext)new JobContextImpl(conf, (org.apache.hadoop.mapreduce.JobID)JOB_ID));
        HiveIcebergTestUtils.validateFiles(table, (Configuration)conf, JOB_ID, 2);
        HiveIcebergTestUtils.validateData(table, expected, 1);
    }

    @Test
    public void testSuccessfulMultipleTasksPartitionedWrite() throws IOException {
        HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
        Table table = this.table(this.temp.getRoot().getPath(), true);
        JobConf conf = this.jobConf(table, 2);
        List<Record> expected = this.writeRecords(table.name(), 2, 0, true, false, conf);
        committer.commitJob((JobContext)new JobContextImpl(conf, (org.apache.hadoop.mapreduce.JobID)JOB_ID));
        HiveIcebergTestUtils.validateFiles(table, (Configuration)conf, JOB_ID, 4);
        HiveIcebergTestUtils.validateData(table, expected, 1);
    }

    @Test
    public void testRetryTask() throws IOException {
        HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
        Table table = this.table(this.temp.getRoot().getPath(), false);
        JobConf conf = this.jobConf(table, 2);
        this.writeRecords(table.name(), 2, 0, false, true, conf);
        HiveIcebergTestUtils.validateFiles(table, (Configuration)conf, JOB_ID, 0);
        HiveIcebergTestUtils.validateData(table, Collections.emptyList(), 0);
        this.writeRecords(table.name(), 2, 1, false, false, conf);
        HiveIcebergTestUtils.validateFiles(table, (Configuration)conf, JOB_ID, 2);
        HiveIcebergTestUtils.validateData(table, Collections.emptyList(), 0);
        List<Record> expected = this.writeRecords(table.name(), 2, 2, true, false, conf);
        committer.commitJob((JobContext)new JobContextImpl(conf, (org.apache.hadoop.mapreduce.JobID)JOB_ID));
        HiveIcebergTestUtils.validateFiles(table, (Configuration)conf, JOB_ID, 4);
        HiveIcebergTestUtils.validateData(table, expected, 1);
    }

    @Test
    public void testAbortJob() throws IOException {
        HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
        Table table = this.table(this.temp.getRoot().getPath(), false);
        JobConf conf = this.jobConf(table, 1);
        this.writeRecords(table.name(), 1, 0, true, false, conf);
        committer.abortJob((org.apache.hadoop.mapreduce.JobContext)new JobContextImpl(conf, (org.apache.hadoop.mapreduce.JobID)JOB_ID), JobStatus.State.FAILED);
        HiveIcebergTestUtils.validateFiles(table, (Configuration)conf, JOB_ID, 0);
        HiveIcebergTestUtils.validateData(table, Collections.emptyList(), 0);
    }

    @Test
    public void writerIsClosedAfterTaskCommitFailure() throws IOException {
        HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
        HiveIcebergOutputCommitter failingCommitter = (HiveIcebergOutputCommitter)Mockito.spy((Object)committer);
        ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(TaskAttemptContextImpl.class);
        String exceptionMessage = "Commit task failed!";
        ((HiveIcebergOutputCommitter)Mockito.doThrow((Throwable[])new Throwable[]{new RuntimeException(exceptionMessage)}).when((Object)failingCommitter)).commitTask((TaskAttemptContext)argumentCaptor.capture());
        Table table = this.table(this.temp.getRoot().getPath(), false);
        JobConf conf = this.jobConf(table, 1);
        try {
            this.writeRecords(table.name(), 1, 0, true, false, conf, (OutputCommitter)failingCommitter);
            Assert.fail();
        }
        catch (RuntimeException e) {
            Assert.assertTrue((boolean)e.getMessage().contains(exceptionMessage));
        }
        Assert.assertEquals((long)1L, (long)argumentCaptor.getAllValues().size());
        TaskAttemptID capturedId = TezUtil.taskAttemptWrapper((TaskAttemptID)((TaskAttemptContextImpl)argumentCaptor.getValue()).getTaskAttemptID());
        Assert.assertNotNull((Object)WriterRegistry.writers((TaskAttemptID)capturedId));
        failingCommitter.abortTask((TaskAttemptContext)new TaskAttemptContextImpl(conf, capturedId));
        Assert.assertNull((Object)WriterRegistry.writers((TaskAttemptID)capturedId));
    }

    private Table table(String location, boolean partitioned) {
        HadoopTables tables = new HadoopTables();
        return tables.create(CUSTOMER_SCHEMA, partitioned ? PARTITIONED_SPEC : PartitionSpec.unpartitioned(), (Map)ImmutableMap.of((Object)"iceberg.catalog", (Object)"location_based_table"), location);
    }

    private JobConf jobConf(Table table, int taskNum) {
        JobConf conf = new JobConf();
        conf.setNumMapTasks(taskNum);
        conf.setNumReduceTasks(0);
        conf.set(HiveConf.ConfVars.HIVE_QUERY_ID.varname, QUERY_ID);
        conf.set("iceberg.mr.output.tables", table.name());
        conf.set("iceberg.mr.operation.type." + table.name(), Context.Operation.OTHER.name());
        conf.set("iceberg.mr.table.catalog." + table.name(), (String)table.properties().get("iceberg.catalog"));
        conf.set("iceberg.mr.serialized.table." + table.name(), SerializationUtil.serializeToBase64((Object)table));
        HashMap propMap = Maps.newHashMap();
        TableDesc tableDesc = new TableDesc();
        tableDesc.setProperties(new Properties());
        tableDesc.getProperties().setProperty("name", table.name());
        tableDesc.getProperties().setProperty("location", table.location());
        tableDesc.getProperties().setProperty("iceberg.catalog", (String)table.properties().get("iceberg.catalog"));
        HiveIcebergStorageHandler.overlayTableProperties((Configuration)conf, (TableDesc)tableDesc, (Map)propMap);
        propMap.forEach((key, value) -> conf.set(key, value));
        return conf;
    }

    private List<Record> writeRecords(String name, int taskNum, int attemptNum, boolean commitTasks, boolean abortTasks, JobConf conf, OutputCommitter committer) throws IOException {
        ArrayList expected = Lists.newArrayListWithExpectedSize((int)(5 * taskNum));
        Table table = HiveIcebergStorageHandler.table((Configuration)conf, (String)name);
        Schema schema = HiveIcebergStorageHandler.schema((Configuration)conf);
        for (int i = 0; i < taskNum; ++i) {
            List<Record> records = TestHelper.generateRandomRecords(schema, 5, i + attemptNum);
            for (int j = 0; j < 5; ++j) {
                records.get(j).setField("customer_id", (Object)((long)j / 3L));
            }
            TaskAttemptID taskId = new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.MAP, i, attemptNum);
            HiveIcebergWriter testWriter = WriterBuilder.builderFor((Table)table).attemptID(TezUtil.taskAttemptWrapper((TaskAttemptID)taskId)).queryId("Q_ID").tableName(conf.get("name")).operation(Context.Operation.OTHER).build();
            Container container = new Container();
            for (Record record : records) {
                container.set((Object)record);
                testWriter.write((Writable)container);
            }
            testWriter.close(false);
            if (commitTasks) {
                committer.commitTask((TaskAttemptContext)new TaskAttemptContextImpl(conf, taskId));
                expected.addAll(records);
                continue;
            }
            if (!abortTasks) continue;
            committer.abortTask((TaskAttemptContext)new TaskAttemptContextImpl(conf, taskId));
        }
        return expected;
    }

    private List<Record> writeRecords(String name, int taskNum, int attemptNum, boolean commitTasks, boolean abortTasks, JobConf conf) throws IOException {
        return this.writeRecords(name, taskNum, attemptNum, commitTasks, abortTasks, conf, (OutputCommitter)new HiveIcebergOutputCommitter());
    }
}

