/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.exec.spark;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.Iterator;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class SparkRecordHandler {
    protected static final String CLASS_NAME = SparkRecordHandler.class.getName();
    protected final PerfLogger perfLogger = SessionState.getPerfLogger();
    private static final Logger LOG = LoggerFactory.getLogger(SparkRecordHandler.class);
    private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
    protected JobConf jc;
    protected OutputCollector<?, ?> oc;
    protected Reporter rp;
    protected boolean abort = false;
    private volatile long rowNumber = 0L;
    private volatile long logThresholdInterval = 15000L;
    boolean anyRow = false;
    private final long maxLogThresholdInterval = 900000L;
    private ScheduledFuture memoryAndRowLogFuture;
    private final ScheduledThreadPoolExecutor memoryAndRowLogExecutor = this.getMemoryAndRowLogExecutor();

    private ScheduledThreadPoolExecutor getMemoryAndRowLogExecutor() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat("MemoryAndRowInfoLogger").setDaemon(true).setUncaughtExceptionHandler((t, e) -> LOG.error(t + " throws exception: " + e)).build(), new ThreadPoolExecutor.DiscardPolicy());
        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        return executor;
    }

    public <K, V> void init(JobConf job, OutputCollector<K, V> output, Reporter reporter) throws Exception {
        this.jc = job;
        MapredContext.init(false, new JobConf((Configuration)this.jc));
        MapredContext.get().setReporter(reporter);
        this.oc = output;
        this.rp = reporter;
        LOG.info("maximum memory = " + this.memoryMXBean.getHeapMemoryUsage().getMax());
        MemoryInfoLogger memoryInfoLogger = new MemoryInfoLogger();
        memoryInfoLogger.run();
        Utilities.tryLoggingClassPaths(job, LOG);
    }

    public abstract void processRow(Object var1, Object var2) throws IOException;

    public abstract <E> void processRow(Object var1, Iterator<E> var2) throws IOException;

    void incrementRowNumber() {
        ++this.rowNumber;
    }

    public void close() {
        this.memoryAndRowLogExecutor.shutdown();
        this.memoryAndRowLogFuture.cancel(false);
        try {
            if (!this.memoryAndRowLogExecutor.awaitTermination(5L, TimeUnit.SECONDS)) {
                this.memoryAndRowLogExecutor.shutdownNow();
            }
        }
        catch (InterruptedException e) {
            this.memoryAndRowLogExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        if (LOG.isInfoEnabled()) {
            this.logMemoryInfo();
        }
    }

    public abstract boolean getDone();

    private void logMemoryInfo() {
        long usedMemory = this.memoryMXBean.getHeapMemoryUsage().getUsed();
        LOG.info("Processed " + this.rowNumber + " rows: used memory = " + usedMemory);
    }

    public boolean isAbort() {
        return this.abort;
    }

    public void setAbort(boolean abort) {
        this.abort = abort;
    }

    class MemoryInfoLogger
    implements Runnable {
        MemoryInfoLogger() {
        }

        @Override
        public void run() {
            if (SparkRecordHandler.this.anyRow) {
                SparkRecordHandler.this.logThresholdInterval = Math.min(900000L, 2L * SparkRecordHandler.this.logThresholdInterval);
                SparkRecordHandler.this.logMemoryInfo();
            }
            SparkRecordHandler.this.memoryAndRowLogFuture = SparkRecordHandler.this.memoryAndRowLogExecutor.schedule(new MemoryInfoLogger(), SparkRecordHandler.this.logThresholdInterval, TimeUnit.MILLISECONDS);
        }
    }
}

