/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.core.io;

import java.util.Calendar;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.core.io.LocatableInputSplit;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link LocatableInputSplitAssigner}.
 *
 * <p>The serial tests drain an assigner from the test thread and check that every returned split
 * was part of the input set, that the set is fully consumed, and that the local/remote assignment
 * counters match the host layout used to build the splits. The concurrent tests drain an assigner
 * from several threads at once and check the number of retrieved splits and the sum of their split
 * numbers.
 */
class LocatableSplitAssignerTest {

    /** Maximum time, in milliseconds, to wait for each retriever thread to finish. */
    private static final long THREAD_JOIN_TIMEOUT_MILLIS = 5000L;

    /**
     * Requests all splits with a {@code null} host while the splits themselves carry a single
     * host, an empty host array, or a {@code null} host array; every assignment must be counted as
     * remote.
     */
    @Test
    void testSerialSplitAssignmentWithNullHost() {
        final int numSplits = 50;
        final String[][] hosts = {{"localhost"}, new String[0], null};

        final Set<LocatableInputSplit> splits = new HashSet<>();
        for (int id = 0; id < numSplits; id++) {
            splits.add(new LocatableInputSplit(id, hosts[id % hosts.length]));
        }

        final LocatableInputSplitAssigner assigner = new LocatableInputSplitAssigner(splits);
        LocatableInputSplit split;
        while ((split = assigner.getNextInputSplit(null, 0)) != null) {
            Assertions.assertThat(splits.remove(split)).isTrue();
        }

        Assertions.assertThat(splits).isEmpty();
        Assertions.assertThat(assigner.getNextInputSplit("", 0)).isNull();
        Assertions.assertThat(assigner.getNumberOfRemoteAssignments()).isEqualTo(numSplits);
        Assertions.assertThat(assigner.getNumberOfLocalAssignments()).isZero();
    }

    /**
     * Requests all splits from the one host every split is located on; every assignment must be
     * counted as local.
     */
    @Test
    void testSerialSplitAssignmentAllForSameHost() {
        final int numSplits = 50;
        final String host = "testhost";

        final Set<LocatableInputSplit> splits = new HashSet<>();
        for (int id = 0; id < numSplits; id++) {
            splits.add(new LocatableInputSplit(id, host));
        }

        final LocatableInputSplitAssigner assigner = new LocatableInputSplitAssigner(splits);
        LocatableInputSplit split;
        while ((split = assigner.getNextInputSplit(host, 0)) != null) {
            Assertions.assertThat(splits.remove(split)).isTrue();
        }

        Assertions.assertThat(splits).isEmpty();
        Assertions.assertThat(assigner.getNextInputSplit("", 0)).isNull();
        Assertions.assertThat(assigner.getNumberOfRemoteAssignments()).isZero();
        Assertions.assertThat(assigner.getNumberOfLocalAssignments()).isEqualTo(numSplits);
    }

    /**
     * Requests all splits from a host that none of the splits is located on; every assignment must
     * be counted as remote.
     */
    @Test
    void testSerialSplitAssignmentAllForRemoteHost() {
        final String[] hosts = {"host1", "host1", "host1", "host2", "host2", "host3"};
        final int numSplits = 10 * hosts.length;

        final Set<LocatableInputSplit> splits = new HashSet<>();
        for (int id = 0; id < numSplits; id++) {
            splits.add(new LocatableInputSplit(id, hosts[id % hosts.length]));
        }

        final LocatableInputSplitAssigner assigner = new LocatableInputSplitAssigner(splits);
        LocatableInputSplit split;
        while ((split = assigner.getNextInputSplit("testhost", 0)) != null) {
            Assertions.assertThat(splits.remove(split)).isTrue();
        }

        Assertions.assertThat(splits).isEmpty();
        Assertions.assertThat(assigner.getNextInputSplit("anotherHost", 0)).isNull();
        Assertions.assertThat(assigner.getNumberOfRemoteAssignments()).isEqualTo(numSplits);
        Assertions.assertThat(assigner.getNumberOfLocalAssignments()).isZero();
    }

