package org.apache.flink.runtime.entrypoint;

import java.io.File;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
import org.apache.flink.runtime.dispatcher.runner.DispatcherRunner;
import org.apache.flink.runtime.dispatcher.runner.DispatcherRunnerFactory;
import org.apache.flink.runtime.dispatcher.runner.TestingDispatcherRunner;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
import org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
import org.apache.flink.runtime.jobmanager.JobGraphStoreFactory;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.ArbitraryWorkerResourceSpecFactory;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.TestingJobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.TestingResourceManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
import org.apache.flink.runtime.rest.SessionRestEndpointFactory;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.testutils.TestJvmProcess;
import org.apache.flink.runtime.testutils.TestingClusterEntrypointProcess;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.class */
public class ClusterEntrypointTest extends TestLogger {
    private static final long TIMEOUT_MS = 10000;
    private Configuration flinkConfig;

    @ClassRule
    public static final TestExecutorResource<?> TEST_EXECUTOR_RESOURCE = new TestExecutorResource<>(Executors::newSingleThreadExecutor);

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/entrypoint/ClusterEntrypointTest$TestingDispatcherRunnerFactory.class */
    public static class TestingDispatcherRunnerFactory implements DispatcherRunnerFactory {
        private final CompletableFuture<ApplicationStatus> shutDownFuture;

        /* loaded from: input_file:org/apache/flink/runtime/entrypoint/ClusterEntrypointTest$TestingDispatcherRunnerFactory$Builder.class */
        public static final class Builder {
            private CompletableFuture<ApplicationStatus> shutDownFuture = new CompletableFuture<>();

            public Builder setShutDownFuture(CompletableFuture<ApplicationStatus> completableFuture) {
                this.shutDownFuture = completableFuture;
                return this;
            }

            public TestingDispatcherRunnerFactory build() {
                return new TestingDispatcherRunnerFactory(this.shutDownFuture);
            }
        }

        private TestingDispatcherRunnerFactory(CompletableFuture<ApplicationStatus> completableFuture) {
            this.shutDownFuture = completableFuture;
        }

