package org.apache.flink.test.recovery;

import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.RestClientConfiguration;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.JobVertexDetailsHeaders;
import org.apache.flink.runtime.rest.messages.JobVertexDetailsInfo;
import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.test.util.TestEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TemporaryClassLoaderContext;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.class */
public class BatchFineGrainedRecoveryITCase extends TestLogger {
    /** Number of records emitted by the source sequence of the test job. */
    private static final int EMITTED_RECORD_NUMBER = 1000;

    /** Number of partition mappers chained one after another in the test job. */
    private static final int MAP_NUMBER = 3;

    /** Prefix used to select the mapper tasks among all tasks reported for the job. */
    private static final String MAP_PARTITION_TEST_PARTITION_MAPPER = "MapPartition (Test partition mapper ";

    /** Prefix of the name given to every mapper operator; the mapper index is appended to it. */
    private static final String TASK_NAME_PREFIX = "Test partition mapper ";

    // Mutable per-test state, (re)assigned in setup().
    private static MiniCluster miniCluster;
    private static MiniClusterClient client;
    private static AtomicInteger lastTaskManagerIndexInMiniCluster;
    private static GlobalMapFailureTracker failureTracker;

    private static final Logger LOG = LoggerFactory.getLogger(BatchFineGrainedRecoveryITCase.class);

    /** Extracts the mapper index (group 1) from a full mapper task name. */
    private static final Pattern MAPPER_NUMBER_IN_TASK_NAME_PATTERN = Pattern.compile("MapPartition \\(Test partition mapper (\\d+)\\)");

    /**
     * Sum of 0..MAP_NUMBER.
     *
     * <p>NOTE(review): the upper bound was an inlined literal 4; it is presumably MAP_NUMBER + 1
     * (the value is unchanged for MAP_NUMBER == 3) - confirm against the intended failure model.
     */
    private static final int ALL_MAPPERS_BACKTRACK_FAILURES = IntStream.range(0, MAP_NUMBER + 1).sum();

    /**
     * Upper bound on job restarts handed to the fixed-delay restart strategy.
     *
     * <p>The added term was an inlined literal 6; createFailureStrategy joins two failures per
     * mapper, so it is presumably 2 * MAP_NUMBER (same value for MAP_NUMBER == 3) - confirm.
     */
    private static final int MAX_JOB_RESTART_ATTEMPTS = ALL_MAPPERS_BACKTRACK_FAILURES + 2 * MAP_NUMBER;

    /**
     * Attempt number expected for each mapper, indexed by mapper number. Earlier mappers are
     * expected to have more attempts than later ones (the term MAP_NUMBER - i - 1).
     */
    private static final int[] EXPECTED_MAP_ATTEMPT_NUMBERS = IntStream.range(0, MAP_NUMBER)
        .map(i -> 1 + (MAP_NUMBER - i - 1) + 1 + 1)
        .toArray();

    /**
     * Expected job result: the source emits 0..EMITTED_RECORD_NUMBER-1 and each of the MAP_NUMBER
     * mappers adds one to every record.
     */
    private static final List<Long> EXPECTED_JOB_OUTPUT = LongStream.range(MAP_NUMBER, EMITTED_RECORD_NUMBER + MAP_NUMBER)
        .boxed()
        .collect(Collectors.toList());