    /**
     * Requests splits round-robin from three hosts when only two of them have local splits and
     * half of all splits live on a host that never requests; the counters must report 30 local and
     * 30 remote assignments.
     */
    @Test
    void testSerialSplitAssignmentSomeForRemoteHost() {
        final String[] hosts = {"host1", "host2", "host3"};
        final int numLocalHost1Splits = 20;
        final int numLocalHost2Splits = 10;
        final int numRemoteSplits = 30;
        final int numLocalSplits = numLocalHost1Splits + numLocalHost2Splits;

        int nextSplitId = 0;
        final Set<LocatableInputSplit> splits = new HashSet<>();
        for (int i = 0; i < numLocalHost1Splits; i++) {
            splits.add(new LocatableInputSplit(nextSplitId++, "host1"));
        }
        for (int i = 0; i < numLocalHost2Splits; i++) {
            splits.add(new LocatableInputSplit(nextSplitId++, "host2"));
        }
        for (int i = 0; i < numRemoteSplits; i++) {
            splits.add(new LocatableInputSplit(nextSplitId++, "remoteHost"));
        }

        final LocatableInputSplitAssigner assigner = new LocatableInputSplitAssigner(splits);
        LocatableInputSplit split;
        int request = 0;
        while ((split = assigner.getNextInputSplit(hosts[request++ % hosts.length], 0)) != null) {
            Assertions.assertThat(splits.remove(split)).isTrue();
        }

        Assertions.assertThat(splits).isEmpty();
        Assertions.assertThat(assigner.getNextInputSplit("anotherHost", 0)).isNull();
        Assertions.assertThat(assigner.getNumberOfRemoteAssignments()).isEqualTo(numRemoteSplits);
        Assertions.assertThat(assigner.getNumberOfLocalAssignments()).isEqualTo(numLocalSplits);
    }

    /**
     * Builds splits that are local to three, two, one, or none of the "local" hosts and requests
     * them round-robin from {@code local3}, {@code local2}, {@code local1} and {@code other},
     * asserting for each request exactly which host list the returned split must carry.
     */
    @Test
    void testSerialSplitAssignmentMultiLocalHost() {
        final String[] localHosts = {"local1", "local2", "local3"};
        final String[] remoteHosts = {"remote1", "remote2", "remote3"};
        final String[] requestingHosts = {"local3", "local2", "local1", "other"};

        final int numThreeLocalSplits = 10;
        final int numTwoLocalSplits = 10;
        final int numOneLocalSplits = 10;
        final int numRemoteSplits = 10;
        final int numLocalSplits = numThreeLocalSplits + numTwoLocalSplits + numOneLocalSplits;
        final int numSplits = numLocalSplits + numRemoteSplits;

        final String[] threeLocalHosts = localHosts;
        final String[] twoLocalHosts = {localHosts[0], localHosts[1], remoteHosts[0]};
        final String[] oneLocalHost = {localHosts[0], remoteHosts[0], remoteHosts[1]};
        final String[] noLocalHost = remoteHosts;

        int nextSplitId = 0;
        final Set<LocatableInputSplit> splits = new HashSet<>();
        for (int i = 0; i < numThreeLocalSplits; i++) {
            splits.add(new LocatableInputSplit(nextSplitId++, threeLocalHosts));
        }
        for (int i = 0; i < numTwoLocalSplits; i++) {
            splits.add(new LocatableInputSplit(nextSplitId++, twoLocalHosts));
        }
        for (int i = 0; i < numOneLocalSplits; i++) {
            splits.add(new LocatableInputSplit(nextSplitId++, oneLocalHost));
        }
        for (int i = 0; i < numRemoteSplits; i++) {
            splits.add(new LocatableInputSplit(nextSplitId++, noLocalHost));
        }

        final LocatableInputSplitAssigner assigner = new LocatableInputSplitAssigner(splits);
        for (int request = 0; request < numSplits; request++) {
            final String host = requestingHosts[request % requestingHosts.length];
            final LocatableInputSplit split = assigner.getNextInputSplit(host, 0);
            Assertions.assertThat(split).isNotNull();
            Assertions.assertThat(splits.remove(split)).isTrue();

            // local1 must get the one-local-host splits, local2 the two-local-host splits,
            // local3 the three-local-host splits, and any other host the remote-only splits.
            final String[] expectedHosts;
            if (host.equals(localHosts[0])) {
                expectedHosts = oneLocalHost;
            } else if (host.equals(localHosts[1])) {
                expectedHosts = twoLocalHosts;
            } else if (host.equals(localHosts[2])) {
                expectedHosts = threeLocalHosts;
            } else {
                expectedHosts = noLocalHost;
            }
            Assertions.assertThat(split.getHostnames()).isEqualTo(expectedHosts);
        }

        Assertions.assertThat(splits).isEmpty();
        Assertions.assertThat(assigner.getNextInputSplit("anotherHost", 0)).isNull();
        Assertions.assertThat(assigner.getNumberOfRemoteAssignments()).isEqualTo(numRemoteSplits);
        Assertions.assertThat(assigner.getNumberOfLocalAssignments()).isEqualTo(numLocalSplits);
    }