        public DispatcherRunner createDispatcherRunner(LeaderElectionService leaderElectionService, FatalErrorHandler fatalErrorHandler, JobGraphStoreFactory jobGraphStoreFactory, Executor executor, RpcService rpcService, PartialDispatcherServices partialDispatcherServices) throws Exception {
            return TestingDispatcherRunner.newBuilder().setShutDownFuture(this.shutDownFuture).build();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/entrypoint/ClusterEntrypointTest$TestingEntryPoint.class */
    public static class TestingEntryPoint extends ClusterEntrypoint {
        private final HighAvailabilityServices haService;
        private final ResourceManagerFactory<ResourceID> resourceManagerFactory;
        private final DispatcherRunnerFactory dispatcherRunnerFactory;

        /* loaded from: input_file:org/apache/flink/runtime/entrypoint/ClusterEntrypointTest$TestingEntryPoint$Builder.class */
        public static final class Builder {
            private HighAvailabilityServices haService = new TestingHighAvailabilityServicesBuilder().build();
            private ResourceManagerFactory<ResourceID> resourceManagerFactory = StandaloneResourceManagerFactory.getInstance();
            private DispatcherRunnerFactory dispatcherRunnerFactory = new TestingDispatcherRunnerFactory.Builder().build();
            private Configuration configuration = new Configuration();

            public Builder setHighAvailabilityServices(HighAvailabilityServices highAvailabilityServices) {
                this.haService = highAvailabilityServices;
                return this;
            }

            public Builder setResourceManagerFactory(ResourceManagerFactory<ResourceID> resourceManagerFactory) {
                this.resourceManagerFactory = resourceManagerFactory;
                return this;
            }

            public Builder setConfiguration(Configuration configuration) {
                this.configuration = configuration;
                return this;
            }

            public Builder setDispatcherRunnerFactory(DispatcherRunnerFactory dispatcherRunnerFactory) {
                this.dispatcherRunnerFactory = dispatcherRunnerFactory;
                return this;
            }

            public TestingEntryPoint build() {
                return new TestingEntryPoint(this.configuration, this.haService, this.resourceManagerFactory, this.dispatcherRunnerFactory);
            }
        }

        private TestingEntryPoint(Configuration configuration, HighAvailabilityServices highAvailabilityServices, ResourceManagerFactory<ResourceID> resourceManagerFactory, DispatcherRunnerFactory dispatcherRunnerFactory) {
            super(configuration);
            SignalHandler.register(LOG);
            this.haService = highAvailabilityServices;
            this.resourceManagerFactory = resourceManagerFactory;
            this.dispatcherRunnerFactory = dispatcherRunnerFactory;
        }

        protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) throws IOException {
            return new DefaultDispatcherResourceManagerComponentFactory(this.dispatcherRunnerFactory, this.resourceManagerFactory, SessionRestEndpointFactory.INSTANCE);
        }

        protected ExecutionGraphInfoStore createSerializableExecutionGraphStore(Configuration configuration, ScheduledExecutor scheduledExecutor) {
            return new MemoryExecutionGraphInfoStore();
        }

        protected HighAvailabilityServices createHaServices(Configuration configuration, Executor executor) {
            return this.haService;
        }

        protected boolean supportsReactiveMode() {
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/entrypoint/ClusterEntrypointTest$TestingResourceManagerFactory.class */
    public static class TestingResourceManagerFactory extends ResourceManagerFactory<ResourceID> {
        private final BiConsumer<ApplicationStatus, String> deregisterAppConsumer;

        /* loaded from: input_file:org/apache/flink/runtime/entrypoint/ClusterEntrypointTest$TestingResourceManagerFactory$Builder.class */
        public static final class Builder {
            private BiConsumer<ApplicationStatus, String> deregisterAppConsumer = (applicationStatus, str) -> {
            };

            public Builder setInternalDeregisterApplicationConsumer(BiConsumer<ApplicationStatus, String> biConsumer) {
                this.deregisterAppConsumer = biConsumer;
                return this;
            }

            public TestingResourceManagerFactory build() {
                return new TestingResourceManagerFactory(this.deregisterAppConsumer);
            }
        }

        private TestingResourceManagerFactory(BiConsumer<ApplicationStatus, String> biConsumer) {
            this.deregisterAppConsumer = biConsumer;
        }

        protected ResourceManager<ResourceID> createResourceManager(Configuration configuration, ResourceID resourceID, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, FatalErrorHandler fatalErrorHandler, ClusterInformation clusterInformation, @Nullable String str, ResourceManagerMetricGroup resourceManagerMetricGroup, ResourceManagerRuntimeServices resourceManagerRuntimeServices, Executor executor) throws Exception {
            return new TestingResourceManager(rpcService, resourceID, highAvailabilityServices, heartbeatServices, SlotManagerBuilder.newBuilder().setScheduledExecutor(rpcService.getScheduledExecutor()).build(), NoOpResourceManagerPartitionTracker::get, new TestingJobLeaderIdService.Builder().build(), fatalErrorHandler, resourceManagerMetricGroup) { // from class: org.apache.flink.runtime.entrypoint.ClusterEntrypointTest.TestingResourceManagerFactory.1
                @Override // org.apache.flink.runtime.resourcemanager.TestingResourceManager
                protected void internalDeregisterApplication(ApplicationStatus applicationStatus, @Nullable String str2) {
                    TestingResourceManagerFactory.this.deregisterAppConsumer.accept(applicationStatus, str2);
                }
            };
        }

        protected ResourceManagerRuntimeServicesConfiguration createResourceManagerRuntimeServicesConfiguration(Configuration configuration) throws ConfigurationException {
            return ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration, ArbitraryWorkerResourceSpecFactory.INSTANCE);
        }
    }

    @Before
    public void before() {
        this.flinkConfig = new Configuration();
    }

    @Test(expected = IllegalConfigurationException.class)
    public void testStandaloneSessionClusterEntrypointDeniedInReactiveMode() {
        this.flinkConfig.set(JobManagerOptions.SCHEDULER_MODE, SchedulerExecutionMode.REACTIVE);
        new TestingEntryPoint.Builder().setConfiguration(this.flinkConfig).build();
        Assert.fail("Entrypoint initialization is supposed to fail");
    }

    @Test
    public void testCloseAsyncShouldNotCleanUpHAData() throws Exception {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        CompletableFuture<Void> completableFuture2 = new CompletableFuture<>();
        TestingEntryPoint build = new TestingEntryPoint.Builder().setConfiguration(this.flinkConfig).setHighAvailabilityServices(new TestingHighAvailabilityServicesBuilder().setCloseFuture(completableFuture).setCloseAndCleanupAllDataFuture(completableFuture2).build()).build();
        CompletableFuture<ApplicationStatus> startClusterEntrypoint = startClusterEntrypoint(build);
        build.closeAsync();
        MatcherAssert.assertThat(startClusterEntrypoint.get(TIMEOUT_MS, TimeUnit.MILLISECONDS), CoreMatchers.is(ApplicationStatus.UNKNOWN));
        MatcherAssert.assertThat(Boolean.valueOf(completableFuture.isDone()), CoreMatchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(completableFuture2.isDone()), CoreMatchers.is(false));
    }

    @Test
    public void testCloseAsyncShouldNotDeregisterApp() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        TestingEntryPoint build = new TestingEntryPoint.Builder().setConfiguration(this.flinkConfig).setResourceManagerFactory(new TestingResourceManagerFactory.Builder().setInternalDeregisterApplicationConsumer((applicationStatus, str) -> {
            completableFuture.complete(null);
        }).build()).build();
        CompletableFuture<ApplicationStatus> startClusterEntrypoint = startClusterEntrypoint(build);
        build.closeAsync();
        MatcherAssert.assertThat(startClusterEntrypoint.get(TIMEOUT_MS, TimeUnit.MILLISECONDS), CoreMatchers.is(ApplicationStatus.UNKNOWN));
        MatcherAssert.assertThat(Boolean.valueOf(completableFuture.isDone()), CoreMatchers.is(false));
    }