    /** Shared mini cluster with a single task manager offering a single slot. */
    @ClassRule
    public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
        new MiniClusterResourceConfiguration.Builder()
            .setConfiguration(createConfiguration())
            .setNumberTaskManagers(1)
            .setNumberSlotsPerTaskManager(1)
            .build());

    /** Source of the random call numbers at which the injected failures fire. */
    private static final Random rnd = new Random();

    /**
     * Base for failure strategies that fire on exactly one call: the one on which the call counter
     * reaches the configured number.
     *
     * <p>Instances are identified by a random UUID assigned at construction; equality and hash code
     * use only that id. The call counter is transient, so it is not part of the serialized form.
     */
    private static abstract class AbstractOnceAfterCallNumberFailureStrategy implements FailureStrategy {
        private static final long serialVersionUID = 1L;
        private final UUID id;
        private final int failAfterCallNumber;
        private transient int callCounter;

        private AbstractOnceAfterCallNumberFailureStrategy(int failAfterCallNumber) {
            this.id = UUID.randomUUID();
            this.failAfterCallNumber = failAfterCallNumber;
        }

        /**
         * Counts this call and triggers {@link #fail(int)} when the counter hits the configured number.
         *
         * @return {@code true} if {@code fail} was invoked (and returned normally) during this call
         */
        @Override
        public boolean failOrNot(int trackingIndex) throws Exception {
            callCounter += 1;
            if (callCounter != failAfterCallNumber) {
                return false;
            }
            fail(trackingIndex);
            return true;
        }

        /** Performs the actual failure; implemented by the concrete strategies. */
        abstract void fail(int trackingIndex) throws Exception;

        @Override
        public String toString() {
            return new StringBuilder(getClass().getSimpleName())
                .append(" (fail after ")
                .append(failAfterCallNumber)
                .append(" calls)")
                .toString();
        }

        @Override
        public boolean equals(Object other) {
            if (other == this) {
                return true;
            }
            boolean sameClass = other != null && other.getClass() == getClass();
            if (!sameClass) {
                return false;
            }
            AbstractOnceAfterCallNumberFailureStrategy that = (AbstractOnceAfterCallNumberFailureStrategy) other;
            return Objects.equals(id, that.id);
        }

        @Override
        public int hashCode() {
            return id.hashCode();
        }
    }

    /** Failure strategy whose failure is a {@link FlinkException} thrown from the user function. */
    public static class ExceptionFailureStrategy extends AbstractOnceAfterCallNumberFailureStrategy {
        private static final long serialVersionUID = 1L;

        private ExceptionFailureStrategy(int failAfterCallNumber) {
            super(failAfterCallNumber);
        }

        /** Always throws; the tracking index is not used. */
        @Override
        void fail(int trackingIndex) throws FlinkException {
            final String message = "BAGA-BOOM!!! The user function generated test failure.";
            throw new FlinkException(message);
        }
    }

    /**
     * Serializable hook that decides on each call whether to inject a failure.
     *
     * <p>It is serializable because it is stored as a field of the mapper function.
     */
    @FunctionalInterface
    public interface FailureStrategy extends Serializable {
        /**
         * Possibly triggers a failure for the given mapper.
         *
         * @param i index of the calling mapper (the mapper passes its tracking index)
         * @return {@code true} if a failure was triggered by this call and the call still returned
         * @throws Exception if the triggered failure is itself an exception
         */
        boolean failOrNot(int i) throws Exception;
    }

    /**
     * Remembers, per mapper, which failure strategies have already fired, and collects unexpected
     * failures so that they can be reported when the test verifies its result.
     */
    private static class GlobalMapFailureTracker {
        /** One set of already fired strategies per mapper, indexed by mapper number. */
        private final List<Set<FailureStrategy>> mapFailures;
        private final Object classLock;

        @GuardedBy("classLock")
        private Throwable unexpectedFailure;

        /**
         * @param mapNumber number of mappers to track
         */
        private GlobalMapFailureTracker(int mapNumber) {
            this.classLock = new Object();
            this.mapFailures = new ArrayList<>(mapNumber);
            IntStream.range(0, mapNumber).forEach(ignored -> addNewMapper());
        }

        /**
         * Registers one more mapper.
         *
         * @return index of the newly registered mapper
         */
        private int addNewMapper() {
            mapFailures.add(new HashSet<>(2));
            return mapFailures.size() - 1;
        }

        /**
         * Runs the given strategy for the given mapper unless that strategy has already fired for it.
         *
         * <p>A strategy counts as fired both when it reports a failure by returning {@code true} and
         * when it throws. In either case it is recorded, so later calls for the same mapper skip it.
         * Lookup relies on the strategy's {@code equals}/{@code hashCode}.
         *
         * @param mapIndex index of the mapper calling the strategy
         * @param failureStrategy strategy to run
         * @return {@code true} if the strategy fired during this call and returned normally
         * @throws Exception whatever the strategy throws
         */
        public boolean failOrNot(int mapIndex, FailureStrategy failureStrategy) throws Exception {
            boolean alreadyFailed = mapFailures.get(mapIndex).contains(failureStrategy);
            boolean failedNow = false;
            try {
                failedNow = !alreadyFailed && failureStrategy.failOrNot(mapIndex);
            } catch (Exception e) {
                // A throwing strategy has fired as well; remember it before propagating.
                failedNow = true;
                throw e;
            } finally {
                if (failedNow) {
                    mapFailures.get(mapIndex).add(failureStrategy);
                }
            }
            return failedNow;
        }

        /** Records a failure that is not part of the test scenario; reported later by {@link #verify(int[])}. */
        public void unrelatedFailure(Throwable th) {
            synchronized (classLock) {
                unexpectedFailure = ExceptionUtils.firstOrSuppressed(th, unexpectedFailure);
            }
        }

        /**
         * Fails the test if an unexpected failure was recorded, then checks the mapper attempt numbers.
         *
         * @param iArr observed attempt number per mapper, indexed by mapper number
         */
        public void verify(int[] iArr) {
            synchronized (classLock) {
                if (unexpectedFailure != null) {
                    throw new AssertionError("Test failed due to unexpected exception.", unexpectedFailure);
                }
            }
            Assert.assertThat(iArr, CoreMatchers.is(BatchFineGrainedRecoveryITCase.EXPECTED_MAP_ATTEMPT_NUMBERS));
        }
    }

    /** Routes every call of the wrapped strategy through the test's global failure tracker. */
    public static class GloballyTrackingFailureStrategy implements FailureStrategy {
        private static final long serialVersionUID = 1L;
        private final FailureStrategy wrappedFailureStrategy;

        private GloballyTrackingFailureStrategy(FailureStrategy wrapped) {
            this.wrappedFailureStrategy = wrapped;
        }

        /** Delegates to the tracker, which decides whether the wrapped strategy is run at all. */
        @Override
        public boolean failOrNot(int trackingIndex) throws Exception {
            GlobalMapFailureTracker tracker = BatchFineGrainedRecoveryITCase.failureTracker;
            return tracker.failOrNot(trackingIndex, wrappedFailureStrategy);
        }

        @Override
        public String toString() {
            return new StringBuilder("Tracked{")
                .append(wrappedFailureStrategy)
                .append('}')
                .toString();
        }
    }

    /** Name of a job vertex together with the attempt number of one of its subtasks. */
    public static class InternalTaskInfo {
        private final String name;
        private final int attempt;

        private InternalTaskInfo(String vertexName, SubtaskExecutionAttemptDetailsInfo subtaskDetails) {
            this.attempt = subtaskDetails.getAttempt();
            this.name = vertexName;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append(name).append(" (Attempt #").append(attempt).append(')');
            return text.toString();
        }
    }

    /** Tries several strategies in order and stops at the first one that reports a failure. */
    public static class JoinedFailureStrategy implements FailureStrategy {
        private static final long serialVersionUID = 1L;
        private final FailureStrategy[] failureStrategies;

        private JoinedFailureStrategy(FailureStrategy... strategies) {
            this.failureStrategies = strategies;
        }

        /**
         * Calls the strategies in array order.
         *
         * @return {@code true} as soon as one strategy returns {@code true}; later ones are not called
         */
        @Override
        public boolean failOrNot(int trackingIndex) throws Exception {
            for (int pos = 0; pos < failureStrategies.length; pos++) {
                boolean fired = failureStrategies[pos].failOrNot(trackingIndex);
                if (fired) {
                    return true;
                }
            }
            return false;
        }

        @Override
        public String toString() {
            return Arrays.stream(failureStrategies)
                .map(strategy -> strategy.toString())
                .collect(Collectors.joining(" or "));
        }
    }

    /**
     * Minimal REST client for the mini cluster, used to read the attempt numbers of all subtasks.
     *
     * <p>Owns a REST client and the single-threaded executor backing it; both are released by
     * {@link #close()}.
     */
    public static class MiniClusterClient implements AutoCloseable {
        private final RestClient restClient;
        private final ExecutorService executorService;
        private final URI restAddress;

        /** Blocks until the REST address of the given cluster is known. */
        private MiniClusterClient(MiniCluster miniCluster) throws ConfigurationException {
            this.restAddress = miniCluster.getRestAddress().join();
            this.executorService = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-RestClient-IO"));
            this.restClient = createRestClient();
        }

        private RestClient createRestClient() throws ConfigurationException {
            return new RestClient(RestClientConfiguration.fromConfiguration(new UnmodifiableConfiguration(new Configuration())), this.executorService);
        }

        /**
         * Queries every job known to the cluster and returns one entry per subtask of every job vertex.
         *
         * <p>All requests are issued sequentially and block until answered.
         */
        public List<InternalTaskInfo> getInternalTaskInfos() {
            List<InternalTaskInfo> taskInfos = new ArrayList<>();
            for (JobID jobId : getJobs()) {
                for (JobDetailsInfo.JobVertexDetailsInfo vertex : getJobDetails(jobId).join().getJobVertexInfos()) {
                    JobVertexDetailsInfo vertexDetails = getJobVertexDetailsInfo(jobId, vertex.getJobVertexID());
                    for (SubtaskExecutionAttemptDetailsInfo subtask : vertexDetails.getSubtasks()) {
                        taskInfos.add(new InternalTaskInfo(vertex.getName(), subtask));
                    }
                }
            }
            return taskInfos;
        }

        /** Returns the ids of all jobs reported by the cluster. */
        private Collection<JobID> getJobs() {
            JobIdsWithStatusOverview overview =
                sendRequest(JobIdsWithStatusesOverviewHeaders.getInstance(), EmptyMessageParameters.getInstance()).join();
            return overview.getJobsWithStatus().stream()
                .map(job -> job.getJobId())
                .collect(Collectors.toList());
        }

        private CompletableFuture<JobDetailsInfo> getJobDetails(JobID jobID) {
            JobMessageParameters jobMessageParameters = new JobMessageParameters();
            jobMessageParameters.jobPathParameter.resolve(jobID);
            return sendRequest(JobDetailsHeaders.getInstance(), jobMessageParameters);
        }

        private JobVertexDetailsInfo getJobVertexDetailsInfo(JobID jobID, JobVertexID jobVertexID) {
            JobVertexMessageParameters jobVertexMessageParameters = new JobVertexMessageParameters();
            jobVertexMessageParameters.jobPathParameter.resolve(jobID);
            jobVertexMessageParameters.jobVertexIdPathParameter.resolve(jobVertexID);
            return sendRequest(JobVertexDetailsHeaders.getInstance(), jobVertexMessageParameters).join();
        }

        /** Sends a body-less request; an {@link IOException} is returned as a failed future instead of thrown. */
        private <M extends MessageHeaders<EmptyRequestBody, P, U>, U extends MessageParameters, P extends ResponseBody> CompletableFuture<P> sendRequest(M m, U u) {
            try {
                return this.restClient.sendRequest(this.restAddress.getHost(), this.restAddress.getPort(), m, u, EmptyRequestBody.getInstance());
            } catch (IOException e) {
                return FutureUtils.completedExceptionally(e);
            }
        }

        /** Closes the REST client and always stops the executor, even if closing the client fails. */
        @Override
        public void close() throws Exception {
            try {
                this.restClient.close();
            } finally {
                // Otherwise a failing close() would leave the executor thread running.
                this.executorService.shutdownNow();
            }
        }
    }

    /**
     * Lets the wrapped strategy fire at most once per instance.
     *
     * <p>The flag is transient, so it is not part of the serialized form.
     */
    public static class OneTimeFailureStrategy implements FailureStrategy {
        private static final long serialVersionUID = 1L;
        private final FailureStrategy wrappedFailureStrategy;
        private transient boolean failed;

        private OneTimeFailureStrategy(FailureStrategy wrapped) {
            this.wrappedFailureStrategy = wrapped;
        }

        /**
         * Delegates to the wrapped strategy until it fires (returns {@code true} or throws);
         * afterwards always returns {@code false} without delegating.
         */
        @Override
        public boolean failOrNot(int trackingIndex) throws Exception {
            if (failed) {
                return false;
            }
            boolean firedNow;
            try {
                firedNow = wrappedFailureStrategy.failOrNot(trackingIndex);
            } catch (Exception e) {
                failed = true;
                throw e;
            }
            failed = firedNow;
            return firedNow;
        }

        @Override
        public String toString() {
            return new StringBuilder("FailingOnce{")
                .append(wrappedFailureStrategy)
                .append('}')
                .toString();
        }
    }

    /** Failure strategy whose failure is a restart of the mini cluster's task manager. */
    public static class TaskExecutorFailureStrategy extends AbstractOnceAfterCallNumberFailureStrategy {
        private static final long serialVersionUID = 1L;

        private TaskExecutorFailureStrategy(int failAfterCallNumber) {
            super(failAfterCallNumber);
        }

        /**
         * Restarts the task manager while the system class loader is the thread's context class loader.
         *
         * <p>An interrupt during the restart is not propagated; only the thread's interrupt flag is
         * restored (presumably the calling task is cancelled when its task manager goes down - TODO
         * confirm). Any other failure is reported to the global tracker and rethrown.
         *
         * @param i tracking index of the calling mapper (unused)
         */
        @Override
        void fail(int i) throws Exception {
            try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(ClassLoader.getSystemClassLoader())) {
                BatchFineGrainedRecoveryITCase.restartTaskManager();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Throwable t) {
                BatchFineGrainedRecoveryITCase.failureTracker.unrelatedFailure(t);
                throw t;
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase$TestPartitionMapper.class */
    private static class TestPartitionMapper extends RichMapPartitionFunction<Long, Long> {
        private static final long serialVersionUID = 1;
        private final int trackingIndex;
        private final FailureStrategy failureStrategy;

        private TestPartitionMapper(int i, FailureStrategy failureStrategy) {
            this.trackingIndex = i;
            this.failureStrategy = failureStrategy;
        }

        public void mapPartition(Iterable<Long> iterable, Collector<Long> collector) throws Exception {
            for (Long l : iterable) {
                this.failureStrategy.failOrNot(this.trackingIndex);
                collector.collect(Long.valueOf(l.longValue() + 1));
            }
        }
    }

    /** Binds the shared mini cluster and creates a fresh REST client, task manager index and failure tracker. */
    @Before
    public void setup() throws Exception {
        final MiniCluster cluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
        miniCluster = cluster;
        client = new MiniClusterClient(cluster);
        lastTaskManagerIndexInMiniCluster = new AtomicInteger();
        failureTracker = new GlobalMapFailureTracker(MAP_NUMBER);
    }

    /** Closes the REST client if setup got far enough to create one. */
    @After
    public void teardown() throws Exception {
        if (client == null) {
            return;
        }
        client.close();
    }

    /**
     * Runs a sequence source followed by MAP_NUMBER failing mappers, then checks the job output and
     * the attempt number of every mapper.
     */
    @Test
    public void testProgram() throws Exception {
        ExecutionEnvironment env = createExecutionEnvironment();

        // The sequence bounds are inclusive, hence EMITTED_RECORD_NUMBER - 1.
        Operator<Long, ?> pipeline = env.generateSequence(0L, EMITTED_RECORD_NUMBER - 1);
        for (int trackingIndex = 0; trackingIndex < MAP_NUMBER; trackingIndex++) {
            pipeline = pipeline
                .mapPartition(new TestPartitionMapper(trackingIndex, createFailureStrategy(trackingIndex)))
                .name(TASK_NAME_PREFIX + trackingIndex);
        }

        Assert.assertThat(pipeline.collect(), CoreMatchers.is(EXPECTED_JOB_OUTPUT));
        failureTracker.verify(getMapperAttempts());
    }

    /** Builds the cluster configuration: sets the execution failover strategy to "region". */
    private static Configuration createConfiguration() {
        final Configuration clusterConfig = new Configuration();
        clusterConfig.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region");
        return clusterConfig;
    }

    /**
     * Creates the failure strategy of one mapper: an exception failure or a task manager failure,
     * each firing at a random call number in [1, EMITTED_RECORD_NUMBER], each tracked globally, and
     * the combination wrapped so that it fires at most once per instance.
     *
     * @param i index of the mapper, used for logging only
     */
    private static FailureStrategy createFailureStrategy(int i) {
        // Keep the creation order: the exception strategy draws its random number first.
        FailureStrategy exceptionFailure =
            new GloballyTrackingFailureStrategy(new ExceptionFailureStrategy(rnd.nextInt(EMITTED_RECORD_NUMBER) + 1));
        FailureStrategy taskExecutorFailure =
            new GloballyTrackingFailureStrategy(new TaskExecutorFailureStrategy(rnd.nextInt(EMITTED_RECORD_NUMBER) + 1));

        FailureStrategy joined = new JoinedFailureStrategy(exceptionFailure, taskExecutorFailure);
        OneTimeFailureStrategy strategy = new OneTimeFailureStrategy(joined);

        LOG.info("FailureStrategy for the mapper {}: {}", i, strategy);
        return strategy;
    }

    /**
     * Creates a test environment on the mini cluster with a fixed-delay restart strategy
     * (MAX_JOB_RESTART_ATTEMPTS attempts, 10 ms delay) and the BATCH_FORCED execution mode.
     */
    private static ExecutionEnvironment createExecutionEnvironment() {
        final Time restartDelay = Time.milliseconds(10L);
        final TestEnvironment env = new TestEnvironment(miniCluster, 1, true);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(MAX_JOB_RESTART_ATTEMPTS, restartDelay));
        env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);
        return env;
    }

    /**
     * Terminates the task manager with the next index, waits for the termination to finish, and
     * starts a new task manager.
     *
     * <p>The new task manager is started even if the termination fails, and it is started exactly once.
     *
     * @throws Exception if the termination or the start fails
     */
    public static void restartTaskManager() throws Exception {
        int taskManagerIndex = lastTaskManagerIndexInMiniCluster.getAndIncrement();
        try {
            miniCluster.terminateTaskManager(taskManagerIndex).get();
        } finally {
            miniCluster.startTaskManager();
        }
    }

    /**
     * Reads the attempt number of every mapper task from the cluster.
     *
     * @return attempt numbers indexed by mapper number; entries of mappers that were not reported stay 0
     */
    private static int[] getMapperAttempts() {
        int[] attempts = new int[MAP_NUMBER];
        for (InternalTaskInfo taskInfo : client.getInternalTaskInfos()) {
            if (!taskInfo.name.startsWith(MAP_PARTITION_TEST_PARTITION_MAPPER)) {
                continue;
            }
            attempts[parseMapperNumberFromTaskName(taskInfo.name)] = taskInfo.attempt;
        }
        return attempts;
    }

    /**
     * Extracts the mapper number from a mapper task name.
     *
     * @param str full task name, which must match MAPPER_NUMBER_IN_TASK_NAME_PATTERN entirely
     * @return the number captured by the pattern
     * @throws FlinkRuntimeException if the name does not match the pattern
     */
    private static int parseMapperNumberFromTaskName(String str) {
        final Matcher nameMatcher = MAPPER_NUMBER_IN_TASK_NAME_PATTERN.matcher(str);
        if (!nameMatcher.matches()) {
            throw new FlinkRuntimeException("Failed to find mapper number in its task name: " + str);
        }
        String mapperNumber = nameMatcher.group(1);
        return Integer.parseInt(mapperNumber);
    }
}