    /**
     * Distributes splits over hosts with the same pattern that is later used to request them;
     * every assignment must be counted as local.
     */
    @Test
    void testSerialSplitAssignmentMixedLocalHost() {
        final String[] hosts = {"host1", "host1", "host1", "host2", "host2", "host3"};
        final int numSplits = 10 * hosts.length;

        final Set<LocatableInputSplit> splits = new HashSet<>();
        for (int id = 0; id < numSplits; id++) {
            splits.add(new LocatableInputSplit(id, hosts[id % hosts.length]));
        }

        final LocatableInputSplitAssigner assigner = new LocatableInputSplitAssigner(splits);
        LocatableInputSplit split;
        int request = 0;
        while ((split = assigner.getNextInputSplit(hosts[request++ % hosts.length], 0)) != null) {
            Assertions.assertThat(splits.remove(split)).isTrue();
        }

        Assertions.assertThat(splits).isEmpty();
        Assertions.assertThat(assigner.getNextInputSplit("anotherHost", 0)).isNull();
        Assertions.assertThat(assigner.getNumberOfRemoteAssignments()).isZero();
        Assertions.assertThat(assigner.getNumberOfLocalAssignments()).isEqualTo(numSplits);
    }

    /**
     * Drains the assigner from several threads that all request with a {@code null} host; the
     * retrieved count and id sum must match the input and every assignment must be remote.
     *
     * @throws InterruptedException if the test thread is interrupted while joining the retrievers
     */
    @Test
    void testConcurrentSplitAssignmentNullHost() throws InterruptedException {
        final int numThreads = 10;
        final int numSplits = 500;
        // Sum of the split numbers 0 .. numSplits - 1.
        final int sumOfIdsExpected = (numSplits - 1) * numSplits / 2;
        final String[][] hosts = {{"localhost"}, new String[0], null};

        final Set<LocatableInputSplit> splits = new HashSet<>();
        for (int id = 0; id < numSplits; id++) {
            splits.add(new LocatableInputSplit(id, hosts[id % hosts.length]));
        }

        final LocatableInputSplitAssigner assigner = new LocatableInputSplitAssigner(splits);
        final AtomicInteger splitsRetrieved = new AtomicInteger(0);
        final AtomicInteger sumOfIds = new AtomicInteger(0);

        final Runnable retriever =
                () -> {
                    LocatableInputSplit split;
                    while ((split = assigner.getNextInputSplit(null, 0)) != null) {
                        splitsRetrieved.incrementAndGet();
                        sumOfIds.addAndGet(split.getSplitNumber());
                    }
                };

        runInDaemonThreads(retriever, numThreads);

        Assertions.assertThat(splitsRetrieved).hasValue(numSplits);
        Assertions.assertThat(sumOfIds).hasValue(sumOfIdsExpected);
        Assertions.assertThat(assigner.getNextInputSplit("", 0)).isNull();
        Assertions.assertThat(assigner.getNumberOfRemoteAssignments()).isEqualTo(numSplits);
        Assertions.assertThat(assigner.getNumberOfLocalAssignments()).isZero();
    }

    /**
     * Drains the assigner from several threads that all request from the single host the splits
     * are located on; the retrieved count and id sum must match the input and every assignment
     * must be local.
     *
     * @throws InterruptedException if the test thread is interrupted while joining the retrievers
     */
    @Test
    void testConcurrentSplitAssignmentForSingleHost() throws InterruptedException {
        final int numThreads = 10;
        final int numSplits = 500;
        // Sum of the split numbers 0 .. numSplits - 1.
        final int sumOfIdsExpected = (numSplits - 1) * numSplits / 2;
        final String host = "testhost";

        final Set<LocatableInputSplit> splits = new HashSet<>();
        for (int id = 0; id < numSplits; id++) {
            splits.add(new LocatableInputSplit(id, host));
        }

        final LocatableInputSplitAssigner assigner = new LocatableInputSplitAssigner(splits);
        final AtomicInteger splitsRetrieved = new AtomicInteger(0);
        final AtomicInteger sumOfIds = new AtomicInteger(0);

        final Runnable retriever =
                () -> {
                    LocatableInputSplit split;
                    while ((split = assigner.getNextInputSplit(host, 0)) != null) {
                        splitsRetrieved.incrementAndGet();
                        sumOfIds.addAndGet(split.getSplitNumber());
                    }
                };

        runInDaemonThreads(retriever, numThreads);

        Assertions.assertThat(splitsRetrieved).hasValue(numSplits);
        Assertions.assertThat(sumOfIds).hasValue(sumOfIdsExpected);
        Assertions.assertThat(assigner.getNextInputSplit(host, 0)).isNull();
        Assertions.assertThat(assigner.getNumberOfRemoteAssignments()).isZero();
        Assertions.assertThat(assigner.getNumberOfLocalAssignments()).isEqualTo(numSplits);
    }

