/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.org.apache.druid.data.input.impl.prefetch;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.IOFileFilter;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.hive.druid.com.google.common.base.Preconditions;
import org.apache.hive.druid.com.google.common.base.Predicate;
import org.apache.hive.druid.com.google.common.io.CountingOutputStream;
import org.apache.hive.druid.org.apache.druid.common.config.NullHandling;
import org.apache.hive.druid.org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.hive.druid.org.apache.druid.data.input.Firehose;
import org.apache.hive.druid.org.apache.druid.data.input.InputSplit;
import org.apache.hive.druid.org.apache.druid.data.input.Row;
import org.apache.hive.druid.org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.hive.druid.org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.hive.druid.org.apache.druid.data.input.impl.ParseSpec;
import org.apache.hive.druid.org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.hive.druid.org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.hive.druid.org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
import org.apache.hive.druid.org.apache.druid.java.util.common.DateTimes;
import org.apache.hive.druid.org.apache.druid.java.util.common.StringUtils;
import org.hamcrest.CoreMatchers;
import org.joda.time.ReadableInstant;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;

public class PrefetchableTextFilesFirehoseFactoryTest {
    /** Byte size of each generated input file; stays {@code -1} until {@link #setup()} has written the first file. */
    private static long FILE_SIZE = -1L;

