package org.apache.flink.streaming.tests;

import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import sun.management.VMManagement;

/* loaded from: input_file:org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.class */
public class StickyAllocationAndLocalRecoveryTestJob {

    /* loaded from: input_file:org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob$MapperSchedulingAndFailureInfo.class */
    public static class MapperSchedulingAndFailureInfo implements Serializable {
        private static final long serialVersionUID = 1;
        final boolean failingTask;
        final boolean killedJvm;
        final int jvmPid;
        final String taskNameWithSubtask;
        final String allocationId;

        MapperSchedulingAndFailureInfo(boolean z, boolean z2, int i, String str, String str2) {
            this.failingTask = z;
            this.killedJvm = z2;
            this.jvmPid = i;
            this.taskNameWithSubtask = str;
            this.allocationId = str2;
        }

        public String toString() {
            return "MapperTestInfo{failingTask=" + this.failingTask + ", killedJvm=" + this.killedJvm + ", jvmPid=" + this.jvmPid + ", taskNameWithSubtask='" + this.taskNameWithSubtask + "', allocationId='" + this.allocationId + "'}";
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob$RandomLongSource.class */
    private static final class RandomLongSource extends RichParallelSourceFunction<Long> implements CheckpointedFunction {
        private static final long serialVersionUID = 1;
        final long delay;
        final int maxAttempts;
        transient ListState<Long> sourceCurrentKeyState;
        long currentKey;
        volatile boolean running = true;

        RandomLongSource(int i, long j) {
            this.delay = j;
            this.maxAttempts = i;
        }

        public void run(SourceFunction.SourceContext<Long> sourceContext) throws Exception {
            int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            if (getRuntimeContext().getAttemptNumber() > this.maxAttempts) {
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(Long.valueOf(Long.MAX_VALUE - indexOfThisSubtask));
                }
                return;
            }
            while (this.running) {
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(Long.valueOf(this.currentKey));
                    this.currentKey += numberOfParallelSubtasks;
                }
                if (this.delay > 0) {
                    Thread.sleep(this.delay);
                }
            }
        }

        public void cancel() {
            this.running = false;
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            this.sourceCurrentKeyState.clear();
            this.sourceCurrentKeyState.add(Long.valueOf(this.currentKey));
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            this.sourceCurrentKeyState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("currentKey", Long.class));
            this.currentKey = getRuntimeContext().getIndexOfThisSubtask();
            Iterable iterable = (Iterable) this.sourceCurrentKeyState.get();
            if (iterable != null) {
                Iterator it = iterable.iterator();
                if (it.hasNext()) {
                    this.currentKey = ((Long) it.next()).longValue();
                    Preconditions.checkState(!it.hasNext());
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob$StateCreatingFlatMap.class */
    private static final class StateCreatingFlatMap extends RichFlatMapFunction<Long, String> implements CheckpointedFunction, CheckpointListener {
        private static final long serialVersionUID = 1;
        final int valueSize;
        final boolean killTaskOnFailure;
        transient ValueState<String> valueState;
        transient ListState<MapperSchedulingAndFailureInfo> schedulingAndFailureState;
        transient MapperSchedulingAndFailureInfo currentSchedulingAndFailureInfo;
        volatile transient boolean failTask = false;
        volatile transient String allocationFailureMessage = null;

        StateCreatingFlatMap(int i, boolean z) {
            this.valueSize = i;
            this.killTaskOnFailure = z;
        }

        public void flatMap(Long l, Collector<String> collector) throws IOException {
            if (this.allocationFailureMessage != null) {
                collector.collect(this.allocationFailureMessage);
                this.allocationFailureMessage = null;
            }
            if (this.failTask) {
                if (!this.killTaskOnFailure) {
                    throw new RuntimeException("Artificial user code exception.");
                }
                Runtime.getRuntime().halt(-1);
            }
            if (null != this.valueState.value()) {
                throw new IllegalStateException("This should never happen, keys are generated monotonously.");
            }
            this.valueState.update(RandomStringUtils.random(this.valueSize, true, true));
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            this.valueState = functionInitializationContext.getKeyedStateStore().getState(new ValueStateDescriptor("state", String.class));
            this.schedulingAndFailureState = functionInitializationContext.getOperatorStateStore().getUnionListState(new ListStateDescriptor("mapperState", MapperSchedulingAndFailureInfo.class));
            StreamingRuntimeContext runtimeContext = getRuntimeContext();
            String allocationIDAsString = runtimeContext.getAllocationIDAsString();
            String str = runtimeContext.getTaskNameWithSubtasks().split("#")[0];
            int access$000 = StickyAllocationAndLocalRecoveryTestJob.access$000();
            HashSet hashSet = new HashSet();
            if (functionInitializationContext.isRestored()) {
                Iterable<MapperSchedulingAndFailureInfo> iterable = (Iterable) this.schedulingAndFailureState.get();
                MapperSchedulingAndFailureInfo mapperSchedulingAndFailureInfo = null;
                ArrayList arrayList = new ArrayList();
                if (iterable != null) {
                    for (MapperSchedulingAndFailureInfo mapperSchedulingAndFailureInfo2 : iterable) {
                        arrayList.add(mapperSchedulingAndFailureInfo2);
                        if (str.equals(mapperSchedulingAndFailureInfo2.taskNameWithSubtask)) {
                            mapperSchedulingAndFailureInfo = mapperSchedulingAndFailureInfo2;
                        }
                        if (mapperSchedulingAndFailureInfo2.killedJvm) {
                            hashSet.add(Integer.valueOf(mapperSchedulingAndFailureInfo2.jvmPid));
                        }
                    }
                }
                Preconditions.checkNotNull(mapperSchedulingAndFailureInfo, "Expected to find info here.");
                if (!isScheduledToCorrectAllocation(mapperSchedulingAndFailureInfo, allocationIDAsString, hashSet)) {
                    this.allocationFailureMessage = String.format("Sticky allocation test failed: Subtask %s in attempt %d was rescheduled from allocation %s on JVM with PID %d to unexpected allocation %s on JVM with PID %d.\nComplete information from before the crash: %s.", str, Integer.valueOf(runtimeContext.getAttemptNumber()), mapperSchedulingAndFailureInfo.allocationId, Integer.valueOf(mapperSchedulingAndFailureInfo.jvmPid), allocationIDAsString, Integer.valueOf(access$000), arrayList);
                }
            }
            boolean shouldTaskFailForThisAttempt = shouldTaskFailForThisAttempt();
            this.currentSchedulingAndFailureInfo = new MapperSchedulingAndFailureInfo(shouldTaskFailForThisAttempt, shouldTaskFailForThisAttempt && this.killTaskOnFailure, access$000, str, allocationIDAsString);
            this.schedulingAndFailureState.clear();
            this.schedulingAndFailureState.add(this.currentSchedulingAndFailureInfo);
        }

        public void notifyCheckpointComplete(long j) {
            this.failTask = this.currentSchedulingAndFailureInfo.failingTask;
        }

        public void notifyCheckpointAborted(long j) {
        }

        private boolean shouldTaskFailForThisAttempt() {
            RuntimeContext runtimeContext = getRuntimeContext();
            return runtimeContext.getAttemptNumber() % runtimeContext.getNumberOfParallelSubtasks() == runtimeContext.getIndexOfThisSubtask();
        }

        private boolean isScheduledToCorrectAllocation(MapperSchedulingAndFailureInfo mapperSchedulingAndFailureInfo, String str, Set<Integer> set) {
            return mapperSchedulingAndFailureInfo.allocationId.equals(str) || set.contains(Integer.valueOf(mapperSchedulingAndFailureInfo.jvmPid));
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Long) obj, (Collector<String>) collector);
        }
    }

