package cascading.flow;

import cascading.CascadingException;
import cascading.cascade.Cascade;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.ElementGraph;
import cascading.flow.planner.FlowStepGraph;
import cascading.flow.planner.FlowStepJob;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.Scope;
import cascading.management.CascadingServices;
import cascading.management.UnitOfWorkExecutorStrategy;
import cascading.management.UnitOfWorkSpawnStrategy;
import cascading.management.state.ClientState;
import cascading.property.AppProps;
import cascading.property.PropertyUtil;
import cascading.stats.FlowStats;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.util.ShutdownUtil;
import cascading.util.Update;
import cascading.util.Util;
import cascading.util.Version;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.jgrapht.Graphs;
import org.jgrapht.traverse.TopologicalOrderIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import riffle.process.DependencyIncoming;
import riffle.process.DependencyOutgoing;
import riffle.process.Process;
import riffle.process.ProcessCleanup;
import riffle.process.ProcessComplete;
import riffle.process.ProcessPrepare;
import riffle.process.ProcessStart;
import riffle.process.ProcessStop;

@Process
/* loaded from: input_file:cascading/flow/BaseFlow.class */
public abstract class BaseFlow<Config> implements Flow<Config> {
    // Shared logger; note that it is registered under Flow.class rather than BaseFlow.class.
    private static final Logger LOG = LoggerFactory.getLogger(Flow.class);
    // Platform description handed to the constructors and returned by getPlatformInfo().
    private PlatformInfo platformInfo;
    // Unique flow id, created lazily by getID().
    private String id;
    // Flow name; "NA" when the no-arg constructor is used.
    private String name;
    // Run id copied from the FlowDef (only set by the FlowDef constructor).
    private String runID;
    // Class path entries copied from the FlowDef (only set by the FlowDef constructor).
    private List<String> classPath;
    // Tags copied from the FlowDef; published as the "cascading.flow.tags" session property.
    private String tags;
    // Registered flow listeners, each wrapped in a SafeFlowListener; created lazily by getListeners().
    private List<BaseFlow<Config>.SafeFlowListener> listeners;
    // Strategy consulted by isSkipFlow(); never null (see setFlowSkipStrategy).
    private FlowSkipStrategy flowSkipStrategy;
    // Statistics for this flow; (re)created by createPrepareFlowStats().
    protected FlowStats flowStats;
    // Source taps keyed by name.
    protected Map<String, Tap> sources;
    // Sink taps keyed by name.
    protected Map<String, Tap> sinks;
    // Trap taps keyed by name.
    private Map<String, Tap> traps;
    // Checkpoint taps keyed by name.
    private Map<String, Tap> checkpoints;
    // Whether jobs should be stopped on exit; defaults to true, see initFromProperties().
    protected boolean stopJobsOnExit;
    // Submit priority in the range 1..10 (validated by setSubmitPriority); defaults to 5.
    private int submitPriority;
    // Graph of flow steps supplied through initialize()/setFlowStepGraph().
    private FlowStepGraph<Config> flowStepGraph;
    // Thread created by start(); reset to null at the end of complete().
    protected transient Thread thread;
    // Failure inspected and rethrown by complete(). NOTE(review): presumably recorded by run() - confirm.
    private Throwable throwable;
    // Set to true by stop(); also prevents start() from launching the flow thread.
    protected boolean stop;
    // Element graph supplied through initialize().
    private ElementGraph pipeGraph;
    // Lazily created by getCascadingServices().
    private transient CascadingServices cascadingServices;
    // Optional step strategy; null by default.
    private FlowStepStrategy<Config> flowStepStrategy;
    // Cache of the topologically ordered steps built by getFlowSteps().
    private transient List<FlowStep<Config>> steps;
    // NOTE(review): presumably the step jobs keyed by name, populated by initializeNewJobsMap() - confirm.
    private transient Map<String, FlowStepJob<Config>> jobsMap;
    // Strategy object created in the constructors. NOTE(review): presumably used to spawn step jobs - confirm.
    private transient UnitOfWorkSpawnStrategy spawnStrategy;
    // Fair lock held for the duration of stop(); complete() acquires it to wait for an in-flight stop().
    private transient ReentrantLock stopLock;
    // NOTE(review): presumably installed by registerShutdownHook() - confirm.
    protected ShutdownUtil.Hook shutdownHook;
    // Optional descriptor entries; null when none were supplied (see getFlowDescriptor()).
    private HashMap<String, String> flowDescriptor;

    /* loaded from: input_file:cascading/flow/BaseFlow$FlowHolder.class */
    /**
     * Simple mutable holder for a {@link Flow} reference.
     */
    public static class FlowHolder {
        /** The held flow; {@code null} until assigned. */
        public Flow flow;

        /** Creates an empty holder. */
        public FlowHolder() {
            this(null);
        }