    /**
     * Drains the assigner from several threads, each of which picks one requesting host at random
     * from the host pattern used to place the splits; the retrieved count and id sum must match
     * the input and at least {@code numSplits / hosts.length} assignments must be local.
     *
     * @throws InterruptedException if the test thread is interrupted while joining the retrievers
     */
    @Test
    void testConcurrentSplitAssignmentForMultipleHosts() throws InterruptedException {
        final int numThreads = 10;
        final int numSplits = 500;
        // Sum of the split numbers 0 .. numSplits - 1.
        final int sumOfIdsExpected = (numSplits - 1) * numSplits / 2;
        final String[] hosts = {"host1", "host1", "host1", "host2", "host2", "host3"};

        final Set<LocatableInputSplit> splits = new HashSet<>();
        for (int id = 0; id < numSplits; id++) {
            splits.add(new LocatableInputSplit(id, hosts[id % hosts.length]));
        }

        final LocatableInputSplitAssigner assigner = new LocatableInputSplitAssigner(splits);
        final AtomicInteger splitsRetrieved = new AtomicInteger(0);
        final AtomicInteger sumOfIds = new AtomicInteger(0);

        final Runnable retriever =
                () -> {
                    // Each thread sticks to one randomly chosen host for all of its requests.
                    final String threadHost = hosts[(int) (Math.random() * hosts.length)];
                    LocatableInputSplit split;
                    while ((split = assigner.getNextInputSplit(threadHost, 0)) != null) {
                        splitsRetrieved.incrementAndGet();
                        sumOfIds.addAndGet(split.getSplitNumber());
                    }
                };

        runInDaemonThreads(retriever, numThreads);

        Assertions.assertThat(splitsRetrieved).hasValue(numSplits);
        Assertions.assertThat(sumOfIds).hasValue(sumOfIdsExpected);
        Assertions.assertThat(assigner.getNextInputSplit("testhost", 0)).isNull();
        Assertions.assertThat(assigner.getNumberOfLocalAssignments())
                .isGreaterThanOrEqualTo(numSplits / hosts.length);
    }

    /**
     * Creates 65536 splits with three distinct, randomly chosen hosts each and requests them from
     * randomly chosen hosts, half of which share names with split hosts. The time-based seed is
     * attached to every assertion description so a failing run can be replayed.
     */
    @Test
    void testAssignmentOfManySplitsRandomly() {
        final long seed = Calendar.getInstance().getTimeInMillis();
        final int numSplits = 65536;
        final int numHostsPerSplit = 3;
        final String[] splitHosts = new String[256];
        final String[] requestingHosts = new String[256];
        final Random rand = new Random(seed);

        for (int i = 0; i < splitHosts.length; i++) {
            splitHosts[i] = "localHost" + i;
        }
        // Even indices reuse a split host name; odd indices are hosts without any local split.
        for (int i = 0; i < requestingHosts.length; i++) {
            requestingHosts[i] = i % 2 == 0 ? "localHost" + i : "remoteHost" + i;
        }

        final Set<String> hosts = new HashSet<>();
        final Set<LocatableInputSplit> splits = new HashSet<>();
        for (int id = 0; id < numSplits; id++) {
            // Keep drawing until the split has the required number of distinct hosts.
            while (hosts.size() < numHostsPerSplit) {
                hosts.add(splitHosts[rand.nextInt(splitHosts.length)]);
            }
            splits.add(new LocatableInputSplit(id, hosts.toArray(new String[0])));
            hosts.clear();
        }

        final LocatableInputSplitAssigner assigner = new LocatableInputSplitAssigner(splits);
        for (int request = 0; request < numSplits; request++) {
            final String host = requestingHosts[rand.nextInt(requestingHosts.length)];
            final LocatableInputSplit split = assigner.getNextInputSplit(host, 0);
            Assertions.assertThat(split)
                    .as("split for request %d (seed %d)", request, seed)
                    .isNotNull();
            Assertions.assertThat(splits.remove(split))
                    .as("split of request %d was still unassigned (seed %d)", request, seed)
                    .isTrue();
        }

        Assertions.assertThat(splits).as("unassigned splits (seed %d)", seed).isEmpty();
        Assertions.assertThat(assigner.getNextInputSplit("testHost", 0))
                .as("split returned after all were assigned (seed %d)", seed)
                .isNull();
    }

    /**
     * Runs {@code task} in {@code numThreads} daemon threads, waits up to {@link
     * #THREAD_JOIN_TIMEOUT_MILLIS} for each of them, and asserts that none is still alive.
     *
     * @param task the work every thread executes
     * @param numThreads the number of threads to start
     * @throws InterruptedException if the calling thread is interrupted while joining
     */
    private static void runInDaemonThreads(Runnable task, int numThreads)
            throws InterruptedException {
        final Thread[] threads = new Thread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            threads[i] = new Thread(task);
            // Daemon threads cannot keep the JVM alive if a retriever hangs.
            threads[i].setDaemon(true);
        }
        // Start all threads only after all have been created.
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join(THREAD_JOIN_TIMEOUT_MILLIS);
        }
        for (Thread thread : threads) {
            Assertions.assertThat(thread.isAlive()).isFalse();
        }
    }
}

