/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.iterate;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.iterate.BaseResultIterators;
import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
import org.apache.phoenix.iterate.ParallelIteratorFactory;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.PeekingResultIterator;
import org.apache.phoenix.iterate.TableResultIterator;
import org.apache.phoenix.job.JobManager;
import org.apache.phoenix.monitoring.GlobalClientMetrics;
import org.apache.phoenix.monitoring.ReadMetricQueue;
import org.apache.phoenix.monitoring.ScanMetricsHolder;
import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.ScanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParallelIterators
extends BaseResultIterators {
    private static final Logger LOGGER = LoggerFactory.getLogger(ParallelIterators.class);
    private static final String NAME = "PARALLEL";
    private final ParallelIteratorFactory iteratorFactory;
    private final boolean initFirstScanOnly;

    public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan, boolean initFirstScanOnly, Map<ImmutableBytesPtr, ServerCacheClient.ServerCache> caches, QueryPlan dataPlan) throws SQLException {
        super(plan, perScanLimit, null, scanGrouper, scan, caches, dataPlan);
        this.iteratorFactory = iteratorFactory;
        this.initFirstScanOnly = initFirstScanOnly;
    }

    public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, Scan scan, boolean initOneScanPerRegion, Map<ImmutableBytesPtr, ServerCacheClient.ServerCache> caches, QueryPlan dataPlan) throws SQLException {
        this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance(), scan, initOneScanPerRegion, caches, dataPlan);
    }

    @Override
    protected boolean isSerial() {
        return false;
    }

    @Override
    protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan, Future<PeekingResultIterator>>>> nestedFutures, final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, final boolean isReverse, ParallelScanGrouper scanGrouper, long maxQueryEndTime) throws SQLException {
        int numScans;
        ThreadPoolExecutor executor = this.context.getConnection().getQueryServices().getExecutor();
        ArrayList scanLocations = Lists.newArrayListWithExpectedSize((int)estFlattenedSize);
        for (int i = 0; i < nestedScans.size(); ++i) {
            List<Scan> scans = nestedScans.get(i);
            numScans = scans.size();
            ArrayList futures = Lists.newArrayListWithExpectedSize((int)numScans);
            nestedFutures.add(futures);
            for (int j = 0; j < numScans; ++j) {
                Scan scan = nestedScans.get(i).get(j);
                scanLocations.add(new BaseResultIterators.ScanLocator(scan, i, j, j == 0, j == numScans - 1));
                futures.add(null);
            }
        }
        Collections.shuffle(scanLocations);
        ReadMetricQueue readMetrics = this.context.getReadMetricsQueue();
        final String physicalTableName = this.tableRef.getTable().getPhysicalName().getString();
        numScans = scanLocations.size();
        this.context.getOverallQueryMetrics().updateNumParallelScans(numScans);
        GlobalClientMetrics.GLOBAL_NUM_PARALLEL_SCANS.update(numScans);
        long renewLeaseThreshold = this.context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
        for (final BaseResultIterators.ScanLocator scanLocation : scanLocations) {
            final Scan scan = scanLocation.getScan();
            ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, physicalTableName, scan, this.context.getConnection().getLogLevel());
            final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(readMetrics, physicalTableName);
            final TableResultIterator tableResultItr = this.context.getConnection().getTableResultIteratorFactory().newIterator(this.mutationState, this.tableRef, scan, scanMetricsHolder, renewLeaseThreshold, this.plan, scanGrouper, this.caches, maxQueryEndTime);
            this.context.getConnection().addIteratorForLeaseRenewal(tableResultItr);
            Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobManager.JobCallable<PeekingResultIterator>(){

                @Override
                public PeekingResultIterator call() throws Exception {
                    long startTime = EnvironmentEdgeManager.currentTimeMillis();
                    PeekingResultIterator iterator = ParallelIterators.this.iteratorFactory.newIterator(ParallelIterators.this.context, tableResultItr, scan, physicalTableName, ParallelIterators.this.plan);
                    if (ParallelIterators.this.initFirstScanOnly) {
                        if (!isReverse && scanLocation.isFirstScan() || isReverse && scanLocation.isLastScan()) {
                            iterator.peek();
                        }
                    } else {
                        iterator.peek();
                    }
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug(LogUtil.addCustomAnnotations("Id: " + ParallelIterators.this.scanId + ", Time: " + (EnvironmentEdgeManager.currentTimeMillis() - startTime) + "ms, Table: " + physicalTableName + ", Scan: " + scan, ScanUtil.getCustomAnnotations(scan)));
                    }
                    allIterators.add(iterator);
                    return iterator;
                }

                @Override
                public Object getJobId() {
                    return ParallelIterators.this;
                }

                @Override
                public TaskExecutionMetricsHolder getTaskExecutionMetric() {
                    return taskMetrics;
                }
            }, "Parallel scanner for table: " + this.tableRef.getTable().getPhysicalName().getString()));
            nestedFutures.get(scanLocation.getOuterListIndex()).set(scanLocation.getInnerListIndex(), (Pair<Scan, Future<PeekingResultIterator>>)new Pair((Object)scan, future));
        }
    }

    @Override
    protected String getName() {
        return NAME;
    }
}