    public static void main(String[] strArr) throws Exception {
        ParameterTool fromArgs = ParameterTool.fromArgs(strArr);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(fromArgs.getInt("parallelism", 1));
        executionEnvironment.setMaxParallelism(fromArgs.getInt("maxParallelism", fromArgs.getInt("parallelism", 1)));
        executionEnvironment.enableCheckpointing(fromArgs.getInt("checkpointInterval", 1000));
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, fromArgs.getInt("restartDelay", 0)));
        if (fromArgs.getBoolean("externalizedCheckpoints", false)) {
            executionEnvironment.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        }
        executionEnvironment.getCheckpointConfig().setCheckpointStorage(fromArgs.getRequired("checkpointDir"));
        boolean z = fromArgs.getBoolean("killJvmOnFail", false);
        String str = fromArgs.get("stateBackend", "hashmap");
        if ("hashmap".equals(str)) {
            executionEnvironment.setStateBackend(new HashMapStateBackend());
        } else {
            if (!"rocks".equals(str)) {
                throw new IllegalArgumentException("Unknown backend: " + str);
            }
            executionEnvironment.setStateBackend(new EmbeddedRocksDBStateBackend(fromArgs.getBoolean("incrementalCheckpoints", false)));
        }
        executionEnvironment.getConfig().setGlobalJobParameters(fromArgs);
        executionEnvironment.addSource(new RandomLongSource(fromArgs.getInt("maxAttempts", 3), fromArgs.getLong("delay", 0L))).keyBy(l -> {
            return l;
        }).flatMap(new StateCreatingFlatMap(fromArgs.getInt("valueSize", 10), z)).addSink(new PrintSinkFunction());
        executionEnvironment.execute("Sticky Allocation And Local Recovery Test");
    }

    private static int getJvmPid() throws Exception {
        RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
        Field declaredField = runtimeMXBean.getClass().getDeclaredField("jvm");
        declaredField.setAccessible(true);
        VMManagement vMManagement = (VMManagement) declaredField.get(runtimeMXBean);
        Method declaredMethod = vMManagement.getClass().getDeclaredMethod("getProcessId", new Class[0]);
        declaredMethod.setAccessible(true);
        return ((Integer) declaredMethod.invoke(vMManagement, new Object[0])).intValue();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -2079664822:
                if (implMethodName.equals("lambda$main$64273719$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return l -> {
                        return l;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    static /* synthetic */ int access$000() throws Exception {
        return getJvmPid();
    }
}
