/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.exec.repl.util;

import java.io.BufferedWriter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.repl.util.FileList;
import org.apache.hadoop.hive.ql.exec.repl.util.FileListStreamer;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.LoggerFactory;

/**
 * Unit tests for {@link FileList} backed by a spied {@link FileListStreamer}.
 *
 * <p>The streamer's {@code lazyInitWriter()} is stubbed to return a mocked
 * {@link BufferedWriter}, so the tests observe the streamer's state and the
 * writes issued against the mock rather than any real file.
 */
@RunWith(value=PowerMockRunner.class)
@PrepareForTest(value={LoggerFactory.class})
public class TestFileList {
    /** Upper bound on how long the wait helpers poll before failing. */
    private static final long WAIT_TIMEOUT_MS = 5000L;

    /** Delay between two consecutive polls of the streamer state. */
    private static final long POLL_INTERVAL_MS = 100L;

    /** Writer handed to the streamer in place of a real one. */
    @Mock
    private BufferedWriter bufferedWriter;

    /**
     * Adds two entries to a list whose cache capacity is 100 and checks that the
     * streamer is not streaming afterwards.
     */
    @Test
    public void testNoStreaming() throws Exception {
        Object[] tuple = this.setupAndGetTuple(100, false);
        FileList fileList = (FileList) tuple[0];
        FileListStreamer fileListStreamer = (FileListStreamer) tuple[1];
        fileList.add("Entry1");
        fileList.add("Entry2");
        Assert.assertFalse(this.isStreamingToFile(fileListStreamer));
    }

    /**
     * With {@code REPL_RUN_DATA_COPY_TASKS_ON_TARGET} mocked to {@code true}, checks that
     * the streamer is uninitialized before the first add, is streaming after a single add,
     * and reaches the closed state after {@code close()}.
     */
    @Test
    public void testAlwaysStreaming() throws Exception {
        Object[] tuple = this.setupAndGetTuple(100, true);
        FileList fileList = (FileList) tuple[0];
        FileListStreamer fileListStreamer = (FileListStreamer) tuple[1];
        Assert.assertFalse(fileListStreamer.isInitialized());
        fileList.add("Entry1");
        this.waitForStreamingInitialization(fileListStreamer);
        Assert.assertTrue(this.isStreamingToFile(fileListStreamer));
        fileList.close();
        this.waitForStreamingClosure(fileListStreamer);
    }

    /**
     * With a cache capacity of 5, checks that three entries do not initialize the
     * streamer, while five entries do.
     */
    @Test
    public void testStreaminOnCacheHit() throws Exception {
        Object[] tuple = this.setupAndGetTuple(5, false);
        FileList fileList = (FileList) tuple[0];
        FileListStreamer fileListStreamer = (FileListStreamer) tuple[1];
        fileList.add("Entry1");
        fileList.add("Entry2");
        fileList.add("Entry3");
        // Deliberate pause: gives the streamer time to start (it must not) before asserting.
        Thread.sleep(5000L);
        Assert.assertFalse(fileListStreamer.isInitialized());
        fileList.add("Entry4");
        fileList.add("Entry5");
        this.waitForStreamingInitialization(fileListStreamer);
        fileList.close();
        this.waitForStreamingClosure(fileListStreamer);
    }