    @Test
    public void testClusterFinishedNormallyShouldDeregisterAppAndCleanupHAData() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture<Void> completableFuture2 = new CompletableFuture<>();
        CompletableFuture<ApplicationStatus> completableFuture3 = new CompletableFuture<>();
        TestingHighAvailabilityServices build = new TestingHighAvailabilityServicesBuilder().setCloseAndCleanupAllDataFuture(completableFuture2).build();
        TestingResourceManagerFactory build2 = new TestingResourceManagerFactory.Builder().setInternalDeregisterApplicationConsumer((applicationStatus, str) -> {
            completableFuture.complete(null);
        }).build();
        CompletableFuture<ApplicationStatus> startClusterEntrypoint = startClusterEntrypoint(new TestingEntryPoint.Builder().setConfiguration(this.flinkConfig).setResourceManagerFactory(build2).setDispatcherRunnerFactory(new TestingDispatcherRunnerFactory.Builder().setShutDownFuture(completableFuture3).build()).setHighAvailabilityServices(build).build());
        completableFuture3.complete(ApplicationStatus.SUCCEEDED);
        MatcherAssert.assertThat(startClusterEntrypoint.get(TIMEOUT_MS, TimeUnit.MILLISECONDS), CoreMatchers.is(ApplicationStatus.SUCCEEDED));
        MatcherAssert.assertThat(Boolean.valueOf(completableFuture.isDone()), CoreMatchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(completableFuture2.isDone()), CoreMatchers.is(true));
    }

    @Test
    public void testCloseAsyncShouldBeExecutedInShutdownHook() throws Exception {
        Assume.assumeTrue(OperatingSystem.isLinux() || OperatingSystem.isMac());
        File file = new File(TEMPORARY_FOLDER.getRoot(), UUID.randomUUID() + ".marker");
        TestingClusterEntrypointProcess testingClusterEntrypointProcess = new TestingClusterEntrypointProcess(file);
        testingClusterEntrypointProcess.startProcess();
        boolean z = false;
        try {
            long processId = testingClusterEntrypointProcess.getProcessId();
            Assert.assertTrue("Cannot determine process ID", processId != -1);
            TestJvmProcess.waitForMarkerFile(file, 30000L);
            TestJvmProcess.killProcessWithSigTerm(processId);
            MatcherAssert.assertThat(String.format("Process %s does not exit within %s ms", Long.valueOf(processId), Long.valueOf(TIMEOUT_MS)), Boolean.valueOf(testingClusterEntrypointProcess.waitFor(TIMEOUT_MS, TimeUnit.MILLISECONDS)), CoreMatchers.is(true));
            MatcherAssert.assertThat("markerFile should be deleted in closeAsync shutdownHook", Boolean.valueOf(file.exists()), CoreMatchers.is(false));
            z = true;
            if (1 == 0) {
                testingClusterEntrypointProcess.printProcessLog();
            }
            testingClusterEntrypointProcess.destroy();
        } catch (Throwable th) {
            if (!z) {
                testingClusterEntrypointProcess.printProcessLog();
            }
            testingClusterEntrypointProcess.destroy();
            throw th;
        }
    }

    private CompletableFuture<ApplicationStatus> startClusterEntrypoint(TestingEntryPoint testingEntryPoint) throws Exception {
        testingEntryPoint.startCluster();
        return FutureUtils.supplyAsync(() -> {
            return (ApplicationStatus) testingEntryPoint.getTerminationFuture().get();
        }, TEST_EXECUTOR_RESOURCE.getExecutor());
    }
}
