/*
 * Decompiled with CFR 0.152.
 */
package org.apache.impala.catalog;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.CatalogObjectImpl;
import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.catalog.DatabaseNotFoundException;
import org.apache.impala.catalog.Db;
import org.apache.impala.catalog.IncompleteTable;
import org.apache.impala.catalog.Table;
import org.apache.impala.catalog.TableLoader;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.common.Pair;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.util.EventSequence;
import org.apache.impala.util.HdfsCachingUtil;
import org.apache.impala.util.NoOpEventSequence;
import org.apache.log4j.Logger;

public class TableLoadingMgr {
    private static final Logger LOG = Logger.getLogger(TableLoadingMgr.class);
    private final LinkedBlockingDeque<TTableName> tableLoadingDeque_ = new LinkedBlockingDeque();
    private final Map<TTableName, AtomicBoolean> tableLoadingBarrier_ = new ConcurrentHashMap<TTableName, AtomicBoolean>();
    private final Map<TTableName, FutureTask<Table>> loadingTables_ = new ConcurrentHashMap<TTableName, FutureTask<Table>>();
    private final Map<TTableName, List<Long>> pendingTableCacheDirs_ = new HashMap<TTableName, List<Long>>();
    private final int numLoadingThreads_;
    private final ExecutorService tblLoadingPool_;
    ExecutorService asyncRefreshThread_ = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("TableAsyncRefreshThread").build());
    private final LinkedBlockingQueue<Pair<TTableName, String>> refreshThreadWork_ = new LinkedBlockingQueue();
    private final CatalogServiceCatalog catalog_;
    private final TableLoader tblLoader_;

    public TableLoadingMgr(CatalogServiceCatalog catalog, int numLoadingThreads) {
        this.catalog_ = catalog;
        this.tblLoader_ = new TableLoader(this.catalog_);
        this.numLoadingThreads_ = numLoadingThreads;
        this.tblLoadingPool_ = Executors.newFixedThreadPool(this.numLoadingThreads_, new ThreadFactoryBuilder().setNameFormat("TableLoadingThread-%d").build());
        this.startTableLoadingSubmitterThreads();
        this.asyncRefreshThread_.submit(new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                while (true) {
                    Pair work = (Pair)TableLoadingMgr.this.refreshThreadWork_.take();
                    TableLoadingMgr.this.execAsyncRefreshWork((TTableName)work.first, (String)work.second);
                }
            }
        });
    }

    public void prioritizeLoad(TTableName tblName) {
        AtomicBoolean isLoading = this.tableLoadingBarrier_.putIfAbsent(tblName, new AtomicBoolean(false));
        if (isLoading != null && isLoading.get()) {
            return;
        }
        this.tableLoadingDeque_.offerFirst(tblName);
    }

    public void backgroundLoad(TTableName tblName) {
        if (this.tableLoadingBarrier_.putIfAbsent(tblName, new AtomicBoolean(false)) == null) {
            this.tableLoadingDeque_.offerLast(tblName);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void watchCacheDirs(List<Long> cacheDirIds, TTableName tblName, String reason) {
        Map<TTableName, List<Long>> map = this.pendingTableCacheDirs_;
        synchronized (map) {
            List<Long> existingCacheReqIds = this.pendingTableCacheDirs_.get(tblName);
            if (existingCacheReqIds == null) {
                existingCacheReqIds = cacheDirIds;
                this.pendingTableCacheDirs_.put(tblName, cacheDirIds);
                this.refreshThreadWork_.add(Pair.create(tblName, reason));
            } else {
                existingCacheReqIds.addAll(cacheDirIds);
            }
        }
    }

    public LoadRequest loadAsync(final TTableName tblName, final long createdEventId, final String reason, final EventSequence catalogTimeline) throws DatabaseNotFoundException {
        final Db parentDb = this.catalog_.getDb(tblName.getDb_name());
        if (parentDb == null) {
            throw new DatabaseNotFoundException("Database '" + tblName.getDb_name() + "' was not found.");
        }
        FutureTask<Table> tableLoadTask = new FutureTask<Table>(new Callable<Table>(){

            @Override
            public Table call() throws Exception {
                catalogTimeline.markEvent("Start loading table");
                return TableLoadingMgr.this.tblLoader_.load(parentDb, tblName.table_name, createdEventId, reason, catalogTimeline);
            }
        });
        FutureTask<Table> existingValue = this.loadingTables_.putIfAbsent(tblName, tableLoadTask);
        if (existingValue == null) {
            this.tblLoadingPool_.execute(tableLoadTask);
        } else {
            tableLoadTask = existingValue;
        }
        return new LoadRequest(tblName, tableLoadTask);
    }

    private void startTableLoadingSubmitterThreads() {
        ExecutorService submitterLoadingPool = Executors.newFixedThreadPool(this.numLoadingThreads_, new ThreadFactoryBuilder().setNameFormat("TableLoadingSubmitterThread-%d").build());
        try {
            for (int i = 0; i < this.numLoadingThreads_; ++i) {
                submitterLoadingPool.execute(new Runnable(){

                    @Override
                    public void run() {
                        while (true) {
                            try {
                                while (true) {
                                    TableLoadingMgr.this.loadNextTable();
                                }
                            }
                            catch (Exception e) {
                                LOG.error((Object)"Error loading table: ", (Throwable)e);
                                continue;
                            }
                            break;
                        }
                    }
                });
            }
        }
        finally {
            submitterLoadingPool.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void loadNextTable() throws InterruptedException {
        TTableName tblName = this.tableLoadingDeque_.takeFirst();
        AtomicBoolean isLoading = this.tableLoadingBarrier_.get(tblName);
        if (isLoading == null || !isLoading.compareAndSet(false, true)) {
            LOG.info((Object)("Metadata load request already in progress for table: " + tblName.db_name + "." + tblName.table_name));
            return;
        }
        try {
            this.catalog_.getOrLoadTable(tblName.getDb_name(), tblName.getTable_name(), "background load", null);
        }
        catch (CatalogException catalogException) {
        }
        finally {
            this.tableLoadingBarrier_.remove(tblName);
        }
    }

    private void execAsyncRefreshWork(TTableName tblName, String reason) {
        if (!this.waitForCacheDirs(tblName)) {
            return;
        }
        try {
            Table tbl = this.catalog_.getTable(tblName.getDb_name(), tblName.getTable_name());
            if (tbl == null || tbl instanceof IncompleteTable || !tbl.isLoaded()) {
                return;
            }
            this.catalog_.reloadTable(tbl, reason, NoOpEventSequence.INSTANCE);
        }
        catch (CatalogException e) {
            LOG.error((Object)"Error reloading cached table: ", (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean waitForCacheDirs(TTableName tblName) {
        boolean isRefreshNeeded = false;
        block5: while (true) {
            List<Long> cacheDirIds = null;
            Object object = this.pendingTableCacheDirs_;
            synchronized (object) {
                cacheDirIds = this.pendingTableCacheDirs_.remove(tblName);
            }
            if (cacheDirIds == null || cacheDirIds.size() == 0) {
                return isRefreshNeeded;
            }
            isRefreshNeeded = true;
            object = cacheDirIds.iterator();
            while (true) {
                if (!object.hasNext()) continue block5;
                Long dirId = (Long)object.next();
                if (dirId == null) continue;
                try {
                    HdfsCachingUtil.waitForDirective(dirId);
                    continue;
                }
                catch (Exception e) {
                    LOG.error((Object)String.format("Error waiting for cache request %d to complete: ", dirId), (Throwable)e);
                    continue;
                }
                break;
            }
            break;
        }
    }

    public int numRemainingItems() {
        return this.tableLoadingDeque_.size();
    }

    public int numLoadsInProgress() {
        return this.loadingTables_.size();
    }

    public class LoadRequest {
        private final Future<Table> tblTask_;
        private final TTableName tblName_;

        private LoadRequest(TTableName tblName, Future<Table> tblTask) {
            this.tblTask_ = tblTask;
            this.tblName_ = tblName;
        }

        public Table get() {
            Table tbl;
            try {
                LOG.info((Object)("Loading metadata for table: " + this.tblName_.db_name + "." + this.tblName_.table_name));
                LOG.info((Object)String.format("Remaining items in queue: %s. Loads in progress: %s", TableLoadingMgr.this.tableLoadingDeque_.size(), TableLoadingMgr.this.loadingTables_.size()));
                tbl = this.tblTask_.get();
            }
            catch (Exception e) {
                tbl = IncompleteTable.createFailedMetadataLoadTable(TableLoadingMgr.this.catalog_.getDb(this.tblName_.getDb_name()), this.tblName_.getTable_name(), new TableLoadingException(e.getMessage(), e));
            }
            Preconditions.checkState((boolean)((CatalogObjectImpl)tbl).isLoaded());
            return tbl;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void close() {
            Map map = TableLoadingMgr.this.loadingTables_;
            synchronized (map) {
                if (TableLoadingMgr.this.loadingTables_.get(this.tblName_) == this.tblTask_) {
                    TableLoadingMgr.this.loadingTables_.remove(this.tblName_);
                }
            }
        }
    }
}