    /**
     * Adds 1000 entries from a pool of 10 threads and verifies that the mocked writer
     * received exactly one {@code write(String)} call per entry.
     */
    @Test
    public void testConcurrentAdd() throws Exception {
        Object[] tuple = this.setupAndGetTuple(100, false);
        FileList fileList = (FileList) tuple[0];
        FileListStreamer fileListStreamer = (FileListStreamer) tuple[1];
        int numOfEntries = 1000;
        int numOfThreads = 10;
        ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads);
        for (int i = 1; i <= numOfEntries; ++i) {
            executorService.submit(() -> {
                try {
                    fileList.add("someEntry");
                }
                catch (SemanticException e) {
                    // Keep the original failure attached so it is not lost.
                    throw new RuntimeException("Unbale to add to file list.", e);
                }
            });
        }
        // awaitTermination only returns early once a shutdown has been requested;
        // without it the call would always block for the whole timeout.
        executorService.shutdown();
        Assert.assertTrue("Concurrent add tasks did not finish within 1 minute.",
                executorService.awaitTermination(1L, TimeUnit.MINUTES));
        this.waitForStreamingInitialization(fileListStreamer);
        fileList.close();
        this.waitForStreamingClosure(fileListStreamer);
        ArgumentCaptor<String> entryArgs = ArgumentCaptor.forClass(String.class);
        Mockito.verify(this.bufferedWriter, Mockito.times(numOfEntries)).write(entryArgs.capture());
    }

    /**
     * Polls until the streamer reports it is initialized.
     *
     * @param fileListStreamer streamer to observe
     * @throws IllegalStateException if the streamer is still uninitialized after the timeout
     * @throws InterruptedException if the polling thread is interrupted while sleeping
     */
    private void waitForStreamingInitialization(FileListStreamer fileListStreamer) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(WAIT_TIMEOUT_MS);
        // The condition is re-evaluated after every sleep, so a streamer that becomes
        // ready during the final sleep is still accepted.
        while (!fileListStreamer.isInitialized()) {
            if (System.nanoTime() - deadline >= 0L) {
                throw new IllegalStateException("File Streamer not initialized till 5s.");
            }
            Thread.sleep(POLL_INTERVAL_MS);
        }
    }

    /**
     * Polls until the streamer satisfies {@link #isStreamingClosedProperly(FileListStreamer)}.
     *
     * @param fileListStreamer streamer to observe
     * @throws IllegalStateException if the streamer has not closed properly after the timeout
     * @throws InterruptedException if the polling thread is interrupted while sleeping
     */
    private void waitForStreamingClosure(FileListStreamer fileListStreamer) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(WAIT_TIMEOUT_MS);
        while (!this.isStreamingClosedProperly(fileListStreamer)) {
            if (System.nanoTime() - deadline >= 0L) {
                throw new IllegalStateException("File Streamer not getting closed till 5s.");
            }
            Thread.sleep(POLL_INTERVAL_MS);
        }
    }

    /**
     * Builds a {@link FileList} and the spied {@link FileListStreamer} it uses, sharing one
     * bounded queue as the cache.
     *
     * @param cacheSize capacity of the shared {@link LinkedBlockingQueue}
     * @param lazyDataCopy value the mocked configuration returns for
     *                     {@code REPL_RUN_DATA_COPY_TASKS_ON_TARGET}
     * @return a two-element array: index 0 is the {@link FileList}, index 1 the streamer spy
     */
    private Object[] setupAndGetTuple(int cacheSize, boolean lazyDataCopy) throws Exception {
        HiveConf hiveConf = Mockito.mock(HiveConf.class);
        Mockito.when(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET)).thenReturn(lazyDataCopy);
        Path backingFile = new Path("/tmp/backingFile");
        LinkedBlockingQueue<String> cache = new LinkedBlockingQueue<>(cacheSize);
        FileListStreamer fileListStreamer = Mockito.spy(new FileListStreamer(cache, backingFile, (Configuration) hiveConf));
        FileList fileList = new FileList(backingFile, fileListStreamer, cache, hiveConf);
        // Replace the real writer with the mock so writes can be verified.
        Mockito.doReturn(this.bufferedWriter).when(fileListStreamer).lazyInitWriter();
        return new Object[]{fileList, fileListStreamer};
    }

    /** Returns {@code true} when the streamer is initialized and still alive. */
    private boolean isStreamingToFile(FileListStreamer fileListStreamer) {
        return fileListStreamer.isInitialized() && fileListStreamer.isAlive();
    }

    /** Returns {@code true} when the streamer is initialized, no longer alive, and valid. */
    private boolean isStreamingClosedProperly(FileListStreamer fileListStreamer) {
        return fileListStreamer.isInitialized() && !fileListStreamer.isAlive() && fileListStreamer.isValid();
    }
}

