/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.exec.tez;

import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.NullRowsInputFormat;
import org.apache.hadoop.hive.ql.io.ZeroRowsInputFormat;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputInitializerContext;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestHiveSplitGenerator {
    private static final Logger LOG = LoggerFactory.getLogger(TestHiveSplitGenerator.class);
    private static final String QUERY_ID = "hive_" + System.currentTimeMillis();
    private static final ApplicationId APP_ID = ApplicationId.newInstance((long)1000L, (int)200);
    private static final String INPUT_NAME = "MRInput";
    private static final int VERTEX_ID = 0;
    private static final String PATH_FORMAT = "events/%s/%d_%s_InputDataInformationEvent_%d";

    @Test
    public void testSplitSerializationUsingFileSystem() throws Exception {
        JobConf conf = new JobConf((Configuration)new HiveConf());
        HiveConf.setIntVar((Configuration)conf, (HiveConf.ConfVars)HiveConf.ConfVars.HIVE_TEZ_INPUT_FS_SERIALIZATION_THRESHOLD, (int)0);
        InputSplit[] splits = this.getSplits(1);
        String serializedPath = ((InputDataInformationEvent)this.generateEvents(conf, splits).get(1)).getSerializedPath();
        Assert.assertTrue((String)("Serialized path should contain a pattern like 'events/$queryid/$vertexid_$inputname_InputDataInformationEvent_$count', but found: " + serializedPath), (boolean)serializedPath.contains(this.expectedSerializedPathFormat(0)));
    }

    @Test
    public void testSplitSerializationInMemoryLimitExceeded() throws Exception {
        JobConf conf = new JobConf((Configuration)new HiveConf());
        HiveConf.setIntVar((Configuration)conf, (HiveConf.ConfVars)HiveConf.ConfVars.HIVE_TEZ_INPUT_FS_SERIALIZATION_THRESHOLD, (int)10);
        InputSplit[] splits = this.getSplits(2);
        List<Event> events = this.generateEvents(conf, splits);
        String serializedPath1 = ((InputDataInformationEvent)events.get(1)).getSerializedPath();
        String serializedPath2 = ((InputDataInformationEvent)events.get(2)).getSerializedPath();
        Assert.assertNull((Object)serializedPath1);
        Assert.assertTrue((String)("Serialized path should contain a pattern like 'events/$queryid/$vertexid_$inputname_InputDataInformationEvent_$count', but found: " + serializedPath2), (boolean)serializedPath2.contains(this.expectedSerializedPathFormat(1)));
    }

    @Test
    public void testSplitSerializationNoFileSystemBecauseOfDefaultConfig() throws Exception {
        JobConf conf = new JobConf((Configuration)new HiveConf());
        InputSplit[] splits = this.getSplits(1);
        String serializedPath = ((InputDataInformationEvent)this.generateEvents(conf, splits).get(1)).getSerializedPath();
        Assert.assertNull((Object)serializedPath);
    }

    @Test
    public void testSplitSerializationDisabledFilesystem() throws Exception {
        JobConf conf = new JobConf((Configuration)new HiveConf());
        HiveConf.setIntVar((Configuration)conf, (HiveConf.ConfVars)HiveConf.ConfVars.HIVE_TEZ_INPUT_FS_SERIALIZATION_THRESHOLD, (int)-1);
        InputSplit[] splits = this.getSplits(1);
        String serializedPath = ((InputDataInformationEvent)this.generateEvents(conf, splits).get(1)).getSerializedPath();
        Assert.assertNull((Object)serializedPath);
    }

    @Test
    public void testExceptionIsPropagatedFromSplitSerializer() throws Exception {
        JobConf conf = new JobConf((Configuration)new HiveConf());
        HiveConf.setIntVar((Configuration)conf, (HiveConf.ConfVars)HiveConf.ConfVars.HIVE_TEZ_INPUT_FS_SERIALIZATION_THRESHOLD, (int)0);
        InputSplit[] splits = this.getSplits(3);
        InputInitializerContext context = this.getInputInitializerContext(conf);
        HiveSplitGeneratorSerializerException splitGenerator = new HiveSplitGeneratorSerializerException(context);
        splitGenerator.prepare(context);
        try {
            String serializedPath = ((InputDataInformationEvent)this.generateEvents(conf, splits, splitGenerator).get(1)).getSerializedPath();
            Assert.fail((String)"HiveSplitGeneratorSerializerException should fail");
        }
        catch (Exception e) {
            Assert.assertEquals(RuntimeException.class, e.getClass());
            Throwable wrapperRuntimeException = e.getCause();
            Assert.assertEquals(RuntimeException.class, wrapperRuntimeException.getClass());
            Throwable originalException = wrapperRuntimeException.getCause();
            Assert.assertEquals(IOException.class, originalException.getClass());
            Assert.assertTrue((boolean)originalException.getMessage().contains("Cannot write file to path"));
            Assert.assertTrue((String)"Already running future in not supposed to be cancelled with the current implementation", (boolean)splitGenerator.split0Finished.get());
            Assert.assertFalse((String)"A future started after a failure is not supposed to run at all", (boolean)splitGenerator.split2Finished.get());
        }
    }

    private InputSplit[] getSplits(int numSplits) {
        InputSplit[] splits = new InputSplit[numSplits];
        for (int i = 0; i < numSplits; ++i) {
            splits[i] = new HiveInputFormat.HiveInputSplit((InputSplit)new NullRowsInputFormat.DummyInputSplit("/fake" + i), ZeroRowsInputFormat.class.getName());
        }
        return splits;
    }

    private String expectedSerializedPathFormat(int count) {
        return String.format(PATH_FORMAT, QUERY_ID, 0, INPUT_NAME, count);
    }

    private List<Event> generateEvents(JobConf conf, InputSplit[] splits) throws Exception {
        return this.generateEvents(conf, splits, null);
    }

    private List<Event> generateEvents(JobConf conf, InputSplit[] splits, HiveSplitGenerator splitGenerator) throws Exception {
        if (splitGenerator == null) {
            InputInitializerContext inputInitializerContext = this.getInputInitializerContext(conf);
            splitGenerator = new HiveSplitGenerator(inputInitializerContext);
            splitGenerator.prepare(inputInitializerContext);
        }
        InputSplitInfoMem splitInfo = new InputSplitInfoMem(splits, null, 1, null, (Configuration)conf);
        return splitGenerator.createEventList(true, splitInfo);
    }

    private InputInitializerContext getInputInitializerContext(JobConf conf) {
        HiveConf.setVar((Configuration)conf, (HiveConf.ConfVars)HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, (String)"tez");
        conf.set("_hive.hdfs.session.path", "/tmp");
        conf.set("_hive.local.session.path", "/tmp");
        HiveConf.setVar((Configuration)conf, (HiveConf.ConfVars)HiveConf.ConfVars.HIVE_QUERY_ID, (String)QUERY_ID);
        MapWork mapWork = new MapWork("Map1");
        mapWork.deriveLlap((Configuration)conf, false);
        HiveConf.setVar((Configuration)conf, (HiveConf.ConfVars)HiveConf.ConfVars.PLAN, (String)"file:/tmp");
        Utilities.setMapWork((Configuration)conf, (MapWork)mapWork);
        DataSourceDescriptor dataSource = MRInput.createConfigBuilder((Configuration)conf, HiveInputFormat.class).build();
        UserPayload userPayload = dataSource.getInputDescriptor().getUserPayload();
        return new InputInitializerContextForTest(userPayload, (Configuration)conf);
    }

    public static class HiveSplitGeneratorSerializerException
    extends HiveSplitGenerator {
        private static final String EXCEPTION_MESSAGE = "Cannot write file to path";
        private final AtomicBoolean split0Finished = new AtomicBoolean(false);
        private final AtomicBoolean split2Finished = new AtomicBoolean(false);

        public HiveSplitGeneratorSerializerException(InputInitializerContext initializerContext) {
            super(initializerContext);
        }

        HiveSplitGenerator.SplitSerializer getSplitSerializer() throws IOException {
            return new SplitSerializerWithException();
        }

        class SplitSerializerWithException
        extends HiveSplitGenerator.SplitSerializer {
            SplitSerializerWithException() throws IOException {
                super((HiveSplitGenerator)HiveSplitGeneratorSerializerException.this);
            }

            InputDataInformationEvent write(int count, MRRuntimeProtos.MRSplitProto mrSplit) {
                if (count == 2) {
                    try {
                        Thread.sleep(1000L);
                    }
                    catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                return super.write(count, mrSplit);
            }

            void writeSplit(int count, MRRuntimeProtos.MRSplitProto mrSplit, Path filePath) throws IOException {
                LOG.info("Write split #{}", (Object)count);
                if (count == 0) {
                    try {
                        Thread.sleep(1000L);
                        HiveSplitGeneratorSerializerException.this.split0Finished.set(true);
                        LOG.info("Split #0 finished");
                    }
                    catch (InterruptedException e) {
                        LOG.info("Split #0 catches InterruptedException, this is not supposed to happen");
                        throw new IOException(e);
                    }
                }
                if (count == 1) {
                    LOG.info("Split #1 is about to throw exception");
                    throw new IOException("Cannot write file to path: " + filePath);
                }
                if (count == 2) {
                    HiveSplitGeneratorSerializerException.this.split2Finished.set(true);
                    LOG.info("Split #2 finished");
                }
            }
        }
    }

    public static class InputInitializerContextForTest
    implements InputInitializerContext {
        private final UserPayload payload;
        private final Configuration vertexConfig;

        public InputInitializerContextForTest(UserPayload payload, Configuration vertexConfig) {
            this.payload = payload;
            this.vertexConfig = vertexConfig;
        }

        public ApplicationId getApplicationId() {
            return APP_ID;
        }

        public String getDAGName() {
            return "FakeDAG";
        }

        public Configuration getVertexConfiguration() {
            return this.vertexConfig;
        }

        public String getInputName() {
            return TestHiveSplitGenerator.INPUT_NAME;
        }

        public UserPayload getInputUserPayload() {
            return this.payload;
        }

        public int getNumTasks() {
            return 100;
        }

        public Resource getVertexTaskResource() {
            return Resource.newInstance((int)1024, (int)1);
        }

        public int getVertexId() {
            return 0;
        }

        public Resource getTotalAvailableResource() {
            return Resource.newInstance((int)10240, (int)10);
        }

        public int getNumClusterNodes() {
            return 10;
        }

        public int getDAGAttemptNumber() {
            return 1;
        }

        public int getVertexNumTasks(String vertexName) {
            throw new UnsupportedOperationException("getVertexNumTasks not implemented in this mock");
        }

        public void registerForVertexStateUpdates(String vertexName, Set<VertexState> stateSet) {
            throw new UnsupportedOperationException("getVertexNumTasks not implemented in this mock");
        }

        public void addCounters(TezCounters tezCounters) {
            throw new UnsupportedOperationException("addCounters not implemented in this mock");
        }

        public UserPayload getUserPayload() {
            throw new UnsupportedOperationException("getUserPayload not implemented in this mock");
        }
    }
}