    /**
     * CSV row parser shared by all tests. It is configured for the three columns {@code timestamp}, {@code a}
     * and {@code b}, with {@code timestamp} as the timestamp column, and is given the UTF-8 charset name.
     */
    private static final StringInputRowParser PARSER = new StringInputRowParser(
            new CSVParseSpec(
                    new TimestampSpec("timestamp", "auto", null),
                    new DimensionsSpec(
                            DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "a", "b")),
                            // Typed empty lists instead of raw ArrayList to avoid unchecked conversions.
                            new ArrayList<>(),
                            new ArrayList<>()
                    ),
                    ",",
                    Arrays.asList("timestamp", "a", "b"),
                    false,
                    0
            ),
            StandardCharsets.UTF_8.name()
    );

    /** Class-wide temporary folder; holds both the generated input files and the per-test firehose directories. */
    @ClassRule
    public static TemporaryFolder tempDir = new TemporaryFolder();

    /** Directory containing the generated input files; assigned in {@link #setup()}. */
    private static File TEST_DIR;

    /** Rule used by the tests that expect the firehose to fail with a specific exception. */
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    /**
     * Prepares the shared fixture: creates a fresh input directory and writes 100 files named
     * {@code test_0} .. {@code test_99} into it. File {@code i} holds 100 lines formatted as
     * {@code "%d,%03d,%03d\n"} from {@code (20171220 + i, i, j)} for {@code j} in 0..99.
     *
     * <p>The byte count of the first file is stored in {@code FILE_SIZE}; every later file is asserted to
     * have exactly the same size.
     *
     * @throws IOException if the directory or any file cannot be created or written
     */
    @BeforeClass
    public static void setup() throws IOException {
        NullHandling.initializeForTests();
        TEST_DIR = tempDir.newFolder();
        for (int fileIdx = 0; fileIdx < 100; fileIdx++) {
            File target = new File(TEST_DIR, "test_" + fileIdx);
            try (CountingOutputStream counter = new CountingOutputStream(Files.newOutputStream(target.toPath()));
                 Writer out = new BufferedWriter(new OutputStreamWriter(counter, StandardCharsets.UTF_8))) {
                for (int lineIdx = 0; lineIdx < 100; lineIdx++) {
                    out.write(StringUtils.format("%d,%03d,%03d\n", 20171220 + fileIdx, fileIdx, lineIdx));
                }
                // Flush first so the counting stream has seen every byte before its count is read.
                out.flush();
                if (FILE_SIZE == -1L) {
                    FILE_SIZE = counter.getCount();
                } else {
                    Assert.assertEquals(FILE_SIZE, counter.getCount());
                }
            }
        }
    }

    /**
     * Asserts that {@code rows} contains exactly the 10000 rows written by {@link #setup()}.
     *
     * <p>The list is sorted in place by timestamp, then by the numeric value of dimension {@code a}, then by
     * the numeric value of dimension {@code b}. After sorting, the row at position {@code p} must carry
     * timestamp {@code 20171220 + p / 100}, {@code a == p / 100} and {@code b == p % 100}.
     *
     * @param rows rows read from the firehose; reordered by this method
     */
    private static void assertResult(List<Row> rows) {
        Assert.assertEquals(10000L, rows.size());
        rows.sort((left, right) -> {
            int byTimestamp = left.getTimestamp().compareTo(right.getTimestamp());
            if (byTimestamp != 0) {
                return byTimestamp;
            }
            int byA = Integer.compare(intDimension(left, "a"), intDimension(right, "a"));
            if (byA != 0) {
                return byA;
            }
            return Integer.compare(intDimension(left, "b"), intDimension(right, "b"));
        });
        for (int pos = 0; pos < 100 * 100; pos++) {
            int fileIdx = pos / 100;
            int lineIdx = pos % 100;
            Row row = rows.get(pos);
            Assert.assertEquals(DateTimes.utc(20171220 + fileIdx), row.getTimestamp());
            Assert.assertEquals(fileIdx, intDimension(row, "a"));
            Assert.assertEquals(lineIdx, intDimension(row, "b"));
        }
    }

    /** Parses the first value of dimension {@code name} of {@code row} as a decimal integer. */
    private static int intDimension(Row row, String name) {
        return Integer.parseInt(row.getDimension(name).get(0));
    }

    /**
     * Asserts that {@code firehoseTmpDir} can be listed and contains exactly {@code expectedNumFiles} entries.
     *
     * @param firehoseTmpDir   directory that was handed to the firehose as its temporary directory
     * @param expectedNumFiles number of directory entries expected to remain
     */
    private static void assertNumRemainingCacheFiles(File firehoseTmpDir, int expectedNumFiles) {
        String[] remaining = firehoseTmpDir.list();
        // File.list() yields null when the path cannot be listed, so check that before reading the length.
        Assert.assertNotNull(remaining);
        Assert.assertEquals(expectedNumFiles, remaining.length);
    }

    /**
     * Creates a new, uniquely named directory under the root of the class-level temporary folder.
     *
     * @param dirPrefix prefix for the generated directory name
     * @return the newly created directory
     * @throws IOException if the directory cannot be created
     */
    private static File createFirehoseTmpDir(String dirPrefix) throws IOException {
        File root = tempDir.getRoot();
        return Files.createTempDirectory(root.toPath(), dirPrefix).toFile();
    }

    /**
     * Reads all rows through a factory built with cache capacity 0 and fetch capacity 0, then checks that no
     * bytes were cached, that the full expected row set was produced, and that the firehose temp directory
     * is empty afterwards.
     */
    @Test
    public void testWithoutCacheAndFetch() throws IOException {
        final TestPrefetchableTextFilesFirehoseFactory factory =
                TestPrefetchableTextFilesFirehoseFactory.with(TEST_DIR, 0L, 0L);
        final List<Row> collected = new ArrayList<>();
        final File workDir = createFirehoseTmpDir("testWithoutCacheAndFetch");
        try (Firehose firehose = factory.connect(PARSER, workDir)) {
            while (firehose.hasMore()) {
                collected.add(firehose.nextRow());
            }
        }
        Assert.assertEquals(0L, factory.getCacheManager().getTotalCachedBytes());
        assertResult(collected);
        assertNumRemainingCacheFiles(workDir, 0);
    }

    /**
     * Same scenario as the no-cache/no-fetch test, but the factory wraps its streams so that reads throw a
     * simulated connection reset up to 2 times. The firehose is still expected to deliver every row, cache
     * nothing, and leave its temp directory empty.
     */
    @Test
    public void testWithoutCacheAndFetchAgainstConnectionReset() throws IOException {
        final TestPrefetchableTextFilesFirehoseFactory factory =
                TestPrefetchableTextFilesFirehoseFactory.withConnectionResets(TEST_DIR, 0L, 0L, 2);
        final List<Row> rows = new ArrayList<>();
        // Use this test's own name as the directory prefix (it previously reused "testWithoutCacheAndFetch"),
        // so leftover directories can be attributed to the right test.
        final File firehoseTmpDir = createFirehoseTmpDir("testWithoutCacheAndFetchAgainstConnectionReset");
        try (Firehose firehose = factory.connect(PARSER, firehoseTmpDir)) {
            while (firehose.hasMore()) {
                rows.add(firehose.nextRow());
            }
        }
        Assert.assertEquals(0L, factory.getCacheManager().getTotalCachedBytes());
        assertResult(rows);
        assertNumRemainingCacheFiles(firehoseTmpDir, 0);
    }

    /**
     * Reads all rows through a factory built with cache capacity 0 and fetch capacity 2048, then checks that
     * no bytes were cached, that the full expected row set was produced, and that the firehose temp
     * directory is empty afterwards.
     */
    @Test
    public void testWithoutCache() throws IOException {
        final TestPrefetchableTextFilesFirehoseFactory factory =
                TestPrefetchableTextFilesFirehoseFactory.with(TEST_DIR, 0L, 2048L);
        final List<Row> collected = new ArrayList<>();
        final File workDir = createFirehoseTmpDir("testWithoutCache");
        try (Firehose firehose = factory.connect(PARSER, workDir)) {
            while (firehose.hasMore()) {
                collected.add(firehose.nextRow());
            }
        }
        Assert.assertEquals(0L, factory.getCacheManager().getTotalCachedBytes());
        assertResult(collected);
        assertNumRemainingCacheFiles(workDir, 0);
    }

    /**
     * Reads all rows through a factory built with cache capacity 2048 and fetch capacity 0, then checks the
     * row set and expects 2 files to remain in the firehose temp directory.
     */
    @Test
    public void testWithZeroFetchCapacity() throws IOException {
        final TestPrefetchableTextFilesFirehoseFactory factory =
                TestPrefetchableTextFilesFirehoseFactory.with(TEST_DIR, 2048L, 0L);
        final List<Row> collected = new ArrayList<>();
        final File workDir = createFirehoseTmpDir("testWithZeroFetchCapacity");
        try (Firehose firehose = factory.connect(PARSER, workDir)) {
            while (firehose.hasMore()) {
                collected.add(firehose.nextRow());
            }
        }
        assertResult(collected);
        assertNumRemainingCacheFiles(workDir, 2);
    }

    /**
     * Reads all rows through the default test factory ({@code of(TEST_DIR)}: cache capacity 2048, fetch
     * capacity 2048), then checks the row set and expects 2 files to remain in the firehose temp directory.
     */
    @Test
    public void testWithCacheAndFetch() throws IOException {
        final TestPrefetchableTextFilesFirehoseFactory factory =
                TestPrefetchableTextFilesFirehoseFactory.of(TEST_DIR);
        final List<Row> collected = new ArrayList<>();
        final File workDir = createFirehoseTmpDir("testWithCacheAndFetch");
        try (Firehose firehose = factory.connect(PARSER, workDir)) {
            while (firehose.hasMore()) {
                collected.add(firehose.nextRow());
            }
        }
        assertResult(collected);
        assertNumRemainingCacheFiles(workDir, 2);
    }

    /**
     * Reads all rows through a factory built with cache capacity 2048 and fetch capacity 1024, then checks
     * the row set and expects 2 files to remain in the firehose temp directory.
     */
    @Test
    public void testWithLargeCacheAndSmallFetch() throws IOException {
        final TestPrefetchableTextFilesFirehoseFactory factory =
                TestPrefetchableTextFilesFirehoseFactory.with(TEST_DIR, 2048L, 1024L);
        final List<Row> collected = new ArrayList<>();
        final File workDir = createFirehoseTmpDir("testWithLargeCacheAndSmallFetch");
        try (Firehose firehose = factory.connect(PARSER, workDir)) {
            while (firehose.hasMore()) {
                collected.add(firehose.nextRow());
            }
        }
        assertResult(collected);
        assertNumRemainingCacheFiles(workDir, 2);
    }

    /**
     * Reads all rows through a factory built with cache capacity 1024 and fetch capacity 2048, then checks
     * the row set and expects 1 file to remain in the firehose temp directory.
     */
    @Test
    public void testWithSmallCacheAndLargeFetch() throws IOException {
        final TestPrefetchableTextFilesFirehoseFactory factory =
                TestPrefetchableTextFilesFirehoseFactory.with(TEST_DIR, 1024L, 2048L);
        final List<Row> collected = new ArrayList<>();
        final File workDir = createFirehoseTmpDir("testWithSmallCacheAndLargeFetch");
        try (Firehose firehose = factory.connect(PARSER, workDir)) {
            while (firehose.hasMore()) {
                collected.add(firehose.nextRow());
            }
        }
        assertResult(collected);
        assertNumRemainingCacheFiles(workDir, 1);
    }

    /**
     * Uses a factory whose first stream open throws an {@link IOException} (one injected open failure, max
     * retry 3) and expects the firehose to still deliver the full row set, leaving 2 files in its temp
     * directory.
     */
    @Test
    public void testRetry() throws IOException {
        final TestPrefetchableTextFilesFirehoseFactory factory =
                TestPrefetchableTextFilesFirehoseFactory.withOpenExceptions(TEST_DIR, 1);
        final List<Row> collected = new ArrayList<>();
        final File workDir = createFirehoseTmpDir("testRetry");
        try (Firehose firehose = factory.connect(PARSER, workDir)) {
            while (firehose.hasMore()) {
                collected.add(firehose.nextRow());
            }
        }
        assertResult(collected);
        assertNumRemainingCacheFiles(workDir, 2);
    }

    /**
     * Uses a factory with 5 injected open failures but a max retry of 3, and expects reading to fail with a
     * {@link RuntimeException} caused by an {@link ExecutionException} whose message contains
     * {@code "Exception for retry test"}.
     */
    @Test
    public void testMaxRetry() throws IOException {
        expectedException.expect(RuntimeException.class);
        expectedException.expectCause(CoreMatchers.instanceOf(ExecutionException.class));
        expectedException.expectMessage("Exception for retry test");

        final TestPrefetchableTextFilesFirehoseFactory factory =
                TestPrefetchableTextFilesFirehoseFactory.withOpenExceptions(TEST_DIR, 5);
        final File workDir = createFirehoseTmpDir("testMaxRetry");
        try (Firehose firehose = factory.connect(PARSER, workDir)) {
            // Drain the firehose; the rows themselves are irrelevant, only the expected failure matters.
            while (firehose.hasMore()) {
                firehose.nextRow();
            }
        }
    }

    /**
     * Uses a factory that sleeps 1000 ms on every stream open while its fetch timeout is 100 ms, and expects
     * reading to fail with a {@link RuntimeException} caused by a {@link TimeoutException}.
     */
    @Test
    public void testTimeout() throws IOException {
        expectedException.expect(RuntimeException.class);
        expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class));

        final TestPrefetchableTextFilesFirehoseFactory factory =
                TestPrefetchableTextFilesFirehoseFactory.withSleepMillis(TEST_DIR, 1000L);
        final File workDir = createFirehoseTmpDir("testTimeout");
        try (Firehose firehose = factory.connect(PARSER, workDir)) {
            // Drain the firehose; the rows themselves are irrelevant, only the expected failure matters.
            while (firehose.hasMore()) {
                firehose.nextRow();
            }
        }
    }

    /**
     * Connects five times in a row with the same default factory ({@code of(TEST_DIR)}) and the same temp
     * directory. From the second connection on, the cache manager must already report
     * {@code FILE_SIZE * 2} cached bytes right after connecting. Every pass must yield the full row set and
     * leave 2 files in the temp directory.
     */
    @Test
    public void testReconnectWithCacheAndPrefetch() throws IOException {
        final TestPrefetchableTextFilesFirehoseFactory factory =
                TestPrefetchableTextFilesFirehoseFactory.of(TEST_DIR);
        final File workDir = createFirehoseTmpDir("testReconnectWithCacheAndPrefetch");
        for (int attempt = 0; attempt < 5; attempt++) {
            final List<Row> collected = new ArrayList<>();
            try (Firehose firehose = factory.connect(PARSER, workDir)) {
                if (attempt > 0) {
                    Assert.assertEquals(FILE_SIZE * 2L, factory.getCacheManager().getTotalCachedBytes());
                }
                while (firehose.hasMore()) {
                    collected.add(firehose.nextRow());
                }
            }
            assertResult(collected);
            assertNumRemainingCacheFiles(workDir, 2);
        }
    }

    /**
     * Connects five times in a row with the same factory (cache capacity 2048, fetch capacity 0) and the
     * same temp directory. From the second connection on, the cache manager must already report
     * {@code FILE_SIZE * 2} cached bytes right after connecting. Every pass must yield the full row set and
     * leave 2 files in the temp directory.
     */
    @Test
    public void testReconnectWithCache() throws IOException {
        final TestPrefetchableTextFilesFirehoseFactory factory =
                TestPrefetchableTextFilesFirehoseFactory.with(TEST_DIR, 2048L, 0L);
        final File workDir = createFirehoseTmpDir("testReconnectWithCache");
        for (int attempt = 0; attempt < 5; attempt++) {
            final List<Row> collected = new ArrayList<>();
            try (Firehose firehose = factory.connect(PARSER, workDir)) {
                if (attempt > 0) {
                    Assert.assertEquals(FILE_SIZE * 2L, factory.getCacheManager().getTotalCachedBytes());
                }
                while (firehose.hasMore()) {
                    collected.add(firehose.nextRow());
                }
            }
            assertResult(collected);
            assertNumRemainingCacheFiles(workDir, 2);
        }
    }

    /**
     * Test implementation of {@link PrefetchableTextFilesFirehoseFactory} that serves every file found under a
     * local base directory and can inject faults into stream handling:
     * <ul>
     *   <li>a number of stream opens that fail with an {@link IOException} ({@code numOpenExceptions}),</li>
     *   <li>a fixed delay before every stream open ({@code sleepMillis}),</li>
     *   <li>a number of simulated connection resets thrown from reads ({@code maxConnectionResets}).</li>
     * </ul>
     * The fault counters are plain {@code int} fields with no synchronization.
     */
    static class TestPrefetchableTextFilesFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<File> {
        /** Upper bound, in milliseconds, of the timeout derived by {@link #computeTimeout(int)}. */
        private static final double MAX_TIMEOUT_MILLIS = 60000.0;
        /** Base value, in milliseconds, that {@link #computeTimeout(int)} doubles per retry. */
        private static final double BASE_TIMEOUT_MILLIS = 1000.0;

        /** Delay applied before each stream open; no delay when not positive. */
        private final long sleepMillis;
        /** Directory whose files are the objects served by this factory. */
        private final File baseDir;
        /** Upper bound on the simulated connection resets each wrapped stream may throw. */
        private final int maxConnectionResets;
        /** Remaining number of stream opens that will fail with an {@link IOException}. */
        private int numOpenExceptions;
        /** Number of read calls made so far across all wrapped streams of this factory. */
        private int readCount;
        /** Number of reset attempts counted so far across all wrapped streams of this factory. */
        private int numConnectionResets;

        /** Factory with the given capacities, prefetch threshold 1024, fetch timeout 60000 ms, max retry 3, no faults. */
        static TestPrefetchableTextFilesFirehoseFactory with(File baseDir, long cacheCapacity, long fetchCapacity) {
            return new TestPrefetchableTextFilesFirehoseFactory(baseDir, 1024L, cacheCapacity, fetchCapacity, 60000L, 3, 0, 0, 0L);
        }

        /** Factory with cache and fetch capacity 2048, prefetch threshold 1024, max retry 3, no faults. */
        static TestPrefetchableTextFilesFirehoseFactory of(File baseDir) {
            return new TestPrefetchableTextFilesFirehoseFactory(baseDir, 1024L, 2048L, 2048L, 3, 0, 0, 0L);
        }

        /** Like {@link #of(File)}, but the first {@code count} stream opens throw an {@link IOException}. */
        static TestPrefetchableTextFilesFirehoseFactory withOpenExceptions(File baseDir, int count) {
            return new TestPrefetchableTextFilesFirehoseFactory(baseDir, 1024L, 2048L, 2048L, 3, count, 0, 0L);
        }

        /**
         * Factory with the given capacities, prefetch threshold {@code fetchCapacity / 2}, max retry 3, whose
         * streams may throw up to {@code numConnectionResets} simulated connection resets.
         */
        static TestPrefetchableTextFilesFirehoseFactory withConnectionResets(File baseDir, long cacheCapacity, long fetchCapacity, int numConnectionResets) {
            return new TestPrefetchableTextFilesFirehoseFactory(baseDir, fetchCapacity / 2L, cacheCapacity, fetchCapacity, 3, 0, numConnectionResets, 0L);
        }

        /** Like {@link #of(File)}, but with a 100 ms fetch timeout and a delay of {@code ms} before each stream open. */
        static TestPrefetchableTextFilesFirehoseFactory withSleepMillis(File baseDir, long ms) {
            return new TestPrefetchableTextFilesFirehoseFactory(baseDir, 1024L, 2048L, 2048L, 100L, 3, 0, 0, ms);
        }

        /**
         * Derives a fetch timeout from the retry budget:
         * {@code min(60000, 1000 * 2^(maxRetry - 1) * 2)} milliseconds.
         *
         * @param maxRetry maximum number of retries configured for the factory
         * @return the timeout in milliseconds, truncated to a {@code long}
         */
        private static long computeTimeout(int maxRetry) {
            // NOTE(review): presumably mirrors the worst-case back-off of the retry helper — confirm.
            final double maxFuzzyMultiplier = 2.0;
            return (long) Math.min(MAX_TIMEOUT_MILLIS, BASE_TIMEOUT_MILLIS * Math.pow(2.0, maxRetry - 1) * maxFuzzyMultiplier);
        }

        /** Creates a factory whose fetch timeout is derived from {@code maxRetry} via {@link #computeTimeout(int)}. */
        TestPrefetchableTextFilesFirehoseFactory(File baseDir, long prefetchTriggerThreshold, long maxCacheCapacityBytes, long maxFetchCapacityBytes, int maxRetry, int numOpenExceptions, int numConnectionResets, long sleepMillis) {
            this(baseDir, prefetchTriggerThreshold, maxCacheCapacityBytes, maxFetchCapacityBytes, TestPrefetchableTextFilesFirehoseFactory.computeTimeout(maxRetry), maxRetry, numOpenExceptions, numConnectionResets, sleepMillis);
        }

        /**
         * Creates a fully specified factory.
         *
         * @param baseDir                  directory whose files are served
         * @param prefetchTriggerThreshold passed to the parent as its third argument
         * @param maxCacheCapacityBytes    passed to the parent as its first argument
         * @param maxFetchCapacityBytes    passed to the parent as its second argument
         * @param fetchTimeout             passed to the parent as its fourth argument
         * @param maxRetry                 passed to the parent as its fifth argument
         * @param numOpenExceptions        number of stream opens that fail with an {@link IOException}
         * @param maxConnectionResets      upper bound on simulated connection resets thrown from reads
         * @param sleepMillis              delay before each stream open; no delay when not positive
         */
        TestPrefetchableTextFilesFirehoseFactory(File baseDir, long prefetchTriggerThreshold, long maxCacheCapacityBytes, long maxFetchCapacityBytes, long fetchTimeout, int maxRetry, int numOpenExceptions, int maxConnectionResets, long sleepMillis) {
            super(Long.valueOf(maxCacheCapacityBytes), Long.valueOf(maxFetchCapacityBytes), Long.valueOf(prefetchTriggerThreshold), Long.valueOf(fetchTimeout), Integer.valueOf(maxRetry));
            this.numOpenExceptions = numOpenExceptions;
            this.maxConnectionResets = maxConnectionResets;
            this.sleepMillis = sleepMillis;
            this.baseDir = baseDir;
        }

        /** Lists every file under the base directory, recursing into sub-directories. */
        protected Collection<File> initObjects() {
            return FileUtils.listFiles(Preconditions.checkNotNull(this.baseDir).getAbsoluteFile(), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
        }

        /**
         * Opens {@code object} from its beginning, applying the configured fault injection.
         *
         * @throws IOException for an injected open failure or if the file cannot be opened
         */
        protected InputStream openObjectStream(File object) throws IOException {
            // Offset 0 skips nothing, so this is the same as opening the file directly.
            return this.openObjectStream(object, 0L);
        }

        /** Returns the stream unchanged; no decoding or decompression is applied. */
        protected InputStream wrapObjectStream(File object, InputStream stream) {
            return stream;
        }

        /** Only {@link IOException}s (including the simulated ones) are considered retryable. */
        protected Predicate<Throwable> getRetryCondition() {
            return e -> e instanceof IOException;
        }

        /**
         * Opens {@code object} positioned {@code start} bytes from its beginning, applying the configured
         * fault injection in this order: injected open failure, then delay, then the actual open.
         *
         * @param object file to open
         * @param start  number of bytes to skip; values {@code <= 0} skip nothing, and skipping stops at end
         *               of file
         * @return the file stream, wrapped in a reset-simulating stream when connection resets are configured
         * @throws IOException      for an injected open failure, or if opening or skipping fails
         * @throws RuntimeException wrapping an {@link InterruptedException} if the delay is interrupted
         */
        protected InputStream openObjectStream(File object, long start) throws IOException {
            if (this.numOpenExceptions > 0) {
                --this.numOpenExceptions;
                throw new IOException("Exception for retry test");
            }
            if (this.sleepMillis > 0L) {
                try {
                    Thread.sleep(this.sleepMillis);
                }
                catch (InterruptedException e) {
                    // Restore the interrupt flag so code further up the stack can still observe the interruption.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            InputStream in = FileUtils.openInputStream(object);
            try {
                TestPrefetchableTextFilesFirehoseFactory.skipFully(in, start);
            }
            catch (IOException | RuntimeException e) {
                // Do not leak the file handle when positioning fails.
                try {
                    in.close();
                }
                catch (IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
            return this.maxConnectionResets > 0 ? new TestInputStream(in, this.maxConnectionResets) : in;
        }

        /**
         * Skips {@code count} bytes of {@code in}, repeating as needed because a single
         * {@link InputStream#skip(long)} call is allowed to skip fewer bytes than requested.
         * Stops early when the end of the stream is reached.
         */
        private static void skipFully(InputStream in, long count) throws IOException {
            long remaining = count;
            while (remaining > 0L) {
                long skipped = in.skip(remaining);
                if (skipped > 0L) {
                    remaining -= skipped;
                } else if (in.read() >= 0) {
                    // skip() made no progress although data is available; consuming one byte guarantees progress.
                    --remaining;
                } else {
                    // End of stream: nothing left to skip.
                    return;
                }
            }
        }

        /** Splitting is not supported by this test factory. */
        public FiniteFirehoseFactory<StringInputRowParser, File> withSplit(InputSplit<File> split) {
            throw new UnsupportedOperationException();
        }

        /**
         * Stream decorator that simulates connection resets. The read counter and the reset counter live in
         * the enclosing factory, so they are shared by all streams the factory creates. On every
         * {@code NUM_READ_COUNTS_BEFORE_ERROR}-th read call (starting with the very first), a
         * {@link SocketException} is thrown as long as fewer than {@code maxConnectionResets} such attempts
         * have been counted.
         */
        private class TestInputStream extends InputStream {
            private static final int NUM_READ_COUNTS_BEFORE_ERROR = 10;
            private final InputStream delegate;
            private final int maxConnectionResets;

            TestInputStream(InputStream delegate, int maxConnectionResets) {
                this.delegate = delegate;
                this.maxConnectionResets = maxConnectionResets;
            }

            /** Counts one read call and throws the simulated reset when one is due. */
            private void failIfResetIsDue() throws IOException {
                // The reset counter is only advanced on read calls that hit the boundary (short-circuit &&).
                if (TestPrefetchableTextFilesFirehoseFactory.this.readCount++ % NUM_READ_COUNTS_BEFORE_ERROR == 0
                    && TestPrefetchableTextFilesFirehoseFactory.this.numConnectionResets++ < this.maxConnectionResets) {
                    throw new SocketException("Test Connection reset");
                }
            }

            @Override
            public int read() throws IOException {
                this.failIfResetIsDue();
                return this.delegate.read();
            }

            @Override
            public int read(byte[] b, int off, int len) throws IOException {
                this.failIfResetIsDue();
                return this.delegate.read(b, off, len);
            }

            /** Closes the wrapped stream; without this override the underlying file handle would never be released. */
            @Override
            public void close() throws IOException {
                this.delegate.close();
            }
        }
    }
}

