/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.org.apache.druid.client;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import org.apache.hive.druid.com.fasterxml.jackson.core.JsonFactory;
import org.apache.hive.druid.com.fasterxml.jackson.core.ObjectCodec;
import org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.druid.com.fasterxml.jackson.dataformat.smile.SmileFactory;
import org.apache.hive.druid.com.fasterxml.jackson.dataformat.smile.SmileGenerator;
import org.apache.hive.druid.com.google.common.base.Function;
import org.apache.hive.druid.com.google.common.base.Predicates;
import org.apache.hive.druid.com.google.common.collect.ImmutableList;
import org.apache.hive.druid.com.google.common.collect.ImmutableMap;
import org.apache.hive.druid.com.google.common.collect.Iterables;
import org.apache.hive.druid.com.google.common.collect.Lists;
import org.apache.hive.druid.org.apache.druid.client.BatchServerInventoryView;
import org.apache.hive.druid.org.apache.druid.client.BrokerSegmentWatcherConfig;
import org.apache.hive.druid.org.apache.druid.client.BrokerServerView;
import org.apache.hive.druid.org.apache.druid.client.DruidServer;
import org.apache.hive.druid.org.apache.druid.client.FilteredServerInventoryView;
import org.apache.hive.druid.org.apache.druid.client.ServerView;
import org.apache.hive.druid.org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
import org.apache.hive.druid.org.apache.druid.client.selector.RandomServerSelectorStrategy;
import org.apache.hive.druid.org.apache.druid.client.selector.ServerSelector;
import org.apache.hive.druid.org.apache.druid.client.selector.ServerSelectorStrategy;
import org.apache.hive.druid.org.apache.druid.client.selector.TierSelectorStrategy;
import org.apache.hive.druid.org.apache.druid.curator.CuratorTestBase;
import org.apache.hive.druid.org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.hive.druid.org.apache.druid.java.util.common.Intervals;
import org.apache.hive.druid.org.apache.druid.java.util.common.Pair;
import org.apache.hive.druid.org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.hive.druid.org.apache.druid.java.util.http.client.HttpClient;
import org.apache.hive.druid.org.apache.druid.query.DataSource;
import org.apache.hive.druid.org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.hive.druid.org.apache.druid.query.QueryWatcher;
import org.apache.hive.druid.org.apache.druid.query.TableDataSource;
import org.apache.hive.druid.org.apache.druid.segment.TestHelper;
import org.apache.hive.druid.org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.hive.druid.org.apache.druid.server.coordination.ServerType;
import org.apache.hive.druid.org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.hive.druid.org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.hive.druid.org.apache.druid.timeline.DataSegment;
import org.apache.hive.druid.org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.hive.druid.org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.hive.druid.org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.hive.druid.org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.hive.druid.org.apache.druid.timeline.partition.PartitionHolder;
import org.apache.hive.druid.org.apache.druid.timeline.partition.ShardSpec;
import org.apache.hive.druid.org.apache.druid.timeline.partition.SingleElementPartitionChunk;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class BrokerServerViewTest
extends CuratorTestBase {
    private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
    private final ZkPathsConfig zkPathsConfig = new ZkPathsConfig();
    private CountDownLatch segmentViewInitLatch;
    private CountDownLatch segmentAddedLatch;
    private CountDownLatch segmentRemovedLatch;
    private BatchServerInventoryView baseView;
    private BrokerServerView brokerServerView;

    @Before
    public void setUp() throws Exception {
        this.setupServerAndCurator();
        this.curator.start();
        this.curator.blockUntilConnected();
    }

    @Test
    public void testSingleServerAddedRemovedSegment() throws Exception {
        this.segmentViewInitLatch = new CountDownLatch(1);
        this.segmentAddedLatch = new CountDownLatch(1);
        this.segmentRemovedLatch = new CountDownLatch(1);
        this.setupViews();
        DruidServer druidServer = new DruidServer("localhost:1234", "localhost:1234", null, 10000000L, ServerType.HISTORICAL, "default_tier", 0);
        this.setupZNodeForServer(druidServer, this.zkPathsConfig, this.jsonMapper);
        DataSegment segment = this.dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1");
        this.announceSegmentForServer(druidServer, segment, this.zkPathsConfig, this.jsonMapper);
        Assert.assertTrue((boolean)this.timing.forWaiting().awaitLatch(this.segmentViewInitLatch));
        Assert.assertTrue((boolean)this.timing.forWaiting().awaitLatch(this.segmentAddedLatch));
        VersionedIntervalTimeline timeline = this.brokerServerView.getTimeline((DataSource)new TableDataSource("test_broker_server_view"));
        List serverLookupRes = timeline.lookup(Intervals.of((String)"2014-10-20T00:00:00Z/P1D"));
        Assert.assertEquals((long)1L, (long)serverLookupRes.size());
        TimelineObjectHolder actualTimelineObjectHolder = (TimelineObjectHolder)serverLookupRes.get(0);
        Assert.assertEquals((Object)Intervals.of((String)"2014-10-20T00:00:00Z/P1D"), (Object)actualTimelineObjectHolder.getInterval());
        Assert.assertEquals((Object)"v1", (Object)actualTimelineObjectHolder.getVersion());
        PartitionHolder actualPartitionHolder = actualTimelineObjectHolder.getObject();
        Assert.assertTrue((boolean)actualPartitionHolder.isComplete());
        Assert.assertEquals((long)1L, (long)Iterables.size((Iterable)actualPartitionHolder));
        ServerSelector selector = (ServerSelector)((PartitionChunk)actualPartitionHolder.iterator().next()).getObject();
        Assert.assertFalse((boolean)selector.isEmpty());
        Assert.assertEquals((Object)segment, (Object)selector.getSegment());
        Assert.assertEquals((Object)druidServer, (Object)selector.pick().getServer());
        this.unannounceSegmentForServer(druidServer, segment, this.zkPathsConfig);
        Assert.assertTrue((boolean)this.timing.forWaiting().awaitLatch(this.segmentRemovedLatch));
        Assert.assertEquals((long)0L, (long)timeline.lookup(Intervals.of((String)"2014-10-20T00:00:00Z/P1D")).size());
        Assert.assertNull((Object)timeline.findEntry(Intervals.of((String)"2014-10-20T00:00:00Z/P1D"), (Object)"v1"));
    }

    @Test
    public void testMultipleServerAddedRemovedSegment() throws Exception {
        this.segmentViewInitLatch = new CountDownLatch(1);
        this.segmentAddedLatch = new CountDownLatch(5);
        this.segmentRemovedLatch = new CountDownLatch(1);
        this.setupViews();
        List druidServers = Lists.transform((List)ImmutableList.of((Object)"locahost:0", (Object)"localhost:1", (Object)"localhost:2", (Object)"localhost:3", (Object)"localhost:4"), (Function)new Function<String, DruidServer>(){

            public DruidServer apply(String input) {
                return new DruidServer(input, input, null, 10000000L, ServerType.HISTORICAL, "default_tier", 0);
            }
        });
        for (DruidServer druidServer : druidServers) {
            this.setupZNodeForServer(druidServer, this.zkPathsConfig, this.jsonMapper);
        }
        List segments = Lists.transform((List)ImmutableList.of((Object)Pair.of((Object)"2011-04-01/2011-04-03", (Object)"v1"), (Object)Pair.of((Object)"2011-04-03/2011-04-06", (Object)"v1"), (Object)Pair.of((Object)"2011-04-01/2011-04-09", (Object)"v2"), (Object)Pair.of((Object)"2011-04-06/2011-04-09", (Object)"v3"), (Object)Pair.of((Object)"2011-04-01/2011-04-02", (Object)"v3")), (Function)new Function<Pair<String, String>, DataSegment>(){

            public DataSegment apply(Pair<String, String> input) {
                return BrokerServerViewTest.this.dataSegmentWithIntervalAndVersion((String)input.lhs, (String)input.rhs);
            }
        });
        for (int i = 0; i < 5; ++i) {
            this.announceSegmentForServer((DruidServer)druidServers.get(i), (DataSegment)segments.get(i), this.zkPathsConfig, this.jsonMapper);
        }
        Assert.assertTrue((boolean)this.timing.forWaiting().awaitLatch(this.segmentViewInitLatch));
        Assert.assertTrue((boolean)this.timing.forWaiting().awaitLatch(this.segmentAddedLatch));
        VersionedIntervalTimeline timeline = this.brokerServerView.getTimeline((DataSource)new TableDataSource("test_broker_server_view"));
        this.assertValues(Arrays.asList(this.createExpected("2011-04-01/2011-04-02", "v3", (DruidServer)druidServers.get(4), (DataSegment)segments.get(4)), this.createExpected("2011-04-02/2011-04-06", "v2", (DruidServer)druidServers.get(2), (DataSegment)segments.get(2)), this.createExpected("2011-04-06/2011-04-09", "v3", (DruidServer)druidServers.get(3), (DataSegment)segments.get(3))), timeline.lookup(Intervals.of((String)"2011-04-01/2011-04-09")));
        this.unannounceSegmentForServer((DruidServer)druidServers.get(2), (DataSegment)segments.get(2), this.zkPathsConfig);
        Assert.assertTrue((boolean)this.timing.forWaiting().awaitLatch(this.segmentRemovedLatch));
        this.segmentRemovedLatch = new CountDownLatch(4);
        timeline = this.brokerServerView.getTimeline((DataSource)new TableDataSource("test_broker_server_view"));
        this.assertValues(Arrays.asList(this.createExpected("2011-04-01/2011-04-02", "v3", (DruidServer)druidServers.get(4), (DataSegment)segments.get(4)), this.createExpected("2011-04-02/2011-04-03", "v1", (DruidServer)druidServers.get(0), (DataSegment)segments.get(0)), this.createExpected("2011-04-03/2011-04-06", "v1", (DruidServer)druidServers.get(1), (DataSegment)segments.get(1)), this.createExpected("2011-04-06/2011-04-09", "v3", (DruidServer)druidServers.get(3), (DataSegment)segments.get(3))), timeline.lookup(Intervals.of((String)"2011-04-01/2011-04-09")));
        for (int i = 0; i < 5; ++i) {
            if (i == 2) continue;
            this.unannounceSegmentForServer((DruidServer)druidServers.get(i), (DataSegment)segments.get(i), this.zkPathsConfig);
        }
        Assert.assertTrue((boolean)this.timing.forWaiting().awaitLatch(this.segmentRemovedLatch));
        Assert.assertEquals((long)0L, (long)timeline.lookup(Intervals.of((String)"2011-04-01/2011-04-09")).size());
    }

    private Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>> createExpected(String intervalStr, String version, DruidServer druidServer, DataSegment segment) {
        return Pair.of((Object)Intervals.of((String)intervalStr), (Object)Pair.of((Object)version, (Object)Pair.of((Object)druidServer, (Object)segment)));
    }

    private void assertValues(List<Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>>> expected, List<TimelineObjectHolder> actual) {
        Assert.assertEquals((long)expected.size(), (long)actual.size());
        for (int i = 0; i < expected.size(); ++i) {
            Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>> expectedPair = expected.get(i);
            TimelineObjectHolder actualTimelineObjectHolder = actual.get(i);
            Assert.assertEquals((Object)expectedPair.lhs, (Object)actualTimelineObjectHolder.getInterval());
            Assert.assertEquals((Object)((Pair)expectedPair.rhs).lhs, (Object)actualTimelineObjectHolder.getVersion());
            PartitionHolder actualPartitionHolder = actualTimelineObjectHolder.getObject();
            Assert.assertTrue((boolean)actualPartitionHolder.isComplete());
            Assert.assertEquals((long)1L, (long)Iterables.size((Iterable)actualPartitionHolder));
            ServerSelector selector = (ServerSelector)((SingleElementPartitionChunk)actualPartitionHolder.iterator().next()).getObject();
            Assert.assertFalse((boolean)selector.isEmpty());
            Assert.assertEquals((Object)((Pair)((Pair)expectedPair.rhs).rhs).lhs, (Object)selector.pick().getServer());
            Assert.assertEquals((Object)((Pair)((Pair)expectedPair.rhs).rhs).rhs, (Object)selector.getSegment());
        }
    }

    private void setupViews() throws Exception {
        this.baseView = new BatchServerInventoryView(this.zkPathsConfig, this.curator, this.jsonMapper, Predicates.alwaysTrue()){

            public void registerSegmentCallback(Executor exec, final ServerView.SegmentCallback callback) {
                super.registerSegmentCallback(exec, new ServerView.SegmentCallback(){

                    public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) {
                        ServerView.CallbackAction res = callback.segmentAdded(server, segment);
                        BrokerServerViewTest.this.segmentAddedLatch.countDown();
                        return res;
                    }

                    public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment) {
                        ServerView.CallbackAction res = callback.segmentRemoved(server, segment);
                        BrokerServerViewTest.this.segmentRemovedLatch.countDown();
                        return res;
                    }

                    public ServerView.CallbackAction segmentViewInitialized() {
                        ServerView.CallbackAction res = callback.segmentViewInitialized();
                        BrokerServerViewTest.this.segmentViewInitLatch.countDown();
                        return res;
                    }
                });
            }
        };
        this.brokerServerView = new BrokerServerView((QueryToolChestWarehouse)EasyMock.createMock(QueryToolChestWarehouse.class), (QueryWatcher)EasyMock.createMock(QueryWatcher.class), this.getSmileMapper(), (HttpClient)EasyMock.createMock(HttpClient.class), (FilteredServerInventoryView)this.baseView, (TierSelectorStrategy)new HighestPriorityTierSelectorStrategy((ServerSelectorStrategy)new RandomServerSelectorStrategy()), (ServiceEmitter)new NoopServiceEmitter(), new BrokerSegmentWatcherConfig());
        this.baseView.start();
    }

    private DataSegment dataSegmentWithIntervalAndVersion(String intervalStr, String version) {
        return DataSegment.builder().dataSource("test_broker_server_view").interval(Intervals.of((String)intervalStr)).loadSpec((Map)ImmutableMap.of((Object)"type", (Object)"local", (Object)"path", (Object)"somewhere")).version(version).dimensions((List)ImmutableList.of()).metrics((List)ImmutableList.of()).shardSpec((ShardSpec)NoneShardSpec.instance()).binaryVersion(Integer.valueOf(9)).size(0L).build();
    }

    public ObjectMapper getSmileMapper() {
        SmileFactory smileFactory = new SmileFactory();
        smileFactory.configure(SmileGenerator.Feature.ENCODE_BINARY_AS_7BIT, false);
        smileFactory.delegateToTextual(true);
        DefaultObjectMapper retVal = new DefaultObjectMapper((JsonFactory)smileFactory);
        retVal.getFactory().setCodec((ObjectCodec)retVal);
        return retVal;
    }

    @After
    public void tearDown() throws Exception {
        this.baseView.stop();
        this.tearDownServerAndCurator();
    }
}

