/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.service.impl;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
import org.apache.tez.common.GuavaShim;
import org.apache.tez.common.Preconditions;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezExecutors;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.TokenCache;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.ObjectRegistry;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
import org.apache.tez.runtime.internals.api.TaskReporterInterface;
import org.apache.tez.runtime.task.TaskReporter;
import org.apache.tez.runtime.task.TaskRunner2Result;
import org.apache.tez.runtime.task.TezChild;
import org.apache.tez.runtime.task.TezTaskRunner2;
import org.apache.tez.service.ContainerRunner;
import org.apache.tez.shufflehandler.ShuffleHandler;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
import org.apache.tez.util.ProtoConverters;
import org.apache.tez.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ContainerRunnerImpl
extends AbstractService
implements ContainerRunner {
    private static final Logger LOG = LoggerFactory.getLogger(ContainerRunnerImpl.class);
    // DAG name that makes checkAndThrowExceptionForTests() simulate launch failures in submitWork().
    public static final String DAG_NAME_INSTRUMENTED_FAILURES = "InstrumentedFailures";
    // Fixed-size pool (numExecutors threads) on which queued container/task callables run.
    private final ListeningExecutorService executorService;
    // Holder for the local address; its host name is passed to each ExecutionContextImpl.
    private final AtomicReference<InetSocketAddress> localAddress;
    // Base directories under which per-user, per-application local directories are created.
    private final String[] localDirsBase;
    // Environment entries copied into the environment of every queued request (see setShufflePort()).
    private final Map<String, String> localEnv = new HashMap<String, String>();
    // Local filesystem used to create the application directories; assigned in serviceInit().
    private volatile FileSystem localFs;
    // Memory handed to each callable: 80% of the total budget divided by the executor count.
    private final long memoryPerExecutor;
    // Executor facility forwarded unchanged to every TaskRunnerCallable.
    private final TezExecutors sharedExecutor;

    /**
     * Creates the runner together with its fixed-size pool of executor threads.
     *
     * @param numExecutors number of executor threads; values that are not greater than zero are
     *        rejected through {@code Preconditions.checkState}
     * @param localDirsBase base directories under which per-application directories are created
     * @param localAddress holder of the address whose host name is given to each execution context
     * @param totalMemoryAvailableBytes total memory budget; 80% of it is divided evenly among the
     *        executors
     * @param sharedExecutor executor facility forwarded to every task runner
     */
    public ContainerRunnerImpl(int numExecutors, String[] localDirsBase, AtomicReference<InetSocketAddress> localAddress, long totalMemoryAvailableBytes, TezExecutors sharedExecutor) {
        super("ContainerRunnerImpl");
        Preconditions.checkState(numExecutors > 0, (Object) ("Invalid number of executors: " + numExecutors + ". Must be > 0"));

        this.localDirsBase = localDirsBase;
        this.localAddress = localAddress;

        // Worker threads are named "ContainerExecutor <n>".
        final ExecutorService workerPool = Executors.newFixedThreadPool(
            numExecutors,
            new ThreadFactoryBuilder().setNameFormat("ContainerExecutor %d").build());
        this.executorService = MoreExecutors.listeningDecorator(workerPool);

        // Only 80% of the advertised memory is made available to the executors.
        final double usableBytes = totalMemoryAvailableBytes * 0.8;
        this.memoryPerExecutor = (long) (usableBytes / numExecutors);
        LOG.info("ContainerRunnerImpl config: memoryPerExecutorDerived=" + this.memoryPerExecutor + ", numExecutors=" + numExecutors);

        this.sharedExecutor = sharedExecutor;
    }

    /**
     * Obtains the local filesystem instance that is later used to create application directories.
     *
     * @param conf configuration used to look up the local filesystem
     * @throws RuntimeException wrapping the {@link IOException} if the local filesystem cannot be
     *         obtained
     */
    public void serviceInit(Configuration conf) {
        final FileSystem local;
        try {
            local = FileSystem.getLocal(conf);
        } catch (IOException ioe) {
            throw new RuntimeException("Failed to setup local filesystem instance", ioe);
        }
        this.localFs = local;
    }

    /**
     * Start hook; intentionally empty because this method performs no work of its own.
     */
    public void serviceStart() {
        // Nothing to start here.
    }

    /**
     * Records the shuffle port of an auxiliary service in the environment that is copied into
     * every request queued afterwards.
     *
     * @param auxiliaryService name of the auxiliary service the port belongs to
     * @param shufflePort port number, stored as a 4-byte integer
     */
    public void setShufflePort(String auxiliaryService, int shufflePort) {
        final ByteBuffer portBytes = ByteBuffer.allocate(4);
        portBytes.putInt(shufflePort);
        AuxiliaryServiceHelper.setServiceDataIntoEnv(auxiliaryService, portBytes, this.localEnv);
    }

    /**
     * Stops the service and releases the executor pool created in the constructor.
     *
     * <p>The pool's worker threads would otherwise stay alive after the service has stopped, so
     * the pool is shut down here. {@code shutdownNow()} interrupts callables that are still
     * running and discards those that have not started yet; their failure callbacks then run the
     * usual per-callable cleanup.
     *
     * @throws Exception if the superclass stop logic fails
     */
    protected void serviceStop() throws Exception {
        try {
            this.executorService.shutdownNow();
        } finally {
            // Always run the base-class stop logic, even if the pool shutdown throws.
            super.serviceStop();
        }
    }

    /**
     * Builds the application-specific directory path
     * {@code <baseDir>/usercache/<user>/appcache/<applicationId>} using the platform separator.
     *
     * @param baseDir base local directory
     * @param applicationIdString application id used as the last path element
     * @param user user name placed below {@code usercache}
     * @return the assembled path string
     */
    private static String createAppSpecificLocalDir(String baseDir, String applicationIdString, String user) {
        return String.join(File.separator, baseDir, "usercache", user, "appcache", applicationIdString);
    }

    /**
     * Prepares the local environment for a container request and queues it on the executor pool.
     *
     * <p>Steps, in order: build the environment (shared entries plus the requesting user), create
     * one application-specific directory per base directory, decode the credentials carried by the
     * request, register the application with the {@link ShuffleHandler}, and finally submit a
     * {@link ContainerRunnerCallable} whose outcome is reported by a
     * {@link ContainerRunnerCallback}.
     *
     * @param request the container launch request
     * @throws TezException if a local directory cannot be created or the credentials cannot be read
     */
    @Override
    public void queueContainer(TezTestServiceProtocolProtos.RunContainerRequestProto request) throws TezException {
        // NOTE(review): this logs the whole request, which presumably includes the binary
        // credentials field - confirm whether that is acceptable for this service.
        LOG.info("Queuing container for execution: " + request);

        final Map<String, String> containerEnv = new HashMap<String, String>(this.localEnv);
        containerEnv.put(ApplicationConstants.Environment.USER.name(), request.getUser());

        final String[] appLocalDirs = new String[this.localDirsBase.length];
        int slot = 0;
        for (String baseDir : this.localDirsBase) {
            final String appDir = ContainerRunnerImpl.createAppSpecificLocalDir(baseDir, request.getApplicationIdString(), request.getUser());
            appLocalDirs[slot++] = appDir;
            try {
                this.localFs.mkdirs(new Path(appDir));
            } catch (IOException ioe) {
                throw new TezException(ioe);
            }
        }
        LOG.info("Dirs for {} are {}", request.getContainerIdString(), Arrays.toString(appLocalDirs));

        // The first application directory doubles as the working directory.
        final String workingDir = appLocalDirs[0];

        final Credentials credentials = new Credentials();
        final byte[] serializedTokens = request.getCredentialsBinary().toByteArray();
        final DataInputBuffer tokenStream = new DataInputBuffer();
        tokenStream.reset(serializedTokens, serializedTokens.length);
        try {
            credentials.readTokenStorageStream(tokenStream);
        } catch (IOException ioe) {
            throw new TezException(ioe);
        }

        final Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
        LOG.info("Registering request with the ShuffleHandler for containerId {}", request.getContainerIdString());
        ShuffleHandler.get().registerApplication(request.getApplicationIdString(), jobToken, request.getUser());

        final Configuration requestConf = new Configuration(this.getConfig());
        final ExecutionContext executionContext = new ExecutionContextImpl(this.localAddress.get().getHostName());
        final ContainerRunnerCallable runner = new ContainerRunnerCallable(request, requestConf, executionContext, containerEnv, appLocalDirs, workingDir, credentials, this.memoryPerExecutor);

        final ListenableFuture<TezChild.ContainerExecutionResult> pending = this.executorService.submit(runner);
        Futures.addCallback(pending, new ContainerRunnerCallback(request, runner), GuavaShim.directExecutor());
    }

    /**
     * Prepares the local environment for a work (task) request and queues it on the executor pool.
     *
     * <p>Steps, in order: apply the instrumented-failure check used by tests, build the
     * environment (shared entries plus the requesting user), create one application-specific
     * directory per base directory, decode the credentials carried by the request, register the
     * application with the {@link ShuffleHandler}, and finally submit a {@link TaskRunnerCallable}
     * whose outcome is reported by a {@link TaskRunnerCallback}.
     *
     * <p>The directory array is declared as {@code String[]}; the earlier {@code Object[]}
     * declaration made {@code String workingDir = localDirs[0]} an incompatible-types error.
     *
     * @param request the work submission request
     * @throws TezException if a simulated setup failure is requested, a local directory cannot be
     *         created, or the credentials cannot be read
     * @throws RejectedExecutionException if a simulated rejection is requested
     */
    @Override
    public void submitWork(TezTestServiceProtocolProtos.SubmitWorkRequestProto request) throws TezException {
        LOG.info("Queuing work for execution: " + request);
        this.checkAndThrowExceptionForTests(request);

        Map<String, String> env = new HashMap<String, String>();
        env.putAll(this.localEnv);
        env.put(ApplicationConstants.Environment.USER.name(), request.getUser());

        String[] localDirs = new String[this.localDirsBase.length];
        for (int i = 0; i < this.localDirsBase.length; ++i) {
            localDirs[i] = ContainerRunnerImpl.createAppSpecificLocalDir(this.localDirsBase[i], request.getApplicationIdString(), request.getUser());
            try {
                this.localFs.mkdirs(new Path(localDirs[i]));
            } catch (IOException e) {
                throw new TezException(e);
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Dirs are: " + Arrays.toString(localDirs));
        }

        // The first application directory doubles as the working directory.
        String workingDir = localDirs[0];

        Credentials credentials = new Credentials();
        DataInputBuffer dib = new DataInputBuffer();
        byte[] tokenBytes = request.getCredentialsBinary().toByteArray();
        dib.reset(tokenBytes, tokenBytes.length);
        try {
            credentials.readTokenStorageStream(dib);
        } catch (IOException e) {
            throw new TezException(e);
        }

        Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
        LOG.info("Registering request with the ShuffleHandler for containerId {}", request.getContainerIdString());
        ShuffleHandler.get().registerApplication(request.getApplicationIdString(), jobToken, request.getUser());
        TezCommonUtils.logCredentials(LOG, credentials, "taskCallable");

        TaskRunnerCallable callable = new TaskRunnerCallable(request, new Configuration(this.getConfig()), new ExecutionContextImpl(this.localAddress.get().getHostName()), env, localDirs, workingDir, credentials, this.memoryPerExecutor, this.sharedExecutor);
        ListenableFuture<TezChild.ContainerExecutionResult> future = this.executorService.submit(callable);
        Futures.addCallback(future, new TaskRunnerCallback(request, callable), GuavaShim.directExecutor());
    }

    /**
     * Test hook: simulates launch failures for DAGs named {@link #DAG_NAME_INSTRUMENTED_FAILURES}.
     *
     * <p>For such a DAG, attempt 0 of task 0 is rejected with a
     * {@link RejectedExecutionException} and attempt 0 of task 1 fails with a
     * {@link TezException}. Every other request passes through untouched.
     *
     * @param request the work submission request to inspect
     * @throws TezException to simulate a task setup failure during launch
     */
    private void checkAndThrowExceptionForTests(TezTestServiceProtocolProtos.SubmitWorkRequestProto request) throws TezException {
        final boolean instrumented = request.getTaskSpec().getDagName().equals(DAG_NAME_INSTRUMENTED_FAILURES);
        if (!instrumented) {
            return;
        }

        final TaskSpec spec = ProtoConverters.getTaskSpecfromProto(request.getTaskSpec());
        final int taskIndex = spec.getTaskID().getId();
        if (taskIndex != 0 && taskIndex != 1) {
            return;
        }
        // Only the first attempt of the selected tasks is made to fail.
        if (spec.getTaskAttemptID().getId() != 0) {
            return;
        }

        if (taskIndex == 0) {
            LOG.info("Simulating Rejected work");
            throw new RejectedExecutionException("Simulating Rejected work for taskAttemptId=" + spec.getTaskAttemptID());
        }
        LOG.info("Simulating Task Setup Failure during launch");
        throw new TezException("Simulating Task Setup Failure during launch for taskAttemptId=" + spec.getTaskAttemptID());
    }

    /**
     * Callable that builds a {@link TezChild} for one container request and runs it to completion.
     */
    static class ContainerRunnerCallable implements Callable<TezChild.ContainerExecutionResult> {
        private final TezTestServiceProtocolProtos.RunContainerRequestProto request;
        private final Configuration conf;
        private final ExecutionContext executionContext;
        private final Map<String, String> envMap;
        private final String[] localDirs;
        private final String workingDir;
        private final Credentials credentials;
        private final long memoryAvailable;
        /** Always {@code null}; no process id is supplied to the TezChild. */
        private final String pid = null;
        private final ObjectRegistryImpl objectRegistry = new ObjectRegistryImpl();
        /** Set once {@link #call()} has constructed the child; read by the failure callback. */
        private volatile TezChild tezChild;

        /**
         * @param request container launch request
         * @param conf configuration handed to the TezChild
         * @param executionContext execution context handed to the TezChild
         * @param envMap environment for the container
         * @param localDirs application-specific local directories
         * @param workingDir working directory for the container
         * @param credentials credentials decoded from the request
         * @param memoryAvailable memory budget for this container
         */
        ContainerRunnerCallable(TezTestServiceProtocolProtos.RunContainerRequestProto request, Configuration conf, ExecutionContext executionContext, Map<String, String> envMap, String[] localDirs, String workingDir, Credentials credentials, long memoryAvailable) {
            this.request = request;
            this.conf = conf;
            this.executionContext = executionContext;
            this.envMap = envMap;
            this.localDirs = localDirs;
            this.workingDir = workingDir;
            this.credentials = credentials;
            this.memoryAvailable = memoryAvailable;
        }

        /**
         * Creates the TezChild, runs it, logs the elapsed time and returns the child's result.
         *
         * @return the result reported by {@code TezChild.run()}
         * @throws Exception whatever constructing or running the TezChild throws
         */
        @Override
        public TezChild.ContainerExecutionResult call() throws Exception {
            final StopWatch timer = new StopWatch().start();

            final TezChild child = new TezChild(
                conf,
                request.getAmHost(),
                request.getAmPort(),
                request.getContainerIdString(),
                request.getTokenIdentifier(),
                request.getAppAttemptNumber(),
                workingDir,
                localDirs,
                envMap,
                objectRegistry,
                pid,
                executionContext,
                credentials,
                memoryAvailable,
                request.getUser(),
                null,
                false,
                (HadoopShim) new DefaultHadoopShim());
            // Publish before running so the failure callback can shut the child down.
            this.tezChild = child;

            final TezChild.ContainerExecutionResult outcome = child.run();
            LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" + timer.stop().now(TimeUnit.MILLISECONDS));
            return outcome;
        }

        /**
         * @return the TezChild created by {@link #call()}, or {@code null} if none was created yet
         */
        public TezChild getTezChild() {
            return tezChild;
        }
    }

    final class ContainerRunnerCallback
    implements FutureCallback<TezChild.ContainerExecutionResult> {
        private final TezTestServiceProtocolProtos.RunContainerRequestProto request;
        private final ContainerRunnerCallable containerRunnerCallable;

        ContainerRunnerCallback(TezTestServiceProtocolProtos.RunContainerRequestProto request, ContainerRunnerCallable containerRunnerCallable) {
            this.request = request;
            this.containerRunnerCallable = containerRunnerCallable;
        }

        public void onSuccess(TezChild.ContainerExecutionResult result) {
            switch (result.getExitStatus()) {
                case SUCCESS: {
                    LOG.info("Successfully finished: " + this.request.getApplicationIdString() + ", containerId=" + this.request.getContainerIdString());
                    break;
                }
                case EXECUTION_FAILURE: {
                    LOG.info("Failed to run: " + this.request.getApplicationIdString() + ", containerId=" + this.request.getContainerIdString(), result.getThrowable());
                    break;
                }
                case INTERRUPTED: {
                    LOG.info("Interrupted while running: " + this.request.getApplicationIdString() + ", containerId=" + this.request.getContainerIdString(), result.getThrowable());
                    break;
                }
                case ASKED_TO_DIE: {
                    LOG.info("Asked to die while running: " + this.request.getApplicationIdString() + ", containerId=" + this.request.getContainerIdString());
                }
            }
        }

        public void onFailure(Throwable t) {
            LOG.error("TezChild execution failed for : " + this.request.getApplicationIdString() + ", containerId=" + this.request.getContainerIdString(), t);
            TezChild tezChild = this.containerRunnerCallable.getTezChild();
            if (tezChild != null) {
                tezChild.shutdown();
            }
        }
    }

    /**
     * Callable that runs a single task attempt through {@link TezTaskRunner2} and translates the
     * runner's result into a {@link TezChild.ContainerExecutionResult}.
     */
    static class TaskRunnerCallable implements Callable<TezChild.ContainerExecutionResult> {
        private final TezTestServiceProtocolProtos.SubmitWorkRequestProto request;
        private final Configuration conf;
        private final ExecutionContext executionContext;
        private final Map<String, String> envMap;
        private final String[] localDirs;
        private final String workingDir;
        private final Credentials credentials;
        private final long memoryAvailable;
        private final TezExecutors sharedExecutor;
        /** Always {@code null}; no process id is supplied to the task runner. */
        private final String pid = null;
        private final ObjectRegistryImpl objectRegistry = new ObjectRegistryImpl();
        /** Single daemon thread handed to the task runner. */
        private final ListeningExecutorService executor;
        private volatile TezTaskRunner2 taskRunner;
        private volatile TaskReporter taskReporter;
        private TezTaskUmbilicalProtocol umbilical;

        /**
         * @param request work submission request
         * @param conf configuration for the task runner and the umbilical proxy
         * @param executionContext execution context handed to the task runner
         * @param envMap environment for the task
         * @param localDirs application-specific local directories
         * @param workingDir working directory chosen for the request
         * @param credentials credentials decoded from the request
         * @param memoryAvailable memory budget for this task
         * @param sharedExecutor executor facility forwarded to the task runner
         */
        TaskRunnerCallable(TezTestServiceProtocolProtos.SubmitWorkRequestProto request, Configuration conf, ExecutionContext executionContext, Map<String, String> envMap, String[] localDirs, String workingDir, Credentials credentials, long memoryAvailable, TezExecutors sharedExecutor) {
            this.request = request;
            this.conf = conf;
            this.executionContext = executionContext;
            this.envMap = envMap;
            this.localDirs = localDirs;
            this.workingDir = workingDir;
            this.credentials = credentials;
            this.memoryAvailable = memoryAvailable;
            this.sharedExecutor = sharedExecutor;

            final String threadName = "TezTaskRunner_" + request.getTaskSpec().getTaskAttemptIdString();
            final ExecutorService singleThread = Executors.newFixedThreadPool(
                1,
                new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build());
            this.executor = MoreExecutors.listeningDecorator(singleThread);
        }

        /**
         * Sets up the task's user identity, umbilical proxy and reporter, runs the task, and maps
         * the outcome to a container result. Filesystems opened for the task user are always
         * closed once the run has finished.
         *
         * @return SUCCESS when the task completed or a shutdown was requested through heartbeats;
         *         EXECUTION_FAILURE carrying the error when the runner reported one
         * @throws Exception whatever the setup steps or the task runner throw
         */
        @Override
        public TezChild.ContainerExecutionResult call() throws Exception {
            final StopWatch timer = new StopWatch().start();

            final UserGroupInformation taskUgi = UserGroupInformation.createRemoteUser(request.getUser());
            taskUgi.addCredentials(credentials);

            final Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
            final Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
            final String auxiliaryService = conf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle");
            serviceConsumerMetadata.put(auxiliaryService, TezCommonUtils.convertJobTokenToBytes(jobToken));
            final Multimap<String, String> startedInputsMap = HashMultimap.create();

            // The umbilical proxy is created as a user named after the request's token identifier,
            // carrying the job token bound to the AM address.
            final UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(request.getTokenIdentifier());
            final InetSocketAddress amAddress = NetUtils.createSocketAddrForHost(request.getAmHost(), request.getAmPort());
            SecurityUtil.setTokenService(jobToken, amAddress);
            taskOwner.addToken(jobToken);
            this.umbilical = openUmbilical(taskOwner, amAddress);

            this.taskReporter = new TaskReporter(
                umbilical,
                (long) conf.getInt("tez.task.am.heartbeat.interval-ms.max", 100),
                conf.getLong("tez.task.am.heartbeat.counter.interval-ms.max", 4000L),
                conf.getInt("tez.task.max-events-per-heartbeat", 500),
                new AtomicLong(0L),
                request.getContainerIdString());
            TezCommonUtils.logCredentials(LOG, taskUgi.getCredentials(), "taskUgi");

            this.taskRunner = new TezTaskRunner2(
                conf,
                taskUgi,
                localDirs,
                ProtoConverters.getTaskSpecfromProto(request.getTaskSpec()),
                request.getAppAttemptNumber(),
                serviceConsumerMetadata,
                envMap,
                startedInputsMap,
                (TaskReporterInterface) taskReporter,
                (ExecutorService) executor,
                (ObjectRegistry) objectRegistry,
                pid,
                executionContext,
                memoryAvailable,
                false,
                (HadoopShim) new DefaultHadoopShim(),
                sharedExecutor);

            try {
                final TaskRunner2Result outcome = taskRunner.run();
                LOG.info("TaskRunner2Result: {}", outcome);

                if (outcome.isContainerShutdownRequested()) {
                    LOG.info("Got a shouldDie notification via heartbeats. Shutting down");
                    return new TezChild.ContainerExecutionResult(TezChild.ContainerExecutionResult.ExitStatus.SUCCESS, null, "Asked to die by the AM");
                }

                final Throwable failure = outcome.getError();
                if (failure != null) {
                    return new TezChild.ContainerExecutionResult(TezChild.ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE, failure, "TaskExecutionFailure: " + failure.getMessage());
                }
            } finally {
                FileSystem.closeAllForUGI(taskUgi);
            }

            // Reached only when the task finished without an error or a shutdown request.
            LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" + timer.stop().now(TimeUnit.MILLISECONDS));
            return new TezChild.ContainerExecutionResult(TezChild.ContainerExecutionResult.ExitStatus.SUCCESS, null, null);
        }

        /** Creates the umbilical RPC proxy to the given address while acting as {@code owner}. */
        private TezTaskUmbilicalProtocol openUmbilical(UserGroupInformation owner, final InetSocketAddress amAddress) throws IOException, InterruptedException {
            return owner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {

                @Override
                public TezTaskUmbilicalProtocol run() throws Exception {
                    return RPC.getProxy(TezTaskUmbilicalProtocol.class, 19L, amAddress, conf);
                }
            });
        }

        /**
         * Releases everything this callable created: the runner thread, the task reporter (if one
         * was created) and the umbilical proxy (if one was created).
         */
        public void shutdown() {
            executor.shutdownNow();

            final TaskReporter reporter = this.taskReporter;
            if (reporter != null) {
                reporter.shutdown();
            }

            final TezTaskUmbilicalProtocol proxy = this.umbilical;
            if (proxy != null) {
                RPC.stopProxy(proxy);
            }
        }
    }

    final class TaskRunnerCallback
    implements FutureCallback<TezChild.ContainerExecutionResult> {
        private final TezTestServiceProtocolProtos.SubmitWorkRequestProto request;
        private final TaskRunnerCallable taskRunnerCallable;

        TaskRunnerCallback(TezTestServiceProtocolProtos.SubmitWorkRequestProto request, TaskRunnerCallable containerRunnerCallable) {
            this.request = request;
            this.taskRunnerCallable = containerRunnerCallable;
        }

        public void onSuccess(TezChild.ContainerExecutionResult result) {
            switch (result.getExitStatus()) {
                case SUCCESS: {
                    LOG.info("Successfully finished: " + this.request.getApplicationIdString() + ", containerId=" + this.request.getContainerIdString());
                    break;
                }
                case EXECUTION_FAILURE: {
                    LOG.info("Failed to run: " + this.request.getApplicationIdString() + ", containerId=" + this.request.getContainerIdString(), result.getThrowable());
                    break;
                }
                case INTERRUPTED: {
                    LOG.info("Interrupted while running: " + this.request.getApplicationIdString() + ", containerId=" + this.request.getContainerIdString(), result.getThrowable());
                    break;
                }
                case ASKED_TO_DIE: {
                    LOG.info("Asked to die while running: " + this.request.getApplicationIdString() + ", containerId=" + this.request.getContainerIdString());
                }
            }
            this.taskRunnerCallable.shutdown();
        }

        public void onFailure(Throwable t) {
            LOG.error("TezTaskRunner execution failed for : " + this.request.getApplicationIdString() + ", containerId=" + this.request.getContainerIdString(), t);
            this.taskRunnerCallable.shutdown();
        }
    }
}

