package cascading.flow.hadoop.util;

import cascading.flow.FlowException;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.hadoop.Hfs;
import cascading.tap.hadoop.Lfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.util.Util;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cascading/flow/hadoop/util/HadoopMRUtil.class */
public class HadoopMRUtil {
    private static final Logger LOG = LoggerFactory.getLogger(HadoopMRUtil.class);

    public static String writeStateToDistCache(JobConf jobConf, String str, String str2, String str3) {
        if (Util.isEmpty(str3)) {
            return null;
        }
        LOG.info("writing step state to dist cache, too large for job conf, size: {}", Integer.valueOf(str3.length()));
        String str4 = Hfs.getTempPath(jobConf) + "/" + str2 + "-state-" + str;
        Hfs hfs = new Hfs(new TextLine(), str4, SinkMode.REPLACE);
        try {
            TupleEntryCollector openForWrite = hfs.openForWrite(new HadoopFlowProcess(jobConf));
            openForWrite.add(new Tuple(new Object[]{str3}));
            openForWrite.close();
            URI uri = new Path(str4).toUri();
            DistributedCache.addCacheFile(uri, jobConf);
            LOG.info("using step state path: {}", uri);
            return str4;
        } catch (IOException e) {
            throw new FlowException("unable to write step state to Hadoop FS: " + hfs.getIdentifier());
        }
    }

    public static String readStateFromDistCache(JobConf jobConf, String str, String str2) throws IOException {
        Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(jobConf);
        Path path = null;
        int length = localCacheFiles.length;
        int i = 0;
        while (true) {
            if (i >= length) {
                break;
            }
            Path path2 = localCacheFiles[i];
            if (path2.toString().contains(str2 + "-state-" + str)) {
                path = path2;
                break;
            }
            i++;
        }
        if (path == null) {
            throw new FlowException("unable to find step state from distributed cache");
        }
        LOG.info("reading step state from local path: {}", path);
        Lfs lfs = new Lfs(new TextLine(new Fields(new Comparable[]{"line"})), path.toString());
        TupleEntryIterator tupleEntryIterator = null;
        try {
            try {
                TupleEntryIterator openForRead = lfs.openForRead(new HadoopFlowProcess(jobConf));
                if (!openForRead.hasNext()) {
                    throw new FlowException("step state path is empty: " + lfs.getIdentifier());
                }
                String string = ((TupleEntry) openForRead.next()).getString(0);
                if (openForRead != null) {
                    try {
                        openForRead.close();
                    } catch (IOException e) {
                        LOG.warn("error closing state path reader", e);
                    }
                }
                return string;
            } catch (IOException e2) {
                throw new FlowException("unable to find state path: " + lfs.getIdentifier(), e2);
            }
        } catch (Throwable th) {
            if (0 != 0) {
                try {
                    tupleEntryIterator.close();
                } catch (IOException e3) {
                    LOG.warn("error closing state path reader", e3);
                    throw th;
                }
            }
            throw th;
        }
    }

    public static Map<Path, Path> addToClassPath(Configuration configuration, List<String> list) {
        if (list == null) {
            return null;
        }
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        HadoopUtil.resolvePaths(configuration, list, null, null, hashMap, hashMap2);
        try {
            LocalFileSystem localFS = HadoopUtil.getLocalFS(configuration);
            for (String str : hashMap.keySet()) {
                if (!hashMap2.containsKey(str)) {
                    DistributedCache.addFileToClassPath(((Path) hashMap.get(str)).makeQualified(localFS), configuration);
                }
            }
            FileSystem defaultFS = HadoopUtil.getDefaultFS(configuration);
            Iterator it = hashMap2.keySet().iterator();
            while (it.hasNext()) {
                DistributedCache.addFileToClassPath(((Path) hashMap2.get((String) it.next())).makeQualified(defaultFS), configuration);
            }
            return HadoopUtil.getCommonPaths(hashMap, hashMap2);
        } catch (IOException e) {
            throw new FlowException("unable to set distributed cache paths", e);
        }
    }

    public static boolean hasReducer(JobConf jobConf) {
        return jobConf.getReducerClass() != null;
    }
}