        /**
         * Creates a holder wrapping the given flow.
         *
         * @param flow the flow to hold, may be {@code null}
         */
        public FlowHolder(Flow flow) {
            this.flow = flow;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cascading/flow/BaseFlow$SafeFlowListener.class */
    /**
     * Wraps a {@link FlowListener} so that anything it throws is captured instead of propagating.
     * <p>
     * A captured throwable is stored in {@link #throwable}, logged as a warning, and the enclosing
     * flow is stopped.
     */
    public class SafeFlowListener implements FlowListener {
        /** The wrapped listener all callbacks are delegated to. */
        final FlowListener flowListener;
        /** The most recent throwable raised by the wrapped listener, or {@code null}. */
        Throwable throwable;

        private SafeFlowListener(FlowListener flowListener) {
            this.flowListener = flowListener;
        }

        @Override
        public void onStarting(Flow flow) {
            try {
                flowListener.onStarting(flow);
            } catch (Throwable failure) {
                handleThrowable(failure);
            }
        }

        @Override
        public void onStopping(Flow flow) {
            try {
                flowListener.onStopping(flow);
            } catch (Throwable failure) {
                handleThrowable(failure);
            }
        }

        @Override
        public void onCompleted(Flow flow) {
            try {
                flowListener.onCompleted(flow);
            } catch (Throwable failure) {
                handleThrowable(failure);
            }
        }

        /**
         * Delegates to the wrapped listener.
         *
         * @return the delegate's answer, or {@code false} when the delegate itself threw
         */
        @Override
        public boolean onThrowable(Flow flow, Throwable th) {
            boolean handled = false;
            try {
                handled = flowListener.onThrowable(flow, th);
            } catch (Throwable failure) {
                handleThrowable(failure);
            }
            return handled;
        }

        /** Records the failure, logs it, and stops the enclosing flow. */
        private void handleThrowable(Throwable th) {
            throwable = th;
            String message = String.format("flow listener %s threw throwable", flowListener);
            BaseFlow.this.logWarn(message, th);
            BaseFlow.this.stop();
        }

        /**
         * Compares the wrapped listener against either another wrapper's delegate or the given
         * object directly.
         */
        @Override
        public boolean equals(Object obj) {
            Object other = obj;
            if (obj instanceof SafeFlowListener) {
                other = ((SafeFlowListener) obj).flowListener;
            }
            return flowListener.equals(other);
        }

        @Override
        public int hashCode() {
            return flowListener.hashCode();
        }
    }

    /**
     * Reads the {@code FlowProps.STOP_JOBS_ON_EXIT} property, defaulting to {@code "true"}.
     *
     * @param map the properties to read from
     * @return the parsed boolean value of the property
     */
    static boolean getStopJobsOnExit(Map<Object, Object> map) {
        String configured = (String) PropertyUtil.getProperty(map, FlowProps.STOP_JOBS_ON_EXIT, "true");
        return Boolean.parseBoolean(configured);
    }

    /**
     * Creates a flow with default settings and the placeholder name {@code "NA"}.
     * <p>
     * The tap maps start as shared empty maps, the submit priority defaults to 5 and
     * {@code stopJobsOnExit} defaults to {@code true}.
     */
    protected BaseFlow() {
        this.flowSkipStrategy = new FlowSkipIfSinkNotStale();
        this.sources = Collections.EMPTY_MAP;
        this.sinks = Collections.EMPTY_MAP;
        this.traps = Collections.EMPTY_MAP;
        this.checkpoints = Collections.EMPTY_MAP;
        this.stopJobsOnExit = true;
        this.submitPriority = 5;
        this.flowStepStrategy = null;
        this.spawnStrategy = new UnitOfWorkExecutorStrategy();
        this.stopLock = new ReentrantLock(true);
        this.name = "NA";
        // Created last: the stats object is handed this instance and a client state derived from its id.
        this.flowStats = createPrepareFlowStats();
    }

    /**
     * Creates a named flow with an empty flow descriptor.
     *
     * @param platformInfo the platform description
     * @param map          the properties used to initialise the flow, may be {@code null}
     * @param config       the parent configuration
     * @param str          the flow name
     */
    protected BaseFlow(PlatformInfo platformInfo, Map<Object, Object> map, Config config, String str) {
        this(platformInfo, map, config, str, new LinkedHashMap());
    }

    /**
     * Creates a named flow with the given flow descriptor.
     *
     * @param platformInfo the platform description
     * @param map          the properties used to initialise the flow; session properties are written into it
     *                     when it is not {@code null}
     * @param config       the parent configuration passed to {@link #initConfig(Map, Object)}
     * @param str          the flow name
     * @param map2         descriptor entries to copy; when {@code null} the descriptor is left unset
     */
    protected BaseFlow(PlatformInfo platformInfo, Map<Object, Object> map, Config config, String str, Map<String, String> map2) {
        this.flowSkipStrategy = new FlowSkipIfSinkNotStale();
        this.sources = Collections.EMPTY_MAP;
        this.sinks = Collections.EMPTY_MAP;
        this.traps = Collections.EMPTY_MAP;
        this.checkpoints = Collections.EMPTY_MAP;
        this.stopJobsOnExit = true;
        this.submitPriority = 5;
        this.flowStepStrategy = null;
        this.spawnStrategy = new UnitOfWorkExecutorStrategy();
        this.stopLock = new ReentrantLock(true);
        this.platformInfo = platformInfo;
        this.name = str;
        if (map2 != null) {
            // Defensive copy so later changes to the caller's map are not reflected here.
            this.flowDescriptor = new LinkedHashMap(map2);
        }
        // Session properties are added before the config is initialised from the same map.
        addSessionProperties(map);
        initConfig(map, config);
        this.flowStats = createPrepareFlowStats();
    }

    /**
     * Creates a flow from a {@link FlowDef}, copying its name, tags, run id, class path, descriptor and taps.
     * <p>
     * This constructor does not assign {@code flowStats}; {@link #initialize(ElementGraph, FlowStepGraph)}
     * does.
     *
     * @param platformInfo the platform description
     * @param map          the properties used to initialise the flow; session properties are written into it
     *                     when it is not {@code null}
     * @param config       the parent configuration passed to {@link #initConfig(Map, Object)}
     * @param flowDef      the flow definition to copy from
     */
    protected BaseFlow(PlatformInfo platformInfo, Map<Object, Object> map, Config config, FlowDef flowDef) {
        this.flowSkipStrategy = new FlowSkipIfSinkNotStale();
        this.sources = Collections.EMPTY_MAP;
        this.sinks = Collections.EMPTY_MAP;
        this.traps = Collections.EMPTY_MAP;
        this.checkpoints = Collections.EMPTY_MAP;
        this.stopJobsOnExit = true;
        this.submitPriority = 5;
        this.flowStepStrategy = null;
        this.spawnStrategy = new UnitOfWorkExecutorStrategy();
        this.stopLock = new ReentrantLock(true);
        this.platformInfo = platformInfo;
        this.name = flowDef.getName();
        this.tags = flowDef.getTags();
        this.runID = flowDef.getRunID();
        this.classPath = flowDef.getClassPath();
        if (!flowDef.getFlowDescriptor().isEmpty()) {
            // Only keep a descriptor when there is something in it; otherwise the field stays null.
            this.flowDescriptor = new LinkedHashMap(flowDef.getFlowDescriptor());
        }
        // Name and tags are assigned above so that the session properties can include them.
        addSessionProperties(map);
        initConfig(map, config);
        // The setters also register any tap that implements FlowListener.
        setSources(flowDef.getSourcesCopy());
        setSinks(flowDef.getSinksCopy());
        setTraps(flowDef.getTrapsCopy());
        setCheckpoints(flowDef.getCheckpointsCopy());
        initFromTaps();
        retrieveSourceFields();
        retrieveSinkFields();
    }

    /**
     * @return the platform description this flow was created with
     */
    @Override
    public PlatformInfo getPlatformInfo() {
        return platformInfo;
    }

    /**
     * Attaches the planned graphs to this flow, binds the steps to it, recreates the flow
     * statistics and rebuilds the jobs map.
     *
     * @param elementGraph  the element (pipe) graph
     * @param flowStepGraph the step graph
     */
    public void initialize(ElementGraph elementGraph, FlowStepGraph<Config> flowStepGraph) {
        pipeGraph = elementGraph;
        this.flowStepGraph = flowStepGraph;

        initSteps();
        flowStats = createPrepareFlowStats();
        initializeNewJobsMap();
    }

    /**
     * Presents the resolved fields to the source and sink taps found in the graph and
     * returns a new graph constructed from the given one.
     *
     * @param elementGraph the graph to read fields from
     * @return a new {@link ElementGraph} created from {@code elementGraph}
     */
    public ElementGraph updateSchemes(ElementGraph elementGraph) {
        presentSourceFields(elementGraph);
        presentSinkFields(elementGraph);

        ElementGraph copy = new ElementGraph(elementGraph);
        return copy;
    }

    /** Asks every source tap to retrieve its source fields using the current flow process. */
    protected void retrieveSourceFields() {
        for (Tap source : sources.values()) {
            source.retrieveSourceFields(getFlowProcess());
        }
    }

    /**
     * Presents the resolved fields to every source tap and checkpoint tap that is a vertex
     * of the given graph.
     *
     * @param elementGraph the graph to look the taps up in
     */
    protected void presentSourceFields(ElementGraph elementGraph) {
        for (Tap source : sources.values()) {
            if (!elementGraph.containsVertex(source)) {
                continue;
            }
            source.presentSourceFields(getFlowProcess(), getFieldsFor(elementGraph, source));
        }

        for (Tap checkpoint : checkpoints.values()) {
            if (!elementGraph.containsVertex(checkpoint)) {
                continue;
            }
            checkpoint.presentSourceFields(getFlowProcess(), getFieldsFor(elementGraph, checkpoint));
        }
    }

    /** Asks every sink tap to retrieve its sink fields using the current flow process. */
    protected void retrieveSinkFields() {
        for (Tap sink : sinks.values()) {
            sink.retrieveSinkFields(getFlowProcess());
        }
    }

    /**
     * Presents the resolved fields to every sink tap and checkpoint tap that is a vertex
     * of the given graph.
     *
     * @param elementGraph the graph to look the taps up in
     */
    protected void presentSinkFields(ElementGraph elementGraph) {
        for (Tap sink : sinks.values()) {
            if (!elementGraph.containsVertex(sink)) {
                continue;
            }
            sink.presentSinkFields(getFlowProcess(), getFieldsFor(elementGraph, sink));
        }

        for (Tap checkpoint : checkpoints.values()) {
            if (!elementGraph.containsVertex(checkpoint)) {
                continue;
            }
            checkpoint.presentSinkFields(getFlowProcess(), getFieldsFor(elementGraph, checkpoint));
        }
    }

    /**
     * Returns the outgoing value fields of the first outgoing edge of the given tap.
     *
     * @param elementGraph the graph containing the tap
     * @param tap          the tap whose first outgoing edge is inspected
     * @return the out-values fields of that edge
     */
    protected Fields getFieldsFor(ElementGraph elementGraph, Tap tap) {
        Object firstEdge = elementGraph.outgoingEdgesOf(tap).iterator().next();
        return ((Scope) firstEdge).getOutValuesFields();
    }

    /**
     * Writes the flow id, tags, application id, name and version into the given properties.
     * Does nothing when {@code map} is {@code null}.
     *
     * @param map the properties to update in place, may be {@code null}
     */
    private void addSessionProperties(Map<Object, Object> map) {
        if (map != null) {
            PropertyUtil.setProperty(map, Flow.CASCADING_FLOW_ID, getID());
            PropertyUtil.setProperty(map, "cascading.flow.tags", getTags());
            AppProps.setApplicationID(map);
            PropertyUtil.setProperty(map, AppProps.APP_NAME, makeAppName(map));
            PropertyUtil.setProperty(map, AppProps.APP_VERSION, makeAppVersion(map));
        }
    }

    /**
     * Resolves the application name: the configured name when present, otherwise a name
     * derived from the application jar path.
     *
     * @param map the properties to read, may be {@code null}
     * @return the application name, or {@code null} when {@code map} is {@code null}
     */
    private String makeAppName(Map<Object, Object> map) {
        if (map == null) {
            return null;
        }

        String configuredName = AppProps.getApplicationName(map);
        if (configuredName != null) {
            return configuredName;
        }
        return Util.findName(AppProps.getApplicationJarPath(map));
    }

    /**
     * Resolves the application version: the configured version when present, otherwise a
     * version derived from the application jar path.
     *
     * @param map the properties to read, may be {@code null}
     * @return the application version, or {@code null} when {@code map} is {@code null}
     */
    private String makeAppVersion(Map<Object, Object> map) {
        if (map == null) {
            return null;
        }

        String configuredVersion = AppProps.getApplicationVersion(map);
        if (configuredVersion != null) {
            return configuredVersion;
        }
        return Util.findVersion(AppProps.getApplicationJarPath(map));
    }

    /**
     * Creates a fresh {@link FlowStats} for this flow, prepares it and marks it pending.
     *
     * @return the new stats object
     */
    private FlowStats createPrepareFlowStats() {
        FlowStats stats = new FlowStats(this, getClientState());

        stats.prepare();
        stats.markPending();

        return stats;
    }

    /**
     * Returns the services instance for this flow, creating it from the current
     * configuration properties on first use.
     *
     * @return the cached {@link CascadingServices}
     */
    public CascadingServices getCascadingServices() {
        if (cascadingServices != null) {
            return cascadingServices;
        }

        cascadingServices = new CascadingServices(getConfigAsProperties());
        return cascadingServices;
    }

    /**
     * Creates a client state for this flow's id via the session's services.
     *
     * @return the client state created for {@link #getID()}
     */
    private ClientState getClientState() {
        CascadingServices services = getFlowSession().getCascadingServices();
        return services.createClientState(getID());
    }

    /** Binds every step of the step graph to this flow; a no-op when no step graph is set. */
    protected void initSteps() {
        if (flowStepGraph != null) {
            for (Object vertex : flowStepGraph.vertexSet()) {
                ((BaseFlowStep) ((FlowStep) vertex)).setFlow(this);
            }
        }
    }

    /** Lets the source, sink and trap taps (in that order) initialise themselves against this flow. */
    private void initFromTaps() {
        initFromTaps(sources);
        initFromTaps(sinks);
        initFromTaps(traps);
    }

    /**
     * Calls {@code flowConfInit(this)} on every tap in the given map.
     *
     * @param map the taps to initialise
     */
    private void initFromTaps(Map<String, Tap> map) {
        for (Tap tap : map.values()) {
            tap.flowConfInit(this);
        }
    }

    /**
     * @return the name of this flow
     */
    @Override
    public String getName() {
        return name;
    }

    /**
     * Replaces the name of this flow.
     *
     * @param str the new name
     */
    protected void setName(String str) {
        name = str;
    }

    /**
     * Returns the id of this flow, generating a unique one on first access.
     *
     * @return the flow id, never {@code null} once generated
     */
    @Override
    public String getID() {
        if (id != null) {
            return id;
        }

        id = Util.createUniqueID();
        return id;
    }

    /**
     * @return the tags of this flow, possibly {@code null}
     */
    @Override
    public String getTags() {
        return tags;
    }

    /**
     * @return the submit priority, a value between 1 and 10 inclusive
     */
    @Override
    public int getSubmitPriority() {
        return submitPriority;
    }

    /**
     * Sets the submit priority.
     *
     * @param i the new priority, between 1 and 10 inclusive
     * @throws IllegalArgumentException when {@code i} is outside that range
     */
    @Override
    public void setSubmitPriority(int i) {
        boolean inRange = i >= 1 && i <= 10;
        if (!inRange) {
            throw new IllegalArgumentException("submitPriority must be between 1 and 10 inclusive, was: " + i);
        }
        submitPriority = i;
    }

    /**
     * Decompiler note: the access modifier was changed from package-private.
     *
     * @return the element graph supplied through {@code initialize}, or {@code null} if unset
     */
    public ElementGraph getPipeGraph() {
        return pipeGraph;
    }

    /**
     * Decompiler note: the access modifier was changed from package-private.
     *
     * @return the step graph of this flow, or {@code null} if unset
     */
    public FlowStepGraph getFlowStepGraph() {
        return flowStepGraph;
    }

    /**
     * Registers any listener taps among the given sources and then stores the map as the sources.
     *
     * @param map the source taps keyed by name
     */
    protected void setSources(Map<String, Tap> map) {
        addListeners(map.values());
        sources = map;
    }

    /**
     * Registers any listener taps among the given sinks and then stores the map as the sinks.
     *
     * @param map the sink taps keyed by name
     */
    protected void setSinks(Map<String, Tap> map) {
        addListeners(map.values());
        sinks = map;
    }

    /**
     * Registers any listener taps among the given traps and then stores the map as the traps.
     *
     * @param map the trap taps keyed by name
     */
    protected void setTraps(Map<String, Tap> map) {
        addListeners(map.values());
        traps = map;
    }

    /**
     * Registers any listener taps among the given checkpoints and then stores the map as the checkpoints.
     *
     * @param map the checkpoint taps keyed by name
     */
    protected void setCheckpoints(Map<String, Tap> map) {
        addListeners(map.values());
        checkpoints = map;
    }

    /**
     * Replaces the step graph of this flow. Steps are not re-bound and the cached step list is not reset.
     *
     * @param flowStepGraph the new step graph
     */
    protected void setFlowStepGraph(FlowStepGraph flowStepGraph) {
        this.flowStepGraph = flowStepGraph;
    }

    /**
     * Initialises the platform configuration of this flow. Called by the constructors after the
     * session properties have been added to {@code map}.
     *
     * @param map    the properties passed to the constructor, may be {@code null}
     * @param config the parent configuration passed to the constructor
     */
    protected abstract void initConfig(Map<Object, Object> map, Config config);

    /**
     * Creates a new configuration from {@code config} and copies every non-null property of
     * {@code map} into it.
     * <p>
     * When {@code map} is a {@link Properties}, the names reported by
     * {@link Properties#stringPropertyNames()} are included as well, and a value missing from the
     * map itself is looked up with {@link Properties#getProperty(String)}.
     *
     * @param map    the properties to copy, may be {@code null}
     * @param config the parent configuration handed to {@link #newConfig(Object)}
     * @return the new configuration
     */
    public Config createConfig(Map<Object, Object> map, Config config) {
        Config result = newConfig(config);

        if (map == null) {
            return result;
        }

        boolean isProperties = map instanceof Properties;
        HashSet<Object> keys = new HashSet<Object>(map.keySet());

        if (isProperties) {
            keys.addAll(((Properties) map).stringPropertyNames());
        }

        for (Object key : keys) {
            Object value = map.get(key);

            if (value == null && isProperties && key instanceof String) {
                value = ((Properties) map).getProperty((String) key);
            }

            if (value == null) {
                continue;
            }

            setConfigProperty(result, key, value);
        }

        return result;
    }

    /**
     * Stores a single property in the given configuration. Called by
     * {@link #createConfig(Map, Object)} for every property whose value is not {@code null}.
     *
     * @param config the configuration to update
     * @param obj    the property key
     * @param obj2   the property value, never {@code null} when called from {@code createConfig}
     */
    protected abstract void setConfigProperty(Config config, Object obj, Object obj2);

    /**
     * Creates a new configuration instance from the given one. Called first by
     * {@link #createConfig(Map, Object)}.
     *
     * @param config the configuration passed to {@code createConfig}
     * @return the new configuration
     */
    protected abstract Config newConfig(Config config);

    /**
     * Updates {@code stopJobsOnExit} from the given properties.
     *
     * @param map the properties to read
     */
    protected void initFromProperties(Map<Object, Object> map) {
        stopJobsOnExit = getStopJobsOnExit(map);
    }

    /**
     * @return a new {@link FlowSession} backed by this flow's services; a fresh instance on every call
     */
    public FlowSession getFlowSession() {
        CascadingServices services = getCascadingServices();
        return new FlowSession(services);
    }

    /**
     * @return the statistics object of this flow
     */
    @Override
    public FlowStats getFlowStats() {
        return flowStats;
    }

    /**
     * @return a read-only view of the flow descriptor, or an empty map when none was supplied
     */
    @Override
    public Map<String, String> getFlowDescriptor() {
        if (flowDescriptor == null) {
            return Collections.<String, String>emptyMap();
        }
        return Collections.unmodifiableMap(flowDescriptor);
    }

    /**
     * Same as {@link #getFlowStats()}.
     * Decompiler note: JADX could not rename this method to resolve a name collision.
     *
     * @return the statistics object of this flow
     */
    @Override
    public FlowStats getStats() {
        FlowStats stats = getFlowStats();
        return stats;
    }

    /**
     * Registers every element of the collection that implements {@link FlowListener};
     * other elements are ignored.
     *
     * @param collection the candidate objects
     */
    void addListeners(Collection collection) {
        for (Object candidate : collection) {
            if (!(candidate instanceof FlowListener)) {
                continue;
            }
            addListener((FlowListener) candidate);
        }
    }

    /**
     * @return the listener list, created on first access
     */
    List<BaseFlow<Config>.SafeFlowListener> getListeners() {
        if (listeners != null) {
            return listeners;
        }

        listeners = new LinkedList<BaseFlow<Config>.SafeFlowListener>();
        return listeners;
    }

    /**
     * @return {@code true} when at least one listener is registered
     */
    @Override
    public boolean hasListeners() {
        return listeners != null && !listeners.isEmpty();
    }

    /**
     * Registers a listener, wrapped so that its failures are captured rather than propagated.
     *
     * @param flowListener the listener to add
     */
    @Override
    public void addListener(FlowListener flowListener) {
        List<BaseFlow<Config>.SafeFlowListener> registered = getListeners();
        registered.add(new SafeFlowListener(flowListener));
    }

    /**
     * Removes a previously registered listener. Matching relies on
     * {@code SafeFlowListener.equals}, which compares the wrapped listeners.
     *
     * @param flowListener the listener to remove
     * @return {@code true} when a listener was removed
     */
    @Override
    public boolean removeListener(FlowListener flowListener) {
        List<BaseFlow<Config>.SafeFlowListener> registered = getListeners();
        return registered.remove(new SafeFlowListener(flowListener));
    }

    /**
     * Checks every step (without short-circuiting) for registered listeners.
     *
     * @return {@code true} when at least one step has listeners
     */
    @Override
    public boolean hasStepListeners() {
        boolean anyListeners = false;

        for (FlowStep<Config> step : getFlowSteps()) {
            if (step.hasListeners()) {
                anyListeners = true;
            }
        }

        return anyListeners;
    }

    /**
     * Registers the listener on every step of this flow.
     *
     * @param flowStepListener the listener to add
     */
    @Override
    public void addStepListener(FlowStepListener flowStepListener) {
        for (FlowStep<Config> step : getFlowSteps()) {
            step.addListener(flowStepListener);
        }
    }

    /**
     * Removes the listener from every step of this flow (all steps are visited).
     *
     * @param flowStepListener the listener to remove
     * @return {@code true} when every step reported a removal; also {@code true} when there are no steps
     */
    @Override
    public boolean removeStepListener(FlowStepListener flowStepListener) {
        boolean removedEverywhere = true;

        for (FlowStep<Config> step : getFlowSteps()) {
            if (!step.removeListener(flowStepListener)) {
                removedEverywhere = false;
            }
        }

        return removedEverywhere;
    }

    /**
     * @return a read-only view of the source taps keyed by name
     */
    @Override
    public Map<String, Tap> getSources() {
        Map<String, Tap> view = Collections.unmodifiableMap(sources);
        return view;
    }

    /**
     * @return a new list holding the names of all source taps
     */
    @Override
    public List<String> getSourceNames() {
        return new ArrayList<String>(sources.keySet());
    }

    /**
     * @param str the source name
     * @return the source tap registered under that name, or {@code null} when there is none
     */
    @Override
    public Tap getSource(String str) {
        Tap source = sources.get(str);
        return source;
    }

    /**
     * @return a read-only collection of all source taps
     */
    @Override
    @DependencyIncoming
    public Collection<Tap> getSourcesCollection() {
        Map<String, Tap> readOnlySources = getSources();
        return readOnlySources.values();
    }

    /**
     * @return a read-only view of the sink taps keyed by name
     */
    @Override
    public Map<String, Tap> getSinks() {
        Map<String, Tap> view = Collections.unmodifiableMap(sinks);
        return view;
    }

    /**
     * @return a new list holding the names of all sink taps
     */
    @Override
    public List<String> getSinkNames() {
        return new ArrayList<String>(sinks.keySet());
    }

    /**
     * @param str the sink name
     * @return the sink tap registered under that name, or {@code null} when there is none
     */
    @Override
    public Tap getSink(String str) {
        Tap sink = sinks.get(str);
        return sink;
    }

    /**
     * @return a read-only collection of all sink taps
     */
    @Override
    @DependencyOutgoing
    public Collection<Tap> getSinksCollection() {
        Map<String, Tap> readOnlySinks = getSinks();
        return readOnlySinks.values();
    }

    /**
     * @return the first sink tap in iteration order
     * @throws java.util.NoSuchElementException when the flow has no sinks
     */
    @Override
    public Tap getSink() {
        Iterator<Tap> sinkIterator = sinks.values().iterator();
        return sinkIterator.next();
    }

    /**
     * @return a read-only view of the trap taps keyed by name
     */
    @Override
    public Map<String, Tap> getTraps() {
        Map<String, Tap> view = Collections.unmodifiableMap(traps);
        return view;
    }

    /**
     * @return a new list holding the names of all trap taps
     */
    @Override
    public List<String> getTrapNames() {
        return new ArrayList<String>(traps.keySet());
    }

    /**
     * @return a read-only collection of all trap taps
     */
    @Override
    public Collection<Tap> getTrapsCollection() {
        Map<String, Tap> readOnlyTraps = getTraps();
        return readOnlyTraps.values();
    }

    /**
     * @return a read-only view of the checkpoint taps keyed by name
     */
    @Override
    public Map<String, Tap> getCheckpoints() {
        Map<String, Tap> view = Collections.unmodifiableMap(checkpoints);
        return view;
    }

    /**
     * @return a new list holding the names of all checkpoint taps
     */
    @Override
    public List<String> getCheckpointNames() {
        return new ArrayList<String>(checkpoints.keySet());
    }

    /**
     * @return a read-only collection of all checkpoint taps
     */
    @Override
    public Collection<Tap> getCheckpointsCollection() {
        Map<String, Tap> readOnlyCheckpoints = getCheckpoints();
        return readOnlyCheckpoints.values();
    }

    /**
     * @return the current value of the stop-jobs-on-exit flag
     */
    @Override
    public boolean isStopJobsOnExit() {
        return stopJobsOnExit;
    }

    /**
     * @return the strategy used by {@link #isSkipFlow()}
     */
    @Override
    public FlowSkipStrategy getFlowSkipStrategy() {
        return flowSkipStrategy;
    }

    /**
     * Replaces the skip strategy.
     *
     * @param flowSkipStrategy the new strategy, must not be {@code null}
     * @return the strategy that was previously installed
     * @throws IllegalArgumentException when {@code flowSkipStrategy} is {@code null}
     */
    @Override
    public FlowSkipStrategy setFlowSkipStrategy(FlowSkipStrategy flowSkipStrategy) {
        if (flowSkipStrategy == null) {
            throw new IllegalArgumentException("flowSkipStrategy may not be null");
        }

        FlowSkipStrategy previous = this.flowSkipStrategy;
        this.flowSkipStrategy = flowSkipStrategy;
        return previous;
    }

    /**
     * Asks the configured skip strategy whether this flow should be skipped.
     *
     * @return the strategy's decision
     * @throws IOException when the strategy fails
     */
    @Override
    public boolean isSkipFlow() throws IOException {
        return flowSkipStrategy.skipFlow(this);
    }

    /**
     * @return {@code true} when any source is newer than the value reported by {@link #getSinkModified()}
     * @throws IOException when the modification times cannot be read
     */
    @Override
    public boolean areSinksStale() throws IOException {
        long sinkModified = getSinkModified();
        return areSourcesNewer(sinkModified);
    }

    /**
     * Compares the given timestamp against the source modification time.
     * <p>
     * The source modification time is logged at info level on every exit path; when the lookup
     * itself fails the logged value is the epoch ({@code new Date(0)}).
     *
     * @param j the timestamp to compare against
     * @return {@code true} when {@code j} is older than the source modification time
     * @throws IOException when the source modification time cannot be read
     */
    @Override
    public boolean areSourcesNewer(long j) throws IOException {
        long sourceModified = 0L;

        try {
            sourceModified = Util.getSourceModified(getConfigCopy(), sources.values().iterator(), j);
            return j < sourceModified;
        } finally {
            if (LOG.isInfoEnabled()) {
                logInfo("source modification date at: " + new Date(sourceModified));
            }
        }
    }

    /**
     * Returns the sink modification time as computed by {@code Util.getSinkModified} and logs what
     * the value means at info level.
     * <p>
     * Exactly one message is logged: {@code -1} and {@code 0} are sentinel values with their own
     * messages, anything else is logged as a date. (Previously {@code -1} also fell through to the
     * date branch and logged a meaningless "oldest modified date".)
     *
     * @return the value reported by {@code Util.getSinkModified}
     * @throws IOException when the sink modification times cannot be read
     */
    @Override
    public long getSinkModified() throws IOException {
        long sinkModified = Util.getSinkModified(getConfigCopy(), sinks.values());

        if (LOG.isInfoEnabled()) {
            if (sinkModified == -1) {
                logInfo("at least one sink is marked for delete");
            } else if (sinkModified == 0) {
                logInfo("at least one sink does not exist");
            } else {
                logInfo("sink oldest modified date: " + new Date(sinkModified));
            }
        }

        return sinkModified;
    }

    /**
     * @return the step strategy, or {@code null} when none is set
     */
    @Override
    public FlowStepStrategy getFlowStepStrategy() {
        return flowStepStrategy;
    }

    /**
     * Replaces the step strategy.
     *
     * @param flowStepStrategy the new strategy, may be {@code null}
     */
    @Override
    public void setFlowStepStrategy(FlowStepStrategy flowStepStrategy) {
        this.flowStepStrategy = flowStepStrategy;
    }

    /**
     * Returns the steps of this flow in topological order. The list is built once from the step
     * graph and cached; without a step graph an empty list is returned and nothing is cached.
     *
     * @return the ordered steps
     */
    @Override
    public List<FlowStep<Config>> getFlowSteps() {
        if (steps == null) {
            if (flowStepGraph == null) {
                return Collections.EMPTY_LIST;
            }

            // Obtain the iterator before assigning the cache so a failure here leaves the cache unset.
            TopologicalOrderIterator<FlowStep<Config>, Integer> ordered = flowStepGraph.getTopologicalIterator();
            steps = new ArrayList();

            while (ordered.hasNext()) {
                steps.add(ordered.next());
            }
        }

        return steps;
    }

    /**
     * Deletes the sink, trap and checkpoint resources that are not flagged as update.
     *
     * @throws FlowException when a resource check or delete raises an {@link IOException}
     */
    @Override
    @ProcessPrepare
    public void prepare() {
        try {
            deleteSinksIfNotUpdate();
            deleteTrapsIfNotUpdate();
            deleteCheckpointsIfNotUpdate();
        } catch (IOException ioFailure) {
            throw new FlowException("unable to prepare flow", ioFailure);
        }
    }

    /**
     * Starts this flow on a new thread. Does nothing when the flow thread already exists or the
     * flow has been stopped.
     */
    @Override
    @ProcessStart
    public synchronized void start() {
        if (thread != null || stop) {
            return;
        }

        registerShutdownHook();
        internalStart();

        String threadName = ("flow " + Util.toNull(getName())).trim();
        thread = createFlowThread(threadName);
        thread.start();
    }

    /**
     * Creates (but does not start) the thread that executes this flow's {@code run()} method.
     *
     * @param str the thread name
     * @return the new, unstarted thread
     */
    protected Thread createFlowThread(String str) {
        Runnable flowRunner = new Runnable() {
            @Override
            public void run() {
                BaseFlow.this.run();
            }
        };

        return new Thread(flowRunner, str);
    }

    /**
     * Platform-specific start hook. {@link #start()} calls it after registering the shutdown hook
     * and before creating the flow thread.
     */
    protected abstract void internalStart();

    /**
     * Stops this flow: notifies listeners, marks the stats as stopped when not already finished,
     * stops all jobs, shuts the executor down and cleans up.
     * <p>
     * The call is idempotent; a second call returns immediately. In every case, including the
     * early return and failures, the flow stats are cleaned up and {@code stopLock} is released
     * exactly once in the {@code finally} block. Releasing it a second time on the success path
     * would make {@link ReentrantLock#unlock()} throw {@link IllegalMonitorStateException}.
     */
    @Override // cascading.flow.Flow, cascading.management.UnitOfWork
    @ProcessStop
    public synchronized void stop() {
        stopLock.lock();
        try {
            if (stop) {
                return;
            }

            stop = true;
            fireOnStopping();

            if (!flowStats.isFinished()) {
                flowStats.markStopped();
            }

            internalStopAllJobs();
            handleExecutorShutdown();
            internalClean(true);
        } finally {
            // Single release point: cleanup and unlock must each run exactly once.
            flowStats.cleanup();
            stopLock.unlock();
        }
    }

    /**
     * Platform-specific cleanup hook.
     *
     * @param z {@link #stop()} passes {@code true}; {@code run()} passes the current value of the stop flag
     */
    protected abstract void internalClean(boolean z);

    /**
     * Starts the flow if necessary and blocks until it has finished, then rethrows any failure
     * recorded for the flow or raised by a listener.
     * <p>
     * Whatever the outcome, the flow thread reference and recorded throwable are cleared, traps
     * are committed, listener failures are reset and the flow stats are cleaned up.
     *
     * @throws CascadingException when the flow failed with one (a {@link FlowException} gets the
     *                            flow name attached first)
     * @throws OutOfMemoryError   when the flow failed with one
     * @throws FlowException      when the waiting thread is interrupted, the flow failed with any
     *                            other throwable, or a listener threw
     */
    @Override // cascading.flow.Flow, cascading.management.UnitOfWork
    @ProcessComplete
    public void complete() {
        start();

        try {
            try {
                // Guards against a quick stop()/complete() racing with start(): wait until the
                // flow thread exists or the flow has been stopped.
                synchronized (this) {
                    while (thread == null && !stop) {
                        Util.safeSleep(10L);
                    }
                }

                if (thread != null) {
                    thread.join();
                }
            } catch (InterruptedException interrupted) {
                throw new FlowException(getName(), "thread interrupted", interrupted);
            }

            // If stop() is in progress on another thread, wait for it to finish. The lock is
            // released right away and must not be released again on any later failure path.
            stopLock.lock();
            stopLock.unlock();

            if (throwable instanceof FlowException) {
                ((FlowException) throwable).setFlowName(getName());
            }

            if (throwable instanceof CascadingException) {
                throw (CascadingException) throwable;
            }

            if (throwable instanceof OutOfMemoryError) {
                throw (OutOfMemoryError) throwable;
            }

            if (throwable != null) {
                throw new FlowException(getName(), "unhandled exception", throwable);
            }

            if (hasListeners()) {
                for (SafeFlowListener listener : getListeners()) {
                    // Report the listener's own failure as the cause; the flow-level throwable
                    // is known to be null at this point.
                    Throwable listenerFailure = listener.throwable;

                    if (listenerFailure != null) {
                        throw new FlowException(getName(), "unhandled listener exception", listenerFailure);
                    }
                }
            }
        } finally {
            thread = null;
            throwable = null;

            try {
                commitTraps();

                if (hasListeners()) {
                    for (SafeFlowListener listener : getListeners()) {
                        listener.throwable = null;
                    }
                }
            } finally {
                flowStats.cleanup();
            }
        }
    }

    /**
     * Commits every trap tap. A trap that reports failure or throws an {@link IOException} is
     * logged as an error and the remaining traps are still processed.
     * <p>
     * The configuration is passed to the tap as-is; casting it to {@code Tap} (as the decompiled
     * code did when building the log message) would raise {@link ClassCastException}.
     */
    private void commitTraps() {
        for (Tap trap : traps.values()) {
            try {
                if (!trap.commitResource(getConfig())) {
                    logError("unable to commit trap: " + trap.getFullIdentifier(getConfig()), null);
                }
            } catch (IOException ioFailure) {
                logError("unable to commit trap: " + trap.getFullIdentifier(getConfig()), ioFailure);
            }
        }
    }

    /** Cleanup hook; this base implementation intentionally does nothing. */
    @Override
    @ProcessCleanup
    public void cleanup() {
        // no-op
    }

    /**
     * Opens the first source tap (in iteration order) for reading.
     *
     * @return an iterator over the source's tuples
     * @throws IOException when the tap cannot be opened
     */
    @Override
    public TupleEntryIterator openSource() throws IOException {
        Tap firstSource = sources.values().iterator().next();
        return firstSource.openForRead(getFlowProcess());
    }

    /**
     * Opens the named source tap for reading.
     *
     * @param str the source name
     * @return an iterator over the source's tuples
     * @throws IllegalArgumentException when no source is registered under that name
     * @throws IOException              when the tap cannot be opened
     */
    @Override
    public TupleEntryIterator openSource(String str) throws IOException {
        if (!sources.containsKey(str)) {
            throw new IllegalArgumentException("source does not exist: " + str);
        }
        return sources.get(str).openForRead(getFlowProcess());
    }

    /**
     * Opens the first sink tap (in iteration order) for reading.
     *
     * @return an iterator over the sink's tuples
     * @throws IOException when the tap cannot be opened
     */
    @Override
    public TupleEntryIterator openSink() throws IOException {
        Tap firstSink = sinks.values().iterator().next();
        return firstSink.openForRead(getFlowProcess());
    }

    /**
     * Opens the named sink tap for reading.
     *
     * @param str the sink name
     * @return an iterator over the sink's tuples
     * @throws IllegalArgumentException when no sink is registered under that name
     * @throws IOException              when the tap cannot be opened
     */
    @Override
    public TupleEntryIterator openSink(String str) throws IOException {
        if (!sinks.containsKey(str)) {
            throw new IllegalArgumentException("sink does not exist: " + str);
        }
        return sinks.get(str).openForRead(getFlowProcess());
    }

    /**
     * Opens the first trap tap (in iteration order) for reading.
     *
     * @return an iterator over the trap's tuples
     * @throws IOException when the tap cannot be opened
     */
    @Override
    public TupleEntryIterator openTrap() throws IOException {
        Tap firstTrap = traps.values().iterator().next();
        return firstTrap.openForRead(getFlowProcess());
    }

    /**
     * Opens the named trap tap for reading.
     *
     * @param str the trap name
     * @return an iterator over the trap's tuples
     * @throws IllegalArgumentException when no trap is registered under that name
     * @throws IOException              when the tap cannot be opened
     */
    @Override
    public TupleEntryIterator openTrap(String str) throws IOException {
        if (!traps.containsKey(str)) {
            throw new IllegalArgumentException("trap does not exist: " + str);
        }
        return traps.get(str).openForRead(getFlowProcess());
    }

    /**
     * Deletes the resource of every sink tap that exists.
     *
     * @throws IOException   when a resource check or delete fails
     * @throws FlowException when an existing resource cannot be deleted
     */
    public void deleteSinks() throws IOException {
        for (Tap sink : sinks.values()) {
            deleteOrFail(sink);
        }
    }

    /**
     * Deletes the tap's resource when it exists.
     * <p>
     * The configuration is passed to the tap as-is; casting it to {@code Tap} (as the decompiled
     * code did) would raise {@link ClassCastException} before the tap is consulted.
     *
     * @param tap the tap whose resource should be removed
     * @throws IOException   when the existence check or the delete fails
     * @throws FlowException when the resource exists but the tap reports that it was not deleted
     */
    private void deleteOrFail(Tap tap) throws IOException {
        if (!tap.resourceExists(getConfig())) {
            return;
        }

        if (!tap.deleteResource(getConfig())) {
            throw new FlowException("unable to delete resource: " + tap.getFullIdentifier((FlowProcess) getFlowProcess()));
        }
    }

    /**
     * Deletes the resource of every sink tap that is not flagged as update.
     *
     * @throws IOException when a resource check or delete fails
     */
    public void deleteSinksIfNotUpdate() throws IOException {
        for (Tap sink : sinks.values()) {
            if (sink.isUpdate()) {
                continue;
            }
            deleteOrFail(sink);
        }
    }

    /**
     * Deletes the resource of every sink tap that is flagged as replace.
     *
     * @throws IOException when a resource check or delete fails
     */
    public void deleteSinksIfReplace() throws IOException {
        for (Tap sink : sinks.values()) {
            if (!sink.isReplace()) {
                continue;
            }
            deleteOrFail(sink);
        }
    }

    /**
     * Deletes the resource of every trap tap that is not flagged as update.
     *
     * @throws IOException when a resource check or delete fails
     */
    public void deleteTrapsIfNotUpdate() throws IOException {
        for (Tap trap : traps.values()) {
            if (trap.isUpdate()) {
                continue;
            }
            deleteOrFail(trap);
        }
    }

    /**
     * Deletes the resource of every checkpoint tap that does not report itself as an update.
     *
     * @throws IOException propagated from the underlying tap operations
     */
    public void deleteCheckpointsIfNotUpdate() throws IOException {
        for (Tap checkpoint : this.checkpoints.values()) {
            if (checkpoint.isUpdate()) {
                continue;
            }
            deleteOrFail(checkpoint);
        }
    }

    /**
     * Deletes the resource of every trap that reports itself as a replace.
     *
     * @throws IOException propagated from the underlying tap operations
     */
    public void deleteTrapsIfReplace() throws IOException {
        for (Tap trap : this.traps.values()) {
            if (!trap.isReplace()) {
                continue;
            }
            deleteOrFail(trap);
        }
    }

    /**
     * Deletes the resource of every checkpoint tap that reports itself as a replace.
     *
     * @throws IOException propagated from the underlying tap operations
     */
    public void deleteCheckpointsIfReplace() throws IOException {
        for (Tap checkpoint : this.checkpoints.values()) {
            if (!checkpoint.isReplace()) {
                continue;
            }
            deleteOrFail(checkpoint);
        }
    }

    /**
     * Reports whether the resource behind the given tap exists, using this flow's configuration.
     * <p>
     * The configuration is handed to the tap unchanged. Casting it to {@code Tap} would throw a
     * {@link ClassCastException} because the config object is not a tap.
     *
     * @param tap the tap to query
     * @return the result of {@code Tap#resourceExists} for this flow's config
     * @throws IOException propagated from {@code Tap#resourceExists}
     */
    @Override // cascading.flow.Flow
    public boolean resourceExists(Tap tap) throws IOException {
        return tap.resourceExists(getConfig());
    }

    /**
     * Opens the given tap for reading with this flow's flow process.
     *
     * @param tap the tap to read from
     * @return the iterator returned by {@code Tap#openForRead}
     * @throws IOException propagated from {@code Tap#openForRead}
     */
    @Override // cascading.flow.Flow
    public TupleEntryIterator openTapForRead(Tap tap) throws IOException {
        TupleEntryIterator reader = tap.openForRead(getFlowProcess());
        return reader;
    }

    /**
     * Opens the given tap for writing with this flow's flow process.
     *
     * @param tap the tap to write to
     * @return the collector returned by {@code Tap#openForWrite}
     * @throws IOException propagated from {@code Tap#openForWrite}
     */
    @Override // cascading.flow.Flow
    public TupleEntryCollector openTapForWrite(Tap tap) throws IOException {
        TupleEntryCollector writer = tap.openForWrite(getFlowProcess());
        return writer;
    }

    /**
     * Executes this flow on the flow's own thread: spawns all step jobs, waits for them, and then performs
     * completion handling and cleanup exactly once.
     * <p>
     * Sequence:
     * <ol>
     * <li>Rejects direct invocation when no flow thread has been assigned.</li>
     * <li>If the flow was already stopped, skips execution and goes straight to completion handling.</li>
     * <li>Marks the stats as started, fires {@code onStarting}, spawns the jobs and waits on each future in
     * order. The first non-null {@link Throwable} returned by a job is kept in {@code this.throwable}, the
     * remaining jobs are stopped (unless the flow is already stopping), the executor is shut down, and
     * waiting ends so a later successful job cannot overwrite the failure.</li>
     * <li>Any throwable raised during the above is recorded in {@code this.throwable}.</li>
     * <li>In all cases: marks failure or success on the stats, cleans up, fires {@code onCompleted}, and
     * finally releases stats, shuts down, and deregisters the shutdown hook, even if a completion listener
     * throws.</li>
     * </ol>
     *
     * @throws IllegalStateException if called without the flow thread having been set
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void run() {
        if (this.thread == null) {
            throw new IllegalStateException("to start a Flow call start() or complete(), not Runnable#run()");
        }
        Version.printBanner();
        Update.checkForUpdate(getPlatformInfo());
        try {
            if (this.stop) {
                // the finally block below still performs completion handling and cleanup
                return;
            }
            this.flowStats.markStarted();
            fireOnStarting();
            if (LOG.isInfoEnabled()) {
                logInfo("starting");
                for (Tap source : getSourcesCollection()) {
                    logInfo(" source: " + source);
                }
                for (Tap sink : getSinksCollection()) {
                    logInfo(" sink: " + sink);
                }
            }
            int numThreads = getMaxNumParallelSteps();
            if (numThreads == 0) {
                // zero means "no limit": one thread per job
                numThreads = this.jobsMap.size();
            }
            if (numThreads == 0) {
                throw new IllegalStateException("no jobs rendered for flow: " + getName());
            }
            if (LOG.isInfoEnabled()) {
                logInfo(" parallel execution is enabled: " + (getMaxNumParallelSteps() != 1));
                logInfo(" starting jobs: " + this.jobsMap.size());
                logInfo(" allocating threads: " + numThreads);
            }
            for (Future<Throwable> future : spawnJobs(numThreads)) {
                this.throwable = future.get();
                if (this.throwable != null) {
                    if (!this.stop) {
                        internalStopAllJobs();
                    }
                    handleExecutorShutdown();
                    // stop waiting: a later future returning null must not erase this failure
                    break;
                }
            }
        } catch (Throwable th) {
            this.throwable = th;
        } finally {
            handleThrowableAndMarkFailed();
            if (!this.stop && !this.flowStats.isFinished()) {
                this.flowStats.markSuccessful();
            }
            // cleanup receives the stop flag so it can distinguish a stopped flow from a completed one
            internalClean(this.stop);
            try {
                fireOnCompleted();
            } finally {
                this.flowStats.cleanup();
                internalShutdown();
                deregisterShutdownHook();
            }
        }
    }

    /**
     * Returns the maximum number of steps this flow may run in parallel.
     * <p>
     * As consumed by {@code run()}: a value of {@code 0} causes one thread to be allocated per job, and any
     * value other than {@code 1} is logged as parallel execution being enabled.
     *
     * @return the configured parallelism limit
     */
    protected abstract int getMaxNumParallelSteps();

    /**
     * Subclass-specific shutdown step. {@code run()} invokes it during final cleanup, after
     * {@code flowStats.cleanup()} and before the shutdown hook is deregistered.
     */
    protected abstract void internalShutdown();

    /**
     * Hands every job in the jobs map to the spawn strategy for execution.
     *
     * @param i the number of threads to pass to the spawn strategy
     * @return the futures returned by the spawn strategy, or an empty list if the flow is already stopped
     * @throws InterruptedException propagated from the spawn strategy
     */
    private List<Future<Throwable>> spawnJobs(int i) throws InterruptedException {
        if (this.stop) {
            return new ArrayList<Future<Throwable>>();
        }
        // snapshot the jobs in map iteration order
        ArrayList jobs = new ArrayList(this.jobsMap.values());
        return this.spawnStrategy.start(this, i, jobs);
    }

    /**
     * Marks the flow stats as failed and notifies throwable listeners, but only when a throwable has been
     * recorded and the flow is not being stopped.
     */
    private void handleThrowableAndMarkFailed() {
        if (this.throwable != null && !this.stop) {
            this.flowStats.markFailed(this.throwable);
            fireOnThrowable();
        }
    }

    /**
     * Returns the map of step jobs held by this flow. The internal map itself is returned, not a copy.
     *
     * @return the jobs map, keyed by name
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<String, FlowStepJob<Config>> getJobsMap() {
        return jobsMap;
    }

    /**
     * Rebuilds the jobs map from the flow step graph.
     * <p>
     * Steps are visited in topological order. For each step a job is created and stored under the step
     * name, its predecessor jobs are looked up in the map by the names of the step's graph predecessors,
     * and the job's step stats are added to the flow stats.
     */
    protected void initializeNewJobsMap() {
        this.jobsMap = new LinkedHashMap();
        TopologicalOrderIterator<FlowStep<Config>, Integer> steps = this.flowStepGraph.getTopologicalIterator();
        while (steps.hasNext()) {
            BaseFlowStep step = (BaseFlowStep) steps.next();
            FlowStepJob job = step.getFlowStepJob(getFlowProcess(), getConfig());
            this.jobsMap.put(step.getName(), job);

            // resolve predecessor jobs through the map populated so far
            ArrayList predecessorJobs = new ArrayList();
            for (Object predecessor : Graphs.predecessorListOf(this.flowStepGraph, step)) {
                String predecessorName = ((FlowStep) predecessor).getName();
                predecessorJobs.add(this.jobsMap.get(predecessorName));
            }
            job.setPredecessors(predecessorJobs);
            this.flowStats.addStepStats(job.getStepStats());
        }
    }

    /**
     * Stops every job of this flow, in the reverse of the jobs map's iteration order.
     * <p>
     * "stopped all jobs" is logged exactly once, from the {@code finally} block, whether the jobs map is
     * absent, all jobs stop normally, or a job's {@code stop()} throws.
     */
    protected void internalStopAllJobs() {
        logInfo("stopping all jobs");
        try {
            if (this.jobsMap == null) {
                return;
            }
            List<FlowStepJob<Config>> jobs = new ArrayList<FlowStepJob<Config>>(this.jobsMap.values());
            // stop the most recently added jobs first
            Collections.reverse(jobs);
            for (FlowStepJob<Config> job : jobs) {
                job.stop();
            }
        } finally {
            logInfo("stopped all jobs");
        }
    }

    /**
     * Waits for the spawn strategy to finish, unless it already reports completion for this flow.
     * <p>
     * The wait is bounded to 300 seconds. It is best-effort: an interruption is logged instead of being
     * silently discarded, and the method still returns normally so the caller can continue its cleanup.
     * NOTE(review): the thread's interrupt status is not restored here, to keep the previous behaviour for
     * the cleanup that follows; confirm whether callers need it.
     */
    protected void handleExecutorShutdown() {
        if (this.spawnStrategy.isCompleted(this)) {
            return;
        }
        logInfo("shutting down job executor");
        try {
            this.spawnStrategy.complete(this, 300, TimeUnit.SECONDS);
        } catch (InterruptedException exception) {
            logWarn("interrupted while waiting for job executor to complete", exception);
        }
        logInfo("shutdown complete");
    }

    /**
     * Invokes {@code onCompleted} on every registered listener, in listener iteration order.
     * Does nothing when there are no listeners.
     */
    protected void fireOnCompleted() {
        if (!hasListeners()) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            logDebug("firing onCompleted event: " + getListeners().size());
        }
        for (BaseFlow<Config>.SafeFlowListener listener : getListeners()) {
            listener.onCompleted(this);
        }
    }

    /**
     * Passes the recorded throwable to every registered listener.
     * <p>
     * Every listener is always called. If at least one {@code onThrowable} call returns {@code true},
     * the recorded throwable is cleared afterwards. Does nothing when there are no listeners.
     */
    protected void fireOnThrowable() {
        if (!hasListeners()) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            logDebug("firing onThrowable event: " + getListeners().size());
        }
        boolean handled = false;
        for (BaseFlow<Config>.SafeFlowListener listener : getListeners()) {
            // call first, then fold the result, so no listener is skipped
            if (listener.onThrowable(this, this.throwable)) {
                handled = true;
            }
        }
        if (handled) {
            this.throwable = null;
        }
    }

    /**
     * Invokes {@code onStopping} on every registered listener, in listener iteration order.
     * Does nothing when there are no listeners.
     */
    protected void fireOnStopping() {
        if (!hasListeners()) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            logDebug("firing onStopping event: " + getListeners().size());
        }
        for (BaseFlow<Config>.SafeFlowListener listener : getListeners()) {
            listener.onStopping(this);
        }
    }

    /**
     * Invokes {@code onStarting} on every registered listener, in listener iteration order.
     * Does nothing when there are no listeners.
     */
    protected void fireOnStarting() {
        if (!hasListeners()) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            logDebug("firing onStarting event: " + getListeners().size());
        }
        for (BaseFlow<Config>.SafeFlowListener listener : getListeners()) {
            listener.onStarting(this);
        }
    }

    /**
     * Builds a textual form of this flow: the flow name followed by {@code ": "} when a name is set, then
     * the string form of each flow step appended without a separator.
     *
     * @return the concatenated description
     */
    public String toString() {
        StringBuilder text = new StringBuilder();
        if (getName() != null) {
            text.append(getName()).append(": ");
        }
        for (FlowStep<Config> step : getFlowSteps()) {
            text.append(step);
        }
        return text.toString();
    }

    /**
     * Logs a message at INFO level, prefixed with the flow name (passed through {@code Util.truncate}
     * with a limit of 25) in square brackets.
     *
     * @param str the message to log
     */
    protected void logInfo(String str) {
        String prefix = "[" + Util.truncate(getName(), 25) + "] ";
        LOG.info(prefix + str);
    }

    /**
     * Logs a message at DEBUG level, prefixed with the flow name (passed through {@code Util.truncate}
     * with a limit of 25) in square brackets.
     *
     * @param str the message to log
     */
    private void logDebug(String str) {
        String prefix = "[" + Util.truncate(getName(), 25) + "] ";
        LOG.debug(prefix + str);
    }

    /**
     * Logs a message and its cause at WARN level, prefixed with the flow name (passed through
     * {@code Util.truncate} with a limit of 25) in square brackets.
     *
     * @param str the message to log
     * @param th the throwable to attach to the log entry
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void logWarn(String str, Throwable th) {
        String prefix = "[" + Util.truncate(getName(), 25) + "] ";
        LOG.warn(prefix + str, th);
    }

    /**
     * Logs a message and its cause at ERROR level, prefixed with the flow name (passed through
     * {@code Util.truncate} with a limit of 25) in square brackets.
     *
     * @param str the message to log
     * @param th the throwable to attach to the log entry
     */
    private void logError(String str, Throwable th) {
        String prefix = "[" + Util.truncate(getName(), 25) + "] ";
        LOG.error(prefix + str, th);
    }

    /**
     * Delegates to the pipe graph's {@code writeDOT} with the given argument.
     *
     * @param str the argument forwarded to the pipe graph's {@code writeDOT}
     * @throws UnsupportedOperationException if this flow holds no pipe graph
     */
    @Override // cascading.flow.Flow
    public void writeDOT(String str) {
        if (this.pipeGraph != null) {
            this.pipeGraph.writeDOT(str);
            return;
        }
        throw new UnsupportedOperationException("this flow instance cannot write a DOT file");
    }

    /**
     * Delegates to the flow step graph's {@code writeDOT} with the given argument.
     *
     * @param str the argument forwarded to the step graph's {@code writeDOT}
     * @throws UnsupportedOperationException if this flow holds no flow step graph
     */
    @Override // cascading.flow.Flow
    public void writeStepsDOT(String str) {
        if (this.flowStepGraph != null) {
            this.flowStepGraph.writeDOT(str);
            return;
        }
        throw new UnsupportedOperationException("this flow instance cannot write a DOT file");
    }

    /**
     * Creates a new {@code FlowHolder} wrapping this flow. A fresh holder is created on every call.
     *
     * @return a new holder for this flow
     */
    public FlowHolder getHolder() {
        FlowHolder holder = new FlowHolder(this);
        return holder;
    }

    /**
     * Stores the given cascade's ID in this flow's config under {@code "cascading.cascade.id"} and then
     * asks the flow stats to record info.
     *
     * @param cascade the cascade whose ID is stored; must not be null
     */
    public void setCascade(Cascade cascade) {
        Config config = getConfig();
        setConfigProperty(config, "cascading.cascade.id", cascade.getID());
        this.flowStats.recordInfo();
    }

    /**
     * Looks up the cascade ID from the {@code "cascading.cascade.id"} property.
     *
     * @return the property value as returned by {@code getProperty}
     */
    @Override // cascading.flow.Flow
    public String getCascadeID() {
        String cascadeId = getProperty("cascading.cascade.id");
        return cascadeId;
    }

    /**
     * Returns the run ID held by this flow.
     *
     * @return the value of the {@code runID} field
     */
    @Override // cascading.flow.Flow
    public String getRunID() {
        return runID;
    }

    /**
     * Returns the class path entries held by this flow. The internal list itself is returned, not a copy.
     *
     * @return the value of the {@code classPath} field
     */
    protected List<String> getClassPath() {
        return classPath;
    }

    /**
     * Replaces the strategy this flow uses to spawn its jobs.
     *
     * @param unitOfWorkSpawnStrategy the spawn strategy to use from now on
     */
    @Override // cascading.management.UnitOfWork
    public void setSpawnStrategy(UnitOfWorkSpawnStrategy unitOfWorkSpawnStrategy) {
        spawnStrategy = unitOfWorkSpawnStrategy;
    }

    /**
     * Returns the strategy this flow currently uses to spawn its jobs.
     *
     * @return the value of the {@code spawnStrategy} field
     */
    @Override // cascading.management.UnitOfWork
    public UnitOfWorkSpawnStrategy getSpawnStrategy() {
        return spawnStrategy;
    }

    /**
     * Registers a shutdown hook that stops this flow, but only when {@code isStopJobsOnExit()} is true.
     * <p>
     * The hook has priority {@code WORK_CHILD}. When executed it logs a message and calls {@code stop()}
     * on this flow. The hook is kept in {@code shutdownHook} so it can be removed later.
     */
    protected void registerShutdownHook() {
        if (!isStopJobsOnExit()) {
            return;
        }
        this.shutdownHook = new ShutdownUtil.Hook() {
            @Override // cascading.util.ShutdownUtil.Hook
            public ShutdownUtil.Hook.Priority priority() {
                return ShutdownUtil.Hook.Priority.WORK_CHILD;
            }

            @Override // cascading.util.ShutdownUtil.Hook
            public void execute() {
                BaseFlow.this.logInfo("shutdown hook calling stop on flow");
                BaseFlow.this.stop();
            }
        };
        ShutdownUtil.addHook(this.shutdownHook);
    }

    /**
     * Removes the previously registered shutdown hook, provided {@code isStopJobsOnExit()} is true and
     * the flow is not being stopped.
     */
    private void deregisterShutdownHook() {
        if (isStopJobsOnExit() && !this.stop) {
            ShutdownUtil.removeHook(this.shutdownHook);
        }
    }
}
