From bc9a99fa061e4cd2f003742442c7151bfa141c30 Mon Sep 17 00:00:00 2001 From: Sergey Nuyanzin Date: Thu, 26 Dec 2024 20:33:55 +0100 Subject: [PATCH] [FLINK-36948][tests] Remove deprecated FlinkMatchers --- ...DispatcherCachedOperationsHandlerTest.java | 66 ++- .../dispatcher/DispatcherCleanupITCase.java | 86 ++-- .../DispatcherResourceCleanupTest.java | 25 +- .../runtime/dispatcher/DispatcherTest.java | 223 +++++----- ...ispatcherResourceManagerComponentTest.java | 9 +- .../heartbeat/HeartbeatManagerTest.java | 77 ++-- .../DefaultExecutionPlanStoreTest.java | 97 ++--- .../CatalogBaseTableResolutionTest.java | 85 ++-- .../table/catalog/SchemaResolutionTest.java | 12 +- .../DataTypeExtractorScalaTest.scala | 3 +- .../table/codesplit/JavaCodeSplitterTest.java | 17 +- .../extraction/DataTypeExtractorTest.java | 8 +- .../table/planner/codegen/CodeSplitTest.java | 4 +- .../tvf/slicing/SliceAssignerTestBase.java | 11 +- .../FlinkCompletableFutureAssert.java | 19 +- .../flink/core/testutils/FlinkMatchers.java | 406 ------------------ .../runtime/jobmaster/JobMasterITCase.java | 16 +- .../scheduling/AdaptiveSchedulerITCase.java | 92 ++-- 18 files changed, 359 insertions(+), 897 deletions(-) delete mode 100644 flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java index 7768e36724c02..889f44d846d65 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java @@ -24,7 +24,7 @@ import org.apache.flink.configuration.RestOptions; import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.SavepointFormatType; -import org.apache.flink.core.testutils.FlinkMatchers; +import org.apache.flink.core.testutils.FlinkAssertions; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache; import org.apache.flink.runtime.rest.handler.async.OperationResult; @@ -41,11 +41,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.Is.is; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for the {@link DispatcherCachedOperationsHandler} component. */ public class DispatcherCachedOperationsHandlerTest extends TestLogger { @@ -124,18 +121,17 @@ public void triggerSavepointRepeatedly() throws ExecutionException, InterruptedE TriggerSavepointMode.SAVEPOINT, TIMEOUT); - assertThat(triggerSavepointFunction.getNumberOfInvocations(), is(1)); - assertThat( - triggerSavepointFunction.getInvocationParameters().get(0), - is( + assertThat(triggerSavepointFunction.getNumberOfInvocations()).isOne(); + assertThat(triggerSavepointFunction.getInvocationParameters().get(0)) + .isEqualTo( new Tuple4<>( jobID, targetDirectory, SavepointFormatType.CANONICAL, - TriggerSavepointMode.SAVEPOINT))); + TriggerSavepointMode.SAVEPOINT)); - assertThat(firstAcknowledge.get(), is(Acknowledge.get())); - assertThat(secondAcknowledge.get(), is(Acknowledge.get())); + assertThat(firstAcknowledge.get()).isEqualTo(Acknowledge.get()); + assertThat(secondAcknowledge.get()).isEqualTo(Acknowledge.get()); } @Test @@ -155,18 +151,17 @@ public void stopWithSavepointRepeatedly() throws ExecutionException, Interrupted TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT, TIMEOUT); - assertThat(stopWithSavepointFunction.getNumberOfInvocations(), is(1)); - assertThat( - stopWithSavepointFunction.getInvocationParameters().get(0), - is( + assertThat(stopWithSavepointFunction.getNumberOfInvocations()).isOne(); + assertThat(stopWithSavepointFunction.getInvocationParameters().get(0)) + .isEqualTo( new Tuple4<>( jobID, targetDirectory, SavepointFormatType.CANONICAL, - TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT))); + TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT)); - assertThat(firstAcknowledge.get(), is(Acknowledge.get())); - assertThat(secondAcknowledge.get(), is(Acknowledge.get())); + assertThat(firstAcknowledge.get()).isEqualTo(Acknowledge.get()); + assertThat(secondAcknowledge.get()).isEqualTo(Acknowledge.get()); } @Test @@ -190,23 +185,22 @@ public void retryingCompletedOperationDoesNotMarkCacheEntryAsAccessed() .get(); // should not complete because we wait for the result to be accessed - assertThat( - savepointTriggerCache.closeAsync(), - FlinkMatchers.willNotComplete(Duration.ofMillis(10))); + FlinkAssertions.assertThatFuture(savepointTriggerCache.closeAsync()) + .willNotCompleteWithin(Duration.ofMillis(10)); } @Test public void throwsIfCacheIsShuttingDown() { savepointTriggerCache.closeAsync(); - assertThrows( - IllegalStateException.class, - () -> - handler.triggerSavepoint( - operationKey, - targetDirectory, - SavepointFormatType.CANONICAL, - TriggerSavepointMode.SAVEPOINT, - TIMEOUT)); + assertThatThrownBy( + () -> + handler.triggerSavepoint( + operationKey, + targetDirectory, + SavepointFormatType.CANONICAL, + TriggerSavepointMode.SAVEPOINT, + TIMEOUT)) + .isInstanceOf(IllegalStateException.class); } @Test @@ -224,15 +218,17 @@ public void getStatus() throws ExecutionException, InterruptedException { CompletableFuture> statusFuture = handler.getSavepointStatus(operationKey); - assertEquals(statusFuture.get(), OperationResult.success(savepointLocation)); + assertThat(statusFuture.get()).isEqualTo(OperationResult.success(savepointLocation)); } @Test - public void getStatusFailsIfKeyUnknown() throws InterruptedException { + public void getStatusFailsIfKeyUnknown() { CompletableFuture> statusFuture = handler.getSavepointStatus(operationKey); - assertThat(statusFuture, futureFailedWith(UnknownOperationKeyException.class)); + FlinkAssertions.assertThatFuture(statusFuture) + .eventuallyFails() + .withCauseOfType(UnknownOperationKeyException.class); } private abstract static class TriggerCheckpointSpyFunction diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java index 92725c4b09d6e..d07a38156d882 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java @@ -21,7 +21,6 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.CleanupOptions; import org.apache.flink.core.execution.RecoveryClaimMode; -import org.apache.flink.core.testutils.FlinkMatchers; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory; @@ -53,10 +52,7 @@ import org.apache.flink.runtime.testutils.TestingJobResultStore; import org.apache.flink.util.concurrent.FutureUtils; -import org.hamcrest.CoreMatchers; -import org.hamcrest.collection.IsEmptyCollection; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -65,16 +61,14 @@ import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.IsEqual.equalTo; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** An integration test for various fail-over scenarios of the {@link Dispatcher} component. */ public class DispatcherCleanupITCase extends AbstractDispatcherTest { @@ -95,8 +89,8 @@ public void setUp() throws Exception { // First job cleanup still succeeded for the // CompletedCheckpointStore because the JobGraph cleanup happens // after the JobManagerRunner closing - assertTrue(previous.getShutdownStatus().isPresent()); - assertTrue(previous.getAllCheckpoints().isEmpty()); + assertThat(previous.getShutdownStatus()).isPresent(); + assertThat(previous.getAllCheckpoints()).isEmpty(); return new EmbeddedCompletedCheckpointStore( maxCheckpoints, previous.getAllCheckpoints(), @@ -191,12 +185,11 @@ public void testCleanupThroughRetries() throws Exception { successfulCleanupLatch.await(); - assertThat(actualGlobalCleanupCallCount.get(), equalTo(numberOfErrors + 1)); + assertThat(actualGlobalCleanupCallCount.get()).isEqualTo(numberOfErrors + 1); - assertThat( - "The JobGraph should be removed from ExecutionPlanStore.", - haServices.getExecutionPlanStore().getJobIds(), - IsEmptyCollection.empty()); + assertThat(haServices.getExecutionPlanStore().getJobIds()) + .as("The JobGraph should be removed from ExecutionPlanStore.") + .isEmpty(); CommonTestUtils.waitUntilCondition( () -> haServices.getJobResultStore().hasJobResultEntryAsync(jobId).get()); @@ -233,20 +226,15 @@ public void testCleanupNotCancellable() throws Exception { CommonTestUtils.waitUntilCondition(() -> jobManagerRunnerEntry.get() != null); - assertThat( - "The JobResultStore should have this job still marked as dirty.", - haServices.getJobResultStore().hasDirtyJobResultEntryAsync(jobId).get(), - CoreMatchers.is(true)); + assertThat(haServices.getJobResultStore().hasDirtyJobResultEntryAsync(jobId).get()) + .as("The JobResultStore should have this job still marked as dirty.") + .isTrue(); final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); - try { - dispatcherGateway.cancelJob(jobId, TIMEOUT).get(); - Assert.fail("Should fail because cancelling the cleanup is not allowed."); - } catch (ExecutionException e) { - assertThat(e, FlinkMatchers.containsCause(JobCancellationFailedException.class)); - } + assertThatThrownBy(() -> dispatcherGateway.cancelJob(jobId, TIMEOUT).get()) + .hasCauseInstanceOf(JobCancellationFailedException.class); jobManagerRunnerCleanupFuture.complete(null); CommonTestUtils.waitUntilCondition( @@ -305,25 +293,22 @@ public void testCleanupAfterLeadershipChange() throws Exception { waitForJobToFinish(confirmedLeaderInformation, dispatcherGateway, jobId); firstCleanupTriggered.await(); - assertThat( - "The cleanup should have been triggered only once.", - actualGlobalCleanupCallCount.get(), - equalTo(1)); - assertThat( - "The cleanup should not have reached the successful cleanup code path.", - successfulJobGraphCleanup.isDone(), - equalTo(false)); + assertThat(actualGlobalCleanupCallCount.get()) + .as("The cleanup should have been triggered only once.") + .isOne(); + assertThat(successfulJobGraphCleanup.isDone()) + .as("The cleanup should not have reached the successful cleanup code path.") + .isFalse(); + assertThat(haServices.getExecutionPlanStore().getJobIds()) + .as("The JobGraph is still stored in the ExecutionPlanStore.") + .containsExactly(jobId); assertThat( - "The JobGraph is still stored in the ExecutionPlanStore.", - haServices.getExecutionPlanStore().getJobIds(), - equalTo(Collections.singleton(jobId))); - assertThat( - "The JobResultStore should have this job marked as dirty.", - haServices.getJobResultStore().getDirtyResults().stream() - .map(JobResult::getJobId) - .collect(Collectors.toSet()), - equalTo(Collections.singleton(jobId))); + haServices.getJobResultStore().getDirtyResults().stream() + .map(JobResult::getJobId) + .collect(Collectors.toSet())) + .as("The JobResultStore should have this job marked as dirty.") + .containsExactly(jobId); // Run a second dispatcher, that restores our finished job. final Dispatcher secondDispatcher = @@ -338,17 +323,16 @@ public void testCleanupAfterLeadershipChange() throws Exception { CommonTestUtils.waitUntilCondition( () -> haServices.getJobResultStore().getDirtyResults().isEmpty()); - assertThat( - "The JobGraph is not stored in the ExecutionPlanStore.", - haServices.getExecutionPlanStore().getJobIds(), - IsEmptyCollection.empty()); - assertTrue( - "The JobResultStore has the job listed as clean.", - haServices.getJobResultStore().hasJobResultEntryAsync(jobId).get()); + assertThat(haServices.getExecutionPlanStore().getJobIds()) + .as("The JobGraph is not stored in the ExecutionPlanStore.") + .isEmpty(); + assertThat(haServices.getJobResultStore().hasJobResultEntryAsync(jobId).get()) + .as("The JobResultStore has the job listed as clean.") + .isTrue(); - assertThat(successfulJobGraphCleanup.get(), equalTo(jobId)); + assertThat(successfulJobGraphCleanup.get()).isEqualTo(jobId); - assertThat(actualGlobalCleanupCallCount.get(), equalTo(2)); + assertThat(actualGlobalCleanupCallCount.get()).isEqualTo(2); } private void waitForJobToFinish( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java index cae270a84ed03..09c3be44baea5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java @@ -84,9 +84,8 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; -import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage; import static org.apache.flink.runtime.dispatcher.AbstractDispatcherTest.awaitStatus; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -272,12 +271,7 @@ public void testGlobalCleanupWhenJobSubmissionFails() throws Exception { startDispatcher(new FailingJobManagerRunnerFactory(new FlinkException("Test exception"))); final CompletableFuture submissionFuture = submitJob(); - try { - submissionFuture.get(); - fail("Job submission was expected to fail."); - } catch (ExecutionException ee) { - assertThat(ee, containsCause(JobSubmissionException.class)); - } + assertThatThrownBy(submissionFuture::get).hasCauseInstanceOf(JobSubmissionException.class); assertGlobalCleanupTriggered(jobId); } @@ -647,15 +641,12 @@ public void testFailingJobManagerRunnerCleanup() throws Exception { // submit and fail during job master runner construction queue.offer(Optional.of(testException)); - try { - dispatcherGateway.submitJob(jobGraph, Duration.ofMinutes(1)).get(); - fail("A FlinkException is expected"); - } catch (Throwable expectedException) { - assertThat(expectedException, containsCause(FlinkException.class)); - assertThat(expectedException, containsMessage(testException.getMessage())); - // make sure we've cleaned up in correct order (including HA) - assertGlobalCleanupTriggered(jobId); - } + assertThatThrownBy(() -> dispatcherGateway.submitJob(jobGraph, Duration.ofMinutes(1)).get()) + .hasCauseInstanceOf(FlinkException.class) + .hasRootCauseMessage(testException.getMessage()); + + // make sure we've cleaned up in correct order (including HA) + assertGlobalCleanupTriggered(jobId); // don't fail this time queue.offer(Optional.empty()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index dbb8d652b28b6..a1d44b3712e90 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -27,7 +27,6 @@ import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.core.failure.FailureEnricher; import org.apache.flink.core.testutils.FlinkAssertions; -import org.apache.flink.core.testutils.FlinkMatchers; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.PermanentBlobKey; @@ -107,7 +106,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; import org.assertj.core.api.Assertions; -import org.hamcrest.Matchers; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -144,16 +142,8 @@ import java.util.function.Function; import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for the {@link Dispatcher} component. */ public class DispatcherTest extends AbstractDispatcherTest { @@ -219,9 +209,9 @@ public void testJobSubmission() throws Exception { jobMasterLeaderElection.getStartFuture().get(); - assertTrue( - "jobManagerRunner was not started", - jobMasterLeaderElection.getStartFuture().isDone()); + assertThat(jobMasterLeaderElection.getStartFuture()) + .as("jobManagerRunner was not started") + .isDone(); } @Test @@ -290,11 +280,11 @@ public void testDuplicateJobSubmissionIsDetectedOnSimultaneousSubmission() throw .eventuallySucceeds(); // verify that all but one submission failed as duplicates - Assertions.assertThat(exceptions) + assertThat(exceptions) .hasSize(numThreads - 1) .allSatisfy( t -> - Assertions.assertThat(t) + assertThat(t) .hasCauseInstanceOf(DuplicateJobSubmissionException.class)); } @@ -308,12 +298,14 @@ private void assertDuplicateJobSubmission() throws Exception { dispatcher.getSelfGateway(DispatcherGateway.class); final CompletableFuture submitFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT); - final ExecutionException executionException = - assertThrows(ExecutionException.class, submitFuture::get); - assertTrue(executionException.getCause() instanceof DuplicateJobSubmissionException); - final DuplicateJobSubmissionException duplicateException = - (DuplicateJobSubmissionException) executionException.getCause(); - assertTrue(duplicateException.isGloballyTerminated()); + assertThatThrownBy(submitFuture::get) + .hasCauseInstanceOf(DuplicateJobSubmissionException.class) + .satisfies( + e -> + assertThat( + ((DuplicateJobSubmissionException) e.getCause()) + .isGloballyTerminated()) + .isTrue()); } @Test @@ -328,12 +320,14 @@ public void testDuplicateJobSubmissionWithRunningJobId() throws Exception { dispatcher.getSelfGateway(DispatcherGateway.class); final CompletableFuture submitFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT); - final ExecutionException executionException = - assertThrows(ExecutionException.class, submitFuture::get); - assertTrue(executionException.getCause() instanceof DuplicateJobSubmissionException); - final DuplicateJobSubmissionException duplicateException = - (DuplicateJobSubmissionException) executionException.getCause(); - assertFalse(duplicateException.isGloballyTerminated()); + assertThatThrownBy(submitFuture::get) + .hasCauseInstanceOf(DuplicateJobSubmissionException.class) + .satisfies( + e -> + assertThat( + ((DuplicateJobSubmissionException) e.getCause()) + .isGloballyTerminated()) + .isFalse()); } /** @@ -364,12 +358,8 @@ public void testJobSubmissionWithPartialResourceConfigured() throws Exception { CompletableFuture acknowledgeFuture = dispatcherGateway.submitJob(jobGraphWithTwoVertices, TIMEOUT); - try { - acknowledgeFuture.get(); - fail("job submission should have failed"); - } catch (ExecutionException e) { - assertTrue(ExceptionUtils.findThrowable(e, JobSubmissionException.class).isPresent()); - } + assertThatThrownBy(() -> acknowledgeFuture.get()) + .hasCauseInstanceOf(JobSubmissionException.class); } @Test @@ -384,15 +374,14 @@ public void testNonBlockingJobSubmission() throws Exception { blockingJobMaster.waitForBlockingInit(); // ensure INITIALIZING status - assertThat( - dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(), - is(JobStatus.INITIALIZING)); + assertThat(dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get()) + .isSameAs(JobStatus.INITIALIZING); // ensure correct JobDetails MultipleJobsDetails multiDetails = dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get(); - assertEquals(1, multiDetails.getJobs().size()); - assertEquals(jobId, multiDetails.getJobs().iterator().next().getJobId()); + assertThat(multiDetails.getJobs()).hasSize(1); + assertThat(multiDetails.getJobs().iterator().next().getJobId()).isEqualTo(jobId); // let the initialization finish. blockingJobMaster.unblockJobMasterInitialization(); @@ -410,24 +399,21 @@ public void testInvalidCallDuringInitialization() throws Exception { dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); - assertThat( - dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(), - is(JobStatus.INITIALIZING)); + assertThat(dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get()) + .isSameAs(JobStatus.INITIALIZING); // this call is supposed to fail - try { - dispatcherGateway - .triggerSavepointAndGetLocation( - jobId, - "file:///tmp/savepoint", - SavepointFormatType.CANONICAL, - TriggerSavepointMode.SAVEPOINT, - TIMEOUT) - .get(); - fail("Previous statement should have failed"); - } catch (ExecutionException t) { - assertTrue(t.getCause() instanceof UnavailableDispatcherOperationException); - } + assertThatThrownBy( + () -> + dispatcherGateway + .triggerSavepointAndGetLocation( + jobId, + "file:///tmp/savepoint", + SavepointFormatType.CANONICAL, + TriggerSavepointMode.SAVEPOINT, + TIMEOUT) + .get()) + .hasCauseInstanceOf(UnavailableDispatcherOperationException.class); } @Test @@ -490,8 +476,8 @@ public void testCancellationOfCanceledTerminalDoesNotThrowException() throws Exc // wait for job to finish dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get(); // sanity check - assertThat( - dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(), is(JobStatus.CANCELED)); + assertThat(dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get()) + .isSameAs(JobStatus.CANCELED); dispatcherGateway.cancelJob(jobId, TIMEOUT).get(); } @@ -524,16 +510,15 @@ public void testCancellationOfNonCanceledTerminalJobFailsWithAppropriateExceptio // wait for job to finish dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get(); // sanity check - assertThat( - dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(), is(JobStatus.FINISHED)); + assertThat(dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get()) + .isSameAs(JobStatus.FINISHED); final CompletableFuture cancelFuture = dispatcherGateway.cancelJob(jobId, TIMEOUT); - assertThat( - cancelFuture, - FlinkMatchers.futureWillCompleteExceptionally( - FlinkJobTerminatedWithoutCancellationException.class, Duration.ofHours(8))); + FlinkAssertions.assertThatFuture(cancelFuture) + .eventuallyFails() + .withCauseOfType(FlinkJobTerminatedWithoutCancellationException.class); } @Test @@ -570,10 +555,10 @@ public void testNoHistoryServerArchiveCreatedForSuspendedJob() throws Exception // wait for job to finish dispatcherGateway.requestJobResult(jobId, TIMEOUT).get(); // sanity check - assertThat( - dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(), is(JobStatus.SUSPENDED)); + assertThat(dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get()) + .isSameAs(JobStatus.SUSPENDED); - assertThat(archiveAttemptFuture.isDone(), is(false)); + assertThat(archiveAttemptFuture).isNotDone(); } @Test @@ -615,7 +600,7 @@ public void testJobManagerRunnerInitializationFailureFailsJob() throws Exception // get failure cause ArchivedExecutionGraph execGraph = dispatcherGateway.requestJob(jobGraph.getJobID(), TIMEOUT).get(); - assertThat(execGraph.getState(), is(JobStatus.FAILED)); + assertThat(execGraph.getState()).isSameAs(JobStatus.FAILED); Assert.assertNotNull(execGraph.getFailureInfo()); Throwable throwable = @@ -625,7 +610,7 @@ public void testJobManagerRunnerInitializationFailureFailsJob() throws Exception .deserializeError(ClassLoader.getSystemClassLoader()); // ensure correct exception type - assertThat(throwable.getMessage(), equalTo(testFailure.getMessage())); + assertThat(throwable).hasMessage(testFailure.getMessage()); } /** Test that {@link JobResult} is cached when the job finishes. */ @@ -654,13 +639,12 @@ public void testCacheJobExecutionResult() throws Exception { dispatcher.completeJobExecution(failedExecutionGraphInfo); - assertThat( - dispatcherGateway.requestJobStatus(failedJobId, TIMEOUT).get(), - equalTo(expectedState)); + assertThat(dispatcherGateway.requestJobStatus(failedJobId, TIMEOUT).get()) + .isEqualTo(expectedState); final CompletableFuture completableFutureCompletableFuture = dispatcher.callAsyncInMainThread( () -> dispatcher.requestExecutionGraphInfo(failedJobId, TIMEOUT)); - assertThat(completableFutureCompletableFuture.get(), is(failedExecutionGraphInfo)); + assertThat(completableFutureCompletableFuture.get()).isEqualTo(failedExecutionGraphInfo); } @Test @@ -685,8 +669,8 @@ public void testRetrieveCheckpointStats() throws Exception { CompletableFuture resultsFuture = dispatcher.callAsyncInMainThread( () -> dispatcher.requestCheckpointStats(jobId, TIMEOUT)); - Assertions.assertThat(resultsFuture).succeedsWithin(Duration.ofSeconds(1)); - Assertions.assertThat(resultsFuture).isCompletedWithValue(snapshot); + assertThat(resultsFuture).succeedsWithin(Duration.ofSeconds(1)); + assertThat(resultsFuture).isCompletedWithValue(snapshot); } @Test @@ -734,8 +718,8 @@ private void testRetrieveCheckpointStatsWithJobStatus(JobStatus jobStatus) throw CompletableFuture resultsFuture = dispatcher.callAsyncInMainThread( () -> dispatcher.requestCheckpointStats(jobId, TIMEOUT)); - Assertions.assertThat(resultsFuture).succeedsWithin(Duration.ofSeconds(1)); - Assertions.assertThat(resultsFuture).isCompletedWithValue(snapshot); + assertThat(resultsFuture).succeedsWithin(Duration.ofSeconds(1)); + assertThat(resultsFuture).isCompletedWithValue(snapshot); } private CheckpointStatsSnapshot getTestCheckpointStatsSnapshotWithTwoFailedCheckpoints() { @@ -763,8 +747,8 @@ public void testRetrieveCheckpointStatsOnNonExistentJob() throws Exception { dispatcher.callAsyncInMainThread( () -> dispatcher.requestCheckpointStats(jobId, TIMEOUT)); - Assertions.assertThat(resultsFuture).failsWithin(Duration.ofSeconds(1)); - Assertions.assertThat(resultsFuture).isCompletedExceptionally(); + assertThat(resultsFuture).failsWithin(Duration.ofSeconds(1)); + assertThat(resultsFuture).isCompletedExceptionally(); Assertions.assertThatThrownBy(resultsFuture::get) .hasCauseInstanceOf(FlinkJobNotFoundException.class) @@ -781,12 +765,8 @@ public void testThrowExceptionIfJobExecutionResultNotFound() throws Exception { final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); - try { - dispatcherGateway.requestJob(new JobID(), TIMEOUT).get(); - } catch (ExecutionException e) { - final Throwable throwable = ExceptionUtils.stripExecutionException(e); - assertThat(throwable, instanceOf(FlinkJobNotFoundException.class)); - } + assertThatThrownBy(() -> dispatcherGateway.requestJob(new JobID(), TIMEOUT).get()) + .hasCauseInstanceOf(FlinkJobNotFoundException.class); } /** Tests that we can dispose a savepoint. */ @@ -804,11 +784,11 @@ public void testSavepointDisposal() throws Exception { final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); - assertThat(Files.exists(savepointPath), is(true)); + assertThat(Files.exists(savepointPath)).isTrue(); dispatcherGateway.disposeSavepoint(externalPointer.toString(), TIMEOUT).get(); - assertThat(Files.exists(savepointPath), is(false)); + assertThat(Files.exists(savepointPath)).isFalse(); } @Nonnull @@ -892,10 +872,8 @@ private void testJobManagerRunnerFailureResultingInFatalError( final Throwable error = fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); - assertThat( - ExceptionUtils.findThrowableWithMessage(error, testException.getMessage()) - .isPresent(), - is(true)); + assertThat(ExceptionUtils.findThrowableWithMessage(error, testException.getMessage())) + .isPresent(); fatalErrorHandler.clearError(); } @@ -946,15 +924,13 @@ public void testJobCleanupWithoutRecoveredJobGraph() throws Exception { final TestingJobManagerRunner cleanupRunner = cleanupRunnerFactory.takeCreatedJobManagerRunner(); - assertThat( - "The CleanupJobManagerRunner has the wrong job ID attached.", - cleanupRunner.getJobID(), - is(jobIdOfRecoveredDirtyJobs)); + assertThat(cleanupRunner.getJobID()) + .as("The CleanupJobManagerRunner has the wrong job ID attached.") + .isEqualTo(jobIdOfRecoveredDirtyJobs); - assertThat( - "No JobMaster should have been started.", - jobManagerRunnerFactory.getQueueSize(), - is(0)); + assertThat(jobManagerRunnerFactory.getQueueSize()) + .as("No JobMaster should have been started.") + .isZero(); } @Test @@ -975,11 +951,11 @@ public void testPersistedJobGraphWhenDispatcherIsShutDown() throws Exception { dispatcher.getSelfGateway(DispatcherGateway.class); dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); - assertThat(dispatcher.getNumberJobs(TIMEOUT).get(), Matchers.is(1)); + assertThat(dispatcher.getNumberJobs(TIMEOUT).get()).isOne(); dispatcher.close(); - assertThat(submittedExecutionPlanStore.contains(jobGraph.getJobID()), Matchers.is(true)); + assertThat(submittedExecutionPlanStore.contains(jobGraph.getJobID())).isTrue(); } /** Tests that a submitted job is suspended if the Dispatcher is terminated. */ @@ -998,12 +974,12 @@ public void testJobSuspensionWhenDispatcherIsTerminated() throws Exception { final CompletableFuture jobResultFuture = dispatcherGateway.requestJobResult(jobGraph.getJobID(), TIMEOUT); - assertThat(jobResultFuture.isDone(), is(false)); + assertThat(jobResultFuture).isNotDone(); dispatcher.close(); final JobResult jobResult = jobResultFuture.get(); - assertEquals(jobResult.getApplicationStatus(), ApplicationStatus.UNKNOWN); + assertThat(jobResult.getApplicationStatus()).isSameAs(ApplicationStatus.UNKNOWN); } @Test @@ -1093,13 +1069,10 @@ public void testOnRemovedJobGraphDoesNotCleanUpHAFiles() throws Exception { processFuture.join(); - assertThat(releaseJobGraphFuture.get(), is(jobGraph.getJobID())); + assertThat(releaseJobGraphFuture.get()).isEqualTo(jobGraph.getJobID()); - try { - removeJobGraphFuture.get(10L, TimeUnit.MILLISECONDS); - fail("onRemovedExecutionPlan should not remove the job from the ExecutionPlanStore."); - } catch (TimeoutException expected) { - } + assertThatThrownBy(() -> removeJobGraphFuture.get(10L, TimeUnit.MILLISECONDS)) + .isInstanceOf(TimeoutException.class); } @Test @@ -1119,7 +1092,7 @@ public void testInitializationTimestampForwardedToJobManagerRunner() throws Exce final long initializationTimestamp = initializationTimestampQueue.take(); // ensure all statuses are set in the ExecutionGraph - assertThat(initializationTimestamp, greaterThan(0L)); + assertThat(initializationTimestamp).isGreaterThan(0L); } @Test @@ -1286,8 +1259,8 @@ private JobManagerRunner completedJobManagerRunnerWithJobStatus( private static void assertOnlyContainsSingleJobWithState( final JobStatus expectedJobStatus, final MultipleJobsDetails multipleJobsDetails) { final Collection finishedJobDetails = multipleJobsDetails.getJobs(); - assertEquals(1, finishedJobDetails.size()); - assertEquals(expectedJobStatus, finishedJobDetails.iterator().next().getStatus()); + assertThat(finishedJobDetails).hasSize(1); + assertThat(finishedJobDetails.iterator().next().getStatus()).isEqualTo(expectedJobStatus); } @Test @@ -1304,7 +1277,7 @@ public void testOnlyRecoveredJobsAreRetainedInTheBlobServer() throws Exception { .setRecoveredJobs(Collections.singleton(new JobGraph(jobId1, "foobar"))) .build(rpcService); - Assertions.assertThat(blobServer.getFile(jobId1, blobKey1)).hasBinaryContent(fileContent); + assertThat(blobServer.getFile(jobId1, blobKey1)).hasBinaryContent(fileContent); Assertions.assertThatThrownBy(() -> blobServer.getFile(jobId2, blobKey2)) .isInstanceOf(NoSuchFileException.class); } @@ -1326,16 +1299,16 @@ public void testRetrieveJobResultAfterSubmissionOfFailedJob() throws Exception { submitFuture.get(); final ArchivedExecutionGraph archivedExecutionGraph = dispatcherGateway.requestJob(failedJobId, TIMEOUT).get(); - Assertions.assertThat(archivedExecutionGraph.getJobID()).isEqualTo(failedJobId); - Assertions.assertThat(archivedExecutionGraph.getJobName()).isEqualTo(failedJobName); - Assertions.assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED); - Assertions.assertThat(archivedExecutionGraph.getFailureInfo()) + assertThat(archivedExecutionGraph.getJobID()).isEqualTo(failedJobId); + assertThat(archivedExecutionGraph.getJobName()).isEqualTo(failedJobName); + assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED); + assertThat(archivedExecutionGraph.getFailureInfo()) .isNotNull() .extracting(ErrorInfo::getException) .extracting(e -> e.deserializeError(Thread.currentThread().getContextClassLoader())) .satisfies( exception -> - Assertions.assertThat(exception) + assertThat(exception) .isInstanceOf(RuntimeException.class) .hasMessage("Test exception.")); } @@ -1430,8 +1403,7 @@ public void testPersistErrorHandling() throws Exception { .isInstanceOf(RestHandlerException.class) .satisfies( e -> - Assertions.assertThat( - ((RestHandlerException) e).getHttpResponseStatus()) + assertThat(((RestHandlerException) e).getHttpResponseStatus()) .isSameAs(HttpResponseStatus.INTERNAL_SERVER_ERROR)); // verify that persist errors prevents the requirement from being applied @@ -1509,8 +1481,8 @@ public void testJobResourceRequirementsAreGuardedAgainstConcurrentModification() testConcurrentModificationIsPrevented( dispatcherGateway, blockingJobMaster, secondJobGraph); - Assertions.assertThat(firstPendingUpdateFuture).isNotCompleted(); - Assertions.assertThat(secondPendingUpdateFuture).isNotCompleted(); + assertThat(firstPendingUpdateFuture).isNotCompleted(); + assertThat(secondPendingUpdateFuture).isNotCompleted(); blockedUpdatesToJobMasterFuture.complete(Acknowledge.get()); assertThatFuture(firstPendingUpdateFuture).eventuallySucceeds(); assertThatFuture(secondPendingUpdateFuture).eventuallySucceeds(); @@ -1541,8 +1513,7 @@ private CompletableFuture testConcurrentModificationIsPrevented( .isInstanceOf(RestHandlerException.class) .satisfies( e -> - Assertions.assertThat( - ((RestHandlerException) e).getHttpResponseStatus()) + assertThat(((RestHandlerException) e).getHttpResponseStatus()) .isSameAs(HttpResponseStatus.CONFLICT)); assertThatFuture(pendingUpdateFuture).isNotCompleted(); @@ -1634,7 +1605,7 @@ public JobManagerRunner createJobManagerRunner( Collection failureEnricher, long initializationTimestamp) throws Exception { - assertEquals(expectedJobId, graph.getJobID()); + assertThat(graph.getJobID()).isEqualTo(expectedJobId); final JobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder() .setRequestJobSupplier( @@ -1943,7 +1914,7 @@ public JobManagerRunner createJobManagerRunner( Collection failureEnrichers, long initializationTimestamp) throws Exception { - assertEquals(expectedJobId, graph.getJobID()); + assertThat(graph.getJobID()).isEqualTo(expectedJobId); return JobMasterServiceLeadershipRunnerFactory.INSTANCE.createJobManagerRunner( graph, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentTest.java index 21ed3e30d50a6..116a25d07c599 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.entrypoint.component; +import org.apache.flink.core.testutils.FlinkAssertions; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.dispatcher.DispatcherOperationCaches; import org.apache.flink.runtime.dispatcher.runner.TestingDispatcherRunner; @@ -36,9 +37,7 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; -import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; -import static org.apache.flink.core.testutils.FlinkMatchers.willNotComplete; -import static org.junit.Assert.assertThat; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for the {@link DispatcherResourceManagerComponent}. */ public class DispatcherResourceManagerComponentTest extends TestLogger { @@ -59,7 +58,7 @@ public void unexpectedResourceManagerTermination_failsFatally() { terminationFuture.completeExceptionally(expectedException); final Throwable error = fatalErrorHandler.getException(); - assertThat(error, containsCause(expectedException)); + assertThat(error).hasCause(expectedException); } private DispatcherResourceManagerComponent createDispatcherResourceManagerComponent( @@ -94,7 +93,7 @@ public void unexpectedResourceManagerTermination_ifNotRunning_doesNotFailFatally terminationFuture.completeExceptionally(expectedException); final CompletableFuture errorFuture = fatalErrorHandler.getErrorFuture(); - assertThat(errorFuture, willNotComplete(Duration.ofMillis(10L))); + FlinkAssertions.assertThatFuture(errorFuture).willNotCompleteWithin(Duration.ofMillis(10L)); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java index 955338f9f3ee8..a52c1501ccdc0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.heartbeat; -import org.apache.flink.core.testutils.FlinkMatchers; +import org.apache.flink.core.testutils.FlinkAssertions; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException; @@ -29,7 +29,6 @@ import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor; import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter; -import org.hamcrest.Matcher; import org.junit.ClassRule; import org.junit.Test; import org.slf4j.Logger; @@ -51,14 +50,8 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for the {@link HeartbeatManager}. */ public class HeartbeatManagerTest extends TestLogger { @@ -116,12 +109,12 @@ public void testRegularHeartbeat() throws InterruptedException { final String inputPayload1 = "foobar"; heartbeatManager.requestHeartbeat(targetResourceID, inputPayload1); - assertThat(reportedPayloads.take(), is(inputPayload1)); - assertThat(reportedPayloadsHeartbeatTarget.take(), is(outputPayload)); + assertThat(reportedPayloads.take()).isEqualTo(inputPayload1); + assertThat(reportedPayloadsHeartbeatTarget.take()).isEqualTo(outputPayload); final String inputPayload2 = "barfoo"; heartbeatManager.receiveHeartbeat(targetResourceID, inputPayload2); - assertThat(reportedPayloads.take(), is(inputPayload2)); + assertThat(reportedPayloads.take()).isEqualTo(inputPayload2); } /** Tests that the heartbeat monitors are updated when receiving a new heartbeat signal. */ @@ -157,9 +150,9 @@ public void testHeartbeatMonitorUpdate() { final List> scheduledTasksAfterHeartbeat = manuallyTriggeredScheduledExecutor.getAllScheduledTasks(); - assertThat(scheduledTasksAfterHeartbeat, hasSize(2)); + assertThat(scheduledTasksAfterHeartbeat).hasSize(2); // the first scheduled future should be cancelled by the heartbeat update - assertTrue(scheduledTasksAfterHeartbeat.get(0).isCancelled()); + assertThat(scheduledTasksAfterHeartbeat.get(0).isCancelled()).isTrue(); } /** Tests that a heartbeat timeout is signaled if the heartbeat is not reported in time. */ @@ -197,12 +190,12 @@ public void testHeartbeatTimeout() throws Exception { Thread.sleep(HEARTBEAT_INTERVAL); } - assertFalse(timeoutFuture.isDone()); + FlinkAssertions.assertThatFuture(timeoutFuture).eventuallySucceeds(); ResourceID timeoutResourceID = timeoutFuture.get(2 * HEARTBEAT_TIMEOUT, TimeUnit.MILLISECONDS); - assertEquals(targetResourceID, timeoutResourceID); + assertThat(targetResourceID).isEqualTo(timeoutResourceID); } /** @@ -264,20 +257,19 @@ public void testHeartbeatCluster() throws Exception { Thread.sleep(2 * HEARTBEAT_TIMEOUT); - assertFalse(targetHeartbeatTimeoutFuture.isDone()); + assertThat(targetHeartbeatTimeoutFuture).isNotDone(); heartbeatManagerTarget.stop(); ResourceID timeoutResourceID = targetHeartbeatTimeoutFuture.get(2 * HEARTBEAT_TIMEOUT, TimeUnit.MILLISECONDS); - assertThat(timeoutResourceID, is(resourceIdTarget)); + assertThat(timeoutResourceID).isEqualTo(resourceIdTarget); int numberHeartbeats = (int) (2 * HEARTBEAT_TIMEOUT / HEARTBEAT_INTERVAL); - final Matcher numberHeartbeatsMatcher = greaterThanOrEqualTo(numberHeartbeats / 2); - assertThat(numReportPayloadCallsTarget.get(), is(numberHeartbeatsMatcher)); - assertThat(numReportPayloadCallsSender.get(), is(numberHeartbeatsMatcher)); + assertThat(numReportPayloadCallsTarget.get()).isGreaterThanOrEqualTo(numberHeartbeats / 2); + assertThat(numReportPayloadCallsSender.get()).isGreaterThanOrEqualTo(numberHeartbeats / 2); } /** Tests that after unmonitoring a target, there won't be a timeout triggered. */ @@ -311,12 +303,9 @@ public void testTargetUnmonitoring() throws Exception { heartbeatManager.unmonitorTarget(targetID); - try { - timeoutFuture.get(2 * heartbeatTimeout, TimeUnit.MILLISECONDS); - fail("Timeout should time out."); - } catch (TimeoutException ignored) { - // the timeout should not be completed since we unmonitored the target - } + assertThatThrownBy(() -> timeoutFuture.get(2 * heartbeatTimeout, TimeUnit.MILLISECONDS)) + // the timeout should not be completed since we unmonitored the target + .isInstanceOf(TimeoutException.class); } /** Tests that the last heartbeat from an unregistered target equals -1. */ @@ -337,7 +326,7 @@ public void testLastHeartbeatFromUnregisteredTarget() { LOG); try { - assertEquals(-1L, heartbeatManager.getLastHeartbeatFrom(ResourceID.generate())); + assertThat(heartbeatManager.getLastHeartbeatFrom(ResourceID.generate())).isEqualTo(-1L); } finally { heartbeatManager.stop(); } @@ -363,13 +352,14 @@ public void testLastHeartbeatFrom() { heartbeatManager.monitorTarget( target, new TestingHeartbeatTargetBuilder<>().createTestingHeartbeatTarget()); - assertEquals(0L, heartbeatManager.getLastHeartbeatFrom(target)); + assertThat(heartbeatManager.getLastHeartbeatFrom(target)).isZero(); final long currentTime = System.currentTimeMillis(); heartbeatManager.receiveHeartbeat(target, null); - assertTrue(heartbeatManager.getLastHeartbeatFrom(target) >= currentTime); + assertThat(heartbeatManager.getLastHeartbeatFrom(target)) + .isGreaterThanOrEqualTo(currentTime); } finally { heartbeatManager.stop(); } @@ -429,10 +419,11 @@ public void testHeartbeatManagerTargetPayload() throws Exception { heartbeatManager.monitorTarget(specialTargetId, specialHeartbeatTarget); heartbeatManager.requestHeartbeat(someTargetId, null); - assertThat(someHeartbeatPayloadFuture.get(), is(payloads.get(someTargetId))); + assertThat(someHeartbeatPayloadFuture.get()).isEqualTo(payloads.get(someTargetId)); heartbeatManager.requestHeartbeat(specialTargetId, null); - assertThat(specialHeartbeatPayloadFuture.get(), is(payloads.get(specialTargetId))); + assertThat(specialHeartbeatPayloadFuture.get()) + .isEqualTo(payloads.get(specialTargetId)); } finally { heartbeatManager.stop(); } @@ -482,9 +473,10 @@ public void testHeartbeatManagerSenderTargetPayload() throws Exception { someTargetReceivedLatch.await(5, TimeUnit.SECONDS); specialTargetReceivedLatch.await(5, TimeUnit.SECONDS); - assertEquals(defaultResponse, someHeartbeatTarget.getLastRequestedHeartbeatPayload()); - assertEquals( - specialResponse, specialHeartbeatTarget.getLastRequestedHeartbeatPayload()); + assertThat(defaultResponse) + .isEqualTo(someHeartbeatTarget.getLastRequestedHeartbeatPayload()); + assertThat(specialResponse) + .isEqualTo(specialHeartbeatTarget.getLastRequestedHeartbeatPayload()); } finally { heartbeatManager.stop(); scheduledThreadPoolExecutor.shutdown(); @@ -568,9 +560,8 @@ public void testHeartbeatManagerMarksTargetUnreachableOnRecipientUnreachableExce for (int i = 0; i < failedRpcRequestsUntilUnreachable - 1; i++) { heartbeatManager.requestHeartbeat(someTargetId, null); - assertThat( - unreachableTargetFuture, - FlinkMatchers.willNotComplete(willNotCompleteWithin)); + FlinkAssertions.assertThatFuture(unreachableTargetFuture) + .willNotCompleteWithin(willNotCompleteWithin); } heartbeatManager.requestHeartbeat(someTargetId, null); @@ -619,8 +610,8 @@ public void testHeartbeatManagerIgnoresRecipientUnreachableExceptionIfDisabled() heartbeatManager.requestHeartbeat(someTargetId, null); } - assertThat( - unreachableTargetFuture, FlinkMatchers.willNotComplete(willNotCompleteWithin)); + FlinkAssertions.assertThatFuture(unreachableTargetFuture) + .willNotCompleteWithin(willNotCompleteWithin); } finally { heartbeatManager.stop(); } @@ -668,8 +659,8 @@ public void testHeartbeatManagerResetsFailedRpcCountOnSuccessfulRpc() throws Exc heartbeatManager.requestHeartbeat(someTargetId, null); } - assertThat( - unreachableTargetFuture, FlinkMatchers.willNotComplete(willNotCompleteWithin)); + FlinkAssertions.assertThatFuture(unreachableTargetFuture) + .willNotCompleteWithin(willNotCompleteWithin); } finally { heartbeatManager.stop(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultExecutionPlanStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultExecutionPlanStoreTest.java index a3c25539b8d43..5454e5e59db4c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultExecutionPlanStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultExecutionPlanStoreTest.java @@ -20,7 +20,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.core.testutils.FlinkMatchers; import org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; @@ -36,7 +35,6 @@ import org.apache.flink.util.TestLogger; import org.apache.flink.util.concurrent.Executors; -import org.assertj.core.api.Assertions; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -55,13 +53,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** * Tests for {@link DefaultExecutionPlanStore} with {@link TestingExecutionPlanStoreWatcher}, {@link @@ -104,8 +97,8 @@ public void testRecoverExecutionPlan() throws Exception { final ExecutionPlan recoveredExecutionPlan = executionPlanStore.recoverExecutionPlan(testingExecutionPlan.getJobID()); - assertThat(recoveredExecutionPlan, is(notNullValue())); - assertThat(recoveredExecutionPlan.getJobID(), is(testingExecutionPlan.getJobID())); + assertThat(recoveredExecutionPlan).isNotNull(); + assertThat(recoveredExecutionPlan.getJobID()).isEqualTo(testingExecutionPlan.getJobID()); } @Test @@ -123,7 +116,7 @@ public void testRecoverExecutionPlanWhenNotExist() throws Exception { final ExecutionPlan recoveredExecutionPlan = executionPlanStore.recoverExecutionPlan(testingExecutionPlan.getJobID()); - assertThat(recoveredExecutionPlan, is(nullValue())); + assertThat(recoveredExecutionPlan).isNull(); } @Test @@ -141,15 +134,13 @@ public void testRecoverExecutionPlanFailedShouldReleaseHandle() throws Exception final ExecutionPlanStore executionPlanStore = createAndStartExecutionPlanStore(stateHandleStore); - try { - executionPlanStore.recoverExecutionPlan(testingExecutionPlan.getJobID()); - fail( - "recoverExecutionPlan should fail when there is exception in getting the state handle."); - } catch (Exception ex) { - assertThat(ex, FlinkMatchers.containsCause(testException)); - String actual = releaseFuture.get(timeout, TimeUnit.MILLISECONDS); - assertThat(actual, is(testingExecutionPlan.getJobID().toString())); - } + assertThatThrownBy( + () -> + executionPlanStore.recoverExecutionPlan( + testingExecutionPlan.getJobID())) + .hasCause(testException); + String actual = releaseFuture.get(timeout, TimeUnit.MILLISECONDS); + assertThat(testingExecutionPlan.getJobID()).hasToString(actual); } @Test @@ -169,7 +160,7 @@ public void testPutExecutionPlanWhenNotExist() throws Exception { executionPlanStore.putExecutionPlan(testingExecutionPlan); final ExecutionPlan actual = addFuture.get(timeout, TimeUnit.MILLISECONDS); - assertThat(actual.getJobID(), is(testingExecutionPlan.getJobID())); + assertThat(actual.getJobID()).isEqualTo(testingExecutionPlan.getJobID()); } @Test @@ -200,9 +191,9 @@ public void testPutExecutionPlanWhenAlreadyExist() throws Exception { final Tuple3 actual = replaceFuture.get(timeout, TimeUnit.MILLISECONDS); - assertThat(actual.f0, is(testingExecutionPlan.getJobID().toString())); - assertThat(actual.f1, is(IntegerResourceVersion.valueOf(resourceVersion))); - assertThat(actual.f2.getJobID(), is(testingExecutionPlan.getJobID())); + assertThat(actual.f0).isEqualTo(testingExecutionPlan.getJobID().toString()); + assertThat(actual.f1).isEqualTo(IntegerResourceVersion.valueOf(resourceVersion)); + assertThat(actual.f2.getJobID()).isEqualTo(testingExecutionPlan.getJobID()); } @Test @@ -221,7 +212,7 @@ public void testGlobalCleanup() throws Exception { .globalCleanupAsync(testingExecutionPlan.getJobID(), Executors.directExecutor()) .join(); final JobID actual = removeFuture.get(timeout, TimeUnit.MILLISECONDS); - assertThat(actual, is(testingExecutionPlan.getJobID())); + assertThat(actual).isEqualTo(testingExecutionPlan.getJobID()); } @Test @@ -237,7 +228,7 @@ public void testGlobalCleanupWithNonExistName() throws Exception { .globalCleanupAsync(testingExecutionPlan.getJobID(), Executors.directExecutor()) .join(); - assertThat(removeFuture.isDone(), is(true)); + assertThat(removeFuture).isDone(); } @Test @@ -247,13 +238,14 @@ public void testGlobalCleanupFailsIfRemovalReturnsFalse() throws Exception { final ExecutionPlanStore executionPlanStore = createAndStartExecutionPlanStore(stateHandleStore); - assertThrows( - ExecutionException.class, - () -> - executionPlanStore - .globalCleanupAsync( - testingExecutionPlan.getJobID(), Executors.directExecutor()) - .get()); + assertThatThrownBy( + () -> + executionPlanStore + .globalCleanupAsync( + testingExecutionPlan.getJobID(), + Executors.directExecutor()) + .get()) + .isInstanceOf(ExecutionException.class); } @Test @@ -270,7 +262,7 @@ public void testGetJobIds() throws Exception { final ExecutionPlanStore executionPlanStore = createAndStartExecutionPlanStore(stateHandleStore); final Collection jobIds = executionPlanStore.getJobIds(); - assertThat(jobIds, contains(existingJobIds.toArray())); + assertThat(jobIds).containsAll(existingJobIds); } @Test @@ -283,7 +275,7 @@ public void testOnAddedExecutionPlanShouldNotProcessKnownExecutionPlans() throws executionPlanStore.putExecutionPlan(testingExecutionPlan); testingExecutionPlanStoreWatcher.addExecutionPlan(testingExecutionPlan.getJobID()); - assertThat(testingExecutionPlanListener.getAddedExecutionPlans().size(), is(0)); + assertThat(testingExecutionPlanListener.getAddedExecutionPlans()).isEmpty(); } @Test @@ -303,8 +295,8 @@ public void testOnAddedExecutionPlanShouldOnlyProcessUnknownExecutionPlans() thr // Unknown job final JobID unknownJobId = JobID.generate(); testingExecutionPlanStoreWatcher.addExecutionPlan(unknownJobId); - assertThat(testingExecutionPlanListener.getAddedExecutionPlans().size(), is(1)); - assertThat(testingExecutionPlanListener.getAddedExecutionPlans(), contains(unknownJobId)); + assertThat(testingExecutionPlanListener.getAddedExecutionPlans()).hasSize(1); + assertThat(testingExecutionPlanListener.getAddedExecutionPlans()).contains(unknownJobId); } @Test @@ -320,10 +312,9 @@ public void testOnRemovedExecutionPlanShouldOnlyProcessKnownExecutionPlans() thr testingExecutionPlanStoreWatcher.removeExecutionPlan(JobID.generate()); // Known job testingExecutionPlanStoreWatcher.removeExecutionPlan(testingExecutionPlan.getJobID()); - assertThat(testingExecutionPlanListener.getRemovedExecutionPlans().size(), is(1)); - assertThat( - testingExecutionPlanListener.getRemovedExecutionPlans(), - contains(testingExecutionPlan.getJobID())); + assertThat(testingExecutionPlanListener.getRemovedExecutionPlans()).hasSize(1); + assertThat(testingExecutionPlanListener.getRemovedExecutionPlans()) + .contains(testingExecutionPlan.getJobID()); } @Test @@ -334,7 +325,7 @@ public void testOnRemovedExecutionPlanShouldNotProcessUnknownExecutionPlans() th createAndStartExecutionPlanStore(stateHandleStore); testingExecutionPlanStoreWatcher.removeExecutionPlan(testingExecutionPlan.getJobID()); - assertThat(testingExecutionPlanListener.getRemovedExecutionPlans().size(), is(0)); + assertThat(testingExecutionPlanListener.getRemovedExecutionPlans()).isEmpty(); } @Test @@ -347,7 +338,7 @@ public void testOnAddedExecutionPlanIsIgnoredAfterBeingStop() throws Exception { executionPlanStore.stop(); testingExecutionPlanStoreWatcher.addExecutionPlan(testingExecutionPlan.getJobID()); - assertThat(testingExecutionPlanListener.getAddedExecutionPlans().size(), is(0)); + assertThat(testingExecutionPlanListener.getAddedExecutionPlans()).isEmpty(); } @Test @@ -361,7 +352,7 @@ public void testOnRemovedExecutionPlanIsIgnoredAfterBeingStop() throws Exception executionPlanStore.stop(); testingExecutionPlanStoreWatcher.removeExecutionPlan(testingExecutionPlan.getJobID()); - assertThat(testingExecutionPlanListener.getRemovedExecutionPlans().size(), is(0)); + assertThat(testingExecutionPlanListener.getRemovedExecutionPlans()).isEmpty(); } @Test @@ -374,7 +365,7 @@ public void testStoppingExecutionPlanStoreShouldReleaseAllHandles() throws Excep createAndStartExecutionPlanStore(stateHandleStore); executionPlanStore.stop(); - assertThat(completableFuture.isDone(), is(true)); + assertThat(completableFuture).isDone(); } @Test @@ -390,7 +381,7 @@ public void testLocalCleanupShouldReleaseHandle() throws Exception { .join(); final String actual = releaseFuture.get(); - assertThat(actual, is(testingExecutionPlan.getJobID().toString())); + assertThat(testingExecutionPlan.getJobID()).hasToString(actual); } @Test @@ -450,7 +441,7 @@ private static void assertStoredRequirementsAre( JobResourceRequirements.readFromExecutionPlan( Objects.requireNonNull( (JobGraph) executionPlanStore.recoverExecutionPlan(jobId))); - Assertions.assertThat(maybeRecovered).get().isEqualTo(expected); + assertThat(maybeRecovered).get().isEqualTo(expected); } @Test @@ -463,11 +454,11 @@ public void testPutJobResourceRequirementsOfNonExistentJob() throws Exception { .build(); final ExecutionPlanStore executionPlanStore = createAndStartExecutionPlanStore(stateHandleStore); - assertThrows( - NoSuchElementException.class, - () -> - executionPlanStore.putJobResourceRequirements( - new JobID(), JobResourceRequirements.empty())); + assertThatThrownBy( + () -> + executionPlanStore.putJobResourceRequirements( + new JobID(), JobResourceRequirements.empty())) + .isInstanceOf(NoSuchElementException.class); } private ExecutionPlanStore createAndStartExecutionPlanStore( diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java index dea19ebb9486a..9746ea9bafb13 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java @@ -44,13 +44,11 @@ import java.util.List; import java.util.Map; -import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage; import static org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_CATALOG; import static org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_DATABASE; import static org.apache.flink.table.utils.EncodingUtils.encodeBytesToBase64; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.fail; -import static org.assertj.core.api.HamcrestCondition.matching; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** * Tests for {@link CatalogTable} to {@link ResolvedCatalogTable}, {@link CatalogMaterializedTable} @@ -252,41 +250,30 @@ void testPropertyDeSerialization() throws Exception { @Test void testPropertyDeserializationError() { - try { - final Map properties = catalogTableAsProperties(); - properties.remove("schema.4.data-type"); - CatalogTable.fromProperties(properties); - fail("unknown failure"); - } catch (Exception e) { - assertThat(e) - .satisfies( - matching( - containsMessage( - "Could not find property key 'schema.4.data-type'."))); - } + assertThatThrownBy( + () -> { + final Map properties = catalogTableAsProperties(); + properties.remove("schema.4.data-type"); + CatalogTable.fromProperties(properties); + }) + .hasRootCauseMessage("Could not find property key 'schema.4.data-type'."); } @Test void testInvalidPartitionKeys() { final CatalogTable catalogTable = - CatalogTable.of( - TABLE_SCHEMA, - null, - Arrays.asList("region", "countyINVALID"), - Collections.emptyMap()); - - try { - resolveCatalogBaseTable(ResolvedCatalogTable.class, catalogTable); - fail("Invalid partition keys expected."); - } catch (Exception e) { - assertThat(e) - .satisfies( - matching( - containsMessage( - "Invalid partition key 'countyINVALID'. A partition key must " - + "reference a physical column in the schema. Available " - + "columns are: [id, region, county]"))); - } + CatalogTable.newBuilder() + .schema(TABLE_SCHEMA) + .comment(null) + .partitionKeys(Arrays.asList("region", "countyINVALID")) + .options(Collections.emptyMap()) + .build(); + + assertThatThrownBy(() -> resolveCatalogBaseTable(ResolvedCatalogTable.class, catalogTable)) + .hasRootCauseMessage( + "Invalid partition key 'countyINVALID'. A partition key must " + + "reference a physical column in the schema. Available " + + "columns are: [id, region, county]"); } @Test @@ -317,18 +304,11 @@ void testInvalidDistributionKeys() { Collections.emptyMap(), null, TableDistribution.ofHash(Collections.singletonList("countyINVALID"), 6)); - try { - resolveCatalogBaseTable(ResolvedCatalogTable.class, catalogTable); - fail("Invalid bucket keys expected."); - } catch (Exception e) { - assertThat(e) - .satisfies( - matching( - containsMessage( - "Invalid bucket key 'countyINVALID'. A bucket key for a distribution must " - + "reference a physical column in the schema. " - + "Available columns are: [id, region, county]"))); - } + assertThatThrownBy(() -> resolveCatalogBaseTable(ResolvedCatalogTable.class, catalogTable)) + .hasRootCauseMessage( + "Invalid bucket key 'countyINVALID'. A bucket key for a distribution must " + + "reference a physical column in the schema. " + + "Available columns are: [id, region, county]"); } @Test @@ -342,17 +322,10 @@ void testInvalidDistributionBucketCount() { null, TableDistribution.ofHash(Collections.singletonList("id"), 0)); - try { - resolveCatalogBaseTable(ResolvedCatalogTable.class, catalogTable); - fail("Invalid bucket keys expected."); - } catch (Exception e) { - assertThat(e) - .satisfies( - matching( - containsMessage( - "Invalid bucket count '0'. The number of buckets for a " - + "distributed table must be at least 1."))); - } + assertThatThrownBy(() -> resolveCatalogBaseTable(ResolvedCatalogTable.class, catalogTable)) + .hasRootCauseMessage( + "Invalid bucket count '0'. The number of buckets for a " + + "distributed table must be at least 1."); } // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java index 75f6fce9d3b82..f69bc791c0785 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java @@ -18,7 +18,6 @@ package org.apache.flink.table.catalog; -import org.apache.flink.core.testutils.FlinkMatchers; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.expressions.CallExpression; @@ -47,8 +46,7 @@ import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isTimeAttribute; import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.fail; -import static org.assertj.core.api.HamcrestCondition.matching; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link Schema}, {@link DefaultSchemaResolver}, and {@link ResolvedSchema}. */ class SchemaResolutionTest { @@ -434,12 +432,8 @@ private static void testError(Schema schema, String errorMessage) { } private static void testError(Schema schema, String errorMessage, boolean isStreaming) { - try { - resolveSchema(schema, isStreaming); - fail("Error message expected: " + errorMessage); - } catch (Throwable t) { - assertThat(t).satisfies(matching(FlinkMatchers.containsMessage(errorMessage))); - } + assertThatThrownBy(() -> resolveSchema(schema, isStreaming)) + .hasMessageContaining(errorMessage); } private static ResolvedSchema resolveSchema(Schema schema) { diff --git a/flink-table/flink-table-api-scala/src/test/scala/org/apache/flink/table/types/extraction/DataTypeExtractorScalaTest.scala b/flink-table/flink-table-api-scala/src/test/scala/org/apache/flink/table/types/extraction/DataTypeExtractorScalaTest.scala index 79880f76142d8..0115ad2285a78 100644 --- a/flink-table/flink-table-api-scala/src/test/scala/org/apache/flink/table/types/extraction/DataTypeExtractorScalaTest.scala +++ b/flink-table/flink-table-api-scala/src/test/scala/org/apache/flink/table/types/extraction/DataTypeExtractorScalaTest.scala @@ -22,7 +22,6 @@ import org.apache.flink.table.api.ValidationException import org.apache.flink.table.types.extraction.DataTypeExtractorTest._ import org.assertj.core.api.Assertions.assertThatThrownBy -import org.assertj.core.api.HamcrestCondition import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.MethodSource @@ -37,7 +36,7 @@ class DataTypeExtractorScalaTest { def testScalaExtraction(testSpec: DataTypeExtractorTest.TestSpec): Unit = { if (testSpec.hasErrorMessage) { assertThatThrownBy(() => runExtraction(testSpec)) - .is(HamcrestCondition.matching(errorMatcher(testSpec))) + .hasRootCauseMessage(testSpec.expectedErrorMessage) .isInstanceOf[ValidationException] } else { runExtraction(testSpec) diff --git a/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/JavaCodeSplitterTest.java b/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/JavaCodeSplitterTest.java index 22b7b6d5c64fa..f6a389b5a4394 100644 --- a/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/JavaCodeSplitterTest.java +++ b/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/JavaCodeSplitterTest.java @@ -17,7 +17,6 @@ package org.apache.flink.table.codesplit; -import org.apache.flink.core.testutils.FlinkMatchers; import org.apache.flink.util.FileUtils; import org.junit.jupiter.api.Disabled; @@ -28,7 +27,6 @@ import static org.apache.flink.table.codesplit.CodeSplitTestUtil.trimLines; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.assertj.core.api.HamcrestCondition.matching; /** Tests for {@link JavaCodeSplitter}. */ class JavaCodeSplitterTest { @@ -46,15 +44,12 @@ void testNotSplitJavaCode() { @Test @Disabled("Disabled in because of https://issues.apache.org/jira/browse/FLINK-27702") void testInvalidJavaCode() { - try { - JavaCodeSplitter.split("public class InvalidClass { return 1; }", 4000, 10000); - } catch (Exception e) { - assertThat(e) - .satisfies( - matching( - FlinkMatchers.containsMessage( - "JavaCodeSplitter failed. This is a bug. Please file an issue."))); - } + assertThatThrownBy( + () -> + JavaCodeSplitter.split( + "public class InvalidClass { return 1; }", 4000, 10000)) + .hasMessageContaining( + "JavaCodeSplitter failed. This is a bug. Please file an issue."); } @Test diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java index 21eed2f027712..5c27ae62a3e33 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java @@ -44,7 +44,6 @@ import org.apache.flink.table.types.utils.DataTypeFactoryMock; import org.apache.flink.types.Row; -import org.hamcrest.Matcher; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -64,7 +63,6 @@ import java.util.stream.Stream; import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; -import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; import static org.apache.flink.table.test.TableAssertions.assertThat; import static org.apache.flink.table.types.utils.DataTypeFactoryMock.dummyRaw; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -506,7 +504,7 @@ static class TestSpec { private @Nullable DataType expectedDataType; - private @Nullable String expectedErrorMessage; + @Nullable String expectedErrorMessage; private TestSpec( @Nullable String description, Function extractor) { @@ -626,10 +624,6 @@ static void runExtraction(TestSpec testSpec) { } } - static Matcher errorMatcher(TestSpec testSpec) { - return containsCause(new ValidationException(testSpec.expectedErrorMessage)); - } - /** Testing data type shared with the Scala tests. */ static DataType getSimplePojoDataType(Class simplePojoClass) { final StructuredType.Builder builder = StructuredType.newBuilder(simplePojoClass); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/CodeSplitTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/CodeSplitTest.java index 49e158a34f41c..6f677a34a3e2e 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/CodeSplitTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/CodeSplitTest.java @@ -20,7 +20,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.core.testutils.FlinkMatchers; import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.binary.BinaryRowData; @@ -57,7 +56,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; -import static org.assertj.core.api.HamcrestCondition.matching; /** Tests for code generations with code splitting. */ class CodeSplitTest { @@ -256,7 +254,7 @@ public void write(int b) throws IOException {} consumer.accept(noSplitTableConfig); fail("Expecting compiler exception"); } catch (Exception e) { - assertThat(e).satisfies(matching(FlinkMatchers.containsMessage("grows beyond 64 KB"))); + assertThat(e).hasRootCauseMessage("Code grows beyond 64 KB"); } finally { // set stdout back System.setOut(originalStdOut); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssignerTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssignerTestBase.java index 44f030ddedf27..f2df69a09890c 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssignerTestBase.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssignerTestBase.java @@ -34,10 +34,8 @@ import java.util.Collection; import java.util.List; -import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.fail; -import static org.assertj.core.api.HamcrestCondition.matching; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Utilities for testing {@link SliceAssigner}s. */ abstract class SliceAssignerTestBase { @@ -51,12 +49,7 @@ static Collection zoneIds() { } protected static void assertErrorMessage(Runnable runnable, String errorMessage) { - try { - runnable.run(); - fail("should fail."); - } catch (Exception e) { - assertThat(e).satisfies(matching(containsMessage(errorMessage))); - } + assertThatThrownBy(runnable::run).hasMessageContaining(errorMessage); } protected static long assignSliceEnd(SliceAssigner assigner, long timestamp) { diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkCompletableFutureAssert.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkCompletableFutureAssert.java index 8a8bb1cdebf7a..76abcbc43c5a2 100644 --- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkCompletableFutureAssert.java +++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkCompletableFutureAssert.java @@ -77,6 +77,23 @@ public ThrowableAssertAlternative withThrowableOfType( (ThrowableAssertAlternative) throwableAssert; return cast; } + + /** + * Checks that the underlying throwable has cause of the given type and returns a {@link + * ThrowableAssertAlternative} to chain further assertions on the underlying throwable. + * + * @param cause the expected {@link Throwable} cause + * @param the expected {@link Throwable} cause + * @return a {@link ThrowableAssertAlternative} built with underlying throwable. + */ + public ThrowableAssertAlternative withCauseOfType(Class cause) { + final ThrowableAssertAlternative throwableAssert = + new ThrowableAssertAlternative<>(throwable).withCauseInstanceOf(cause); + @SuppressWarnings("unchecked") + final ThrowableAssertAlternative cast = + (ThrowableAssertAlternative) throwableAssert; + return cast; + } } FlinkCompletableFutureAssert(CompletableFuture actual) { @@ -121,8 +138,6 @@ public ThrowableAssertAlternative eventuallyFailsWith( /** * Assert that {@link CompletableFuture} will not complete within a fixed duration. * - *

This is a replacement for {@link FlinkMatchers#willNotComplete(Duration)} in assertj. - * * @return {@code this} assertion object. */ public FlinkCompletableFutureAssert willNotCompleteWithin(Duration duration) { diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java deleted file mode 100644 index 2eaa65ae04a34..0000000000000 --- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java +++ /dev/null @@ -1,406 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.core.testutils; - -import org.hamcrest.Description; -import org.hamcrest.Matcher; -import org.hamcrest.TypeSafeDiagnosingMatcher; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.time.Duration; -import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.function.Function; -import java.util.function.Predicate; - -/** - * Some reusable hamcrest matchers for Flink. - * - * @deprecated You should assertj assertions, which have built-in assertions for {@link - * CompletableFuture}. To check chains of {@link Throwable} causes, use {@link - * FlinkAssertions#anyCauseMatches(String)} or {@link FlinkAssertions#anyCauseMatches(Class, - * String)} - */ -@Deprecated -public class FlinkMatchers { - - // ------------------------------------------------------------------------ - // factories - // ------------------------------------------------------------------------ - - /** - * Checks whether {@link CompletableFuture} completed already exceptionally with a specific - * exception type. - */ - public static FutureFailedMatcher futureFailedWith( - Class exceptionType) { - Objects.requireNonNull(exceptionType, "exceptionType should not be null"); - return new FutureFailedMatcher<>(exceptionType); - } - - /** - * Checks whether {@link CompletableFuture} will completed exceptionally within a certain time. - */ - public static FutureWillFailMatcher futureWillCompleteExceptionally( - Class exceptionType, Duration timeout) { - Objects.requireNonNull(exceptionType, "exceptionType should not be null"); - Objects.requireNonNull(timeout, "timeout should not be null"); - return new FutureWillFailMatcher<>(exceptionType, timeout); - } - - /** - * Checks whether {@link CompletableFuture} will completed exceptionally within a certain time. - */ - public static FutureWillFailMatcher futureWillCompleteExceptionally( - Function exceptionCheck, - Duration timeout, - String checkDescription) { - Objects.requireNonNull(exceptionCheck, "exceptionType should not be null"); - Objects.requireNonNull(timeout, "timeout should not be null"); - return new FutureWillFailMatcher<>(exceptionCheck, timeout, checkDescription); - } - - /** - * Checks whether {@link CompletableFuture} will completed exceptionally within a certain time. - */ - public static FutureWillFailMatcher futureWillCompleteExceptionally(Duration timeout) { - return futureWillCompleteExceptionally(Throwable.class, timeout); - } - - /** Checks for a {@link Throwable} that matches by class. */ - public static Matcher containsCause(Class failureCause) { - return new ContainsCauseMatcher(failureCause); - } - - /** Checks for a {@link Throwable} that matches by class and message. */ - public static Matcher containsCause(Throwable failureCause) { - return new ContainsCauseAndMessageMatcher(failureCause); - } - - /** Checks for a {@link Throwable} that contains the expected error message. */ - public static Matcher containsMessage(String errorMessage) { - return new ContainsMessageMatcher(errorMessage); - } - - /** Checks that a {@link CompletableFuture} won't complete within the given timeout. */ - public static Matcher> willNotComplete(Duration timeout) { - return new WillNotCompleteMatcher(timeout); - } - - // ------------------------------------------------------------------------ - - /** This class should not be instantiated. */ - private FlinkMatchers() {} - - // ------------------------------------------------------------------------ - // matcher implementations - // ------------------------------------------------------------------------ - - private static final class FutureFailedMatcher - extends TypeSafeDiagnosingMatcher> { - - private final Class expectedException; - - FutureFailedMatcher(Class expectedException) { - super(CompletableFuture.class); - this.expectedException = expectedException; - } - - @Override - protected boolean matchesSafely( - CompletableFuture future, Description mismatchDescription) { - if (!future.isDone()) { - mismatchDescription.appendText("Future is not completed."); - return false; - } - - if (!future.isCompletedExceptionally()) { - Object result = future.getNow(null); - assert result != null; - mismatchDescription.appendText( - "Future did not complete exceptionally, but instead regularly with: " - + result); - return false; - } - - try { - future.getNow(null); - throw new Error(); - } catch (CompletionException e) { - if (e.getCause() != null - && expectedException.isAssignableFrom(e.getCause().getClass())) { - return true; - } - - mismatchDescription.appendText( - "Future completed with different exception: " + e.getCause()); - return false; - } - } - - @Override - public void describeTo(Description description) { - description.appendText( - "A CompletableFuture that failed with: " + expectedException.getName()); - } - } - - private static final class FutureWillFailMatcher - extends TypeSafeDiagnosingMatcher> { - - private final Function exceptionValidator; - - private final Duration timeout; - - private final String validationDescription; - - FutureWillFailMatcher(Class expectedException, Duration timeout) { - - super(CompletableFuture.class); - this.exceptionValidator = (e) -> expectedException.isAssignableFrom(e.getClass()); - this.timeout = timeout; - this.validationDescription = expectedException.getName(); - } - - FutureWillFailMatcher( - Function exceptionValidator, - Duration timeout, - String validationDescription) { - - super(CompletableFuture.class); - this.exceptionValidator = exceptionValidator; - this.timeout = timeout; - this.validationDescription = validationDescription; - } - - @Override - protected boolean matchesSafely( - CompletableFuture future, Description mismatchDescription) { - try { - final Object result = future.get(timeout.toMillis(), TimeUnit.MILLISECONDS); - mismatchDescription.appendText( - "Future did not complete exceptionally, but instead regularly with: " - + result); - return false; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new Error("interrupted test"); - } catch (TimeoutException e) { - mismatchDescription.appendText( - "Future did not complete withing " + timeout.toMillis() + " milliseconds."); - return false; - } catch (ExecutionException e) { - final Throwable cause = e.getCause(); - if (cause != null && exceptionValidator.apply(cause)) { - return true; - } - - String otherDescription = "(null)"; - if (cause != null) { - final StringWriter stm = new StringWriter(); - try (PrintWriter wrt = new PrintWriter(stm)) { - cause.printStackTrace(wrt); - } - otherDescription = stm.toString(); - } - - mismatchDescription.appendText( - "Future completed with different exception: " + otherDescription); - return false; - } - } - - @Override - public void describeTo(Description description) { - description.appendText( - "A CompletableFuture that will have failed within " - + timeout.toMillis() - + " milliseconds with: " - + validationDescription); - } - } - - private static final class ContainsCauseMatcher extends TypeSafeDiagnosingMatcher { - - private final Class failureCause; - - private ContainsCauseMatcher(Class failureCause) { - this.failureCause = failureCause; - } - - @Override - protected boolean matchesSafely(Throwable throwable, Description description) { - final Optional optionalCause = - findThrowable(throwable, cause -> cause.getClass() == failureCause); - - if (!optionalCause.isPresent()) { - description - .appendText("The throwable ") - .appendValue(throwable) - .appendText(" does not contain the expected failure cause ") - .appendValue(failureCause.getSimpleName()); - } - - return optionalCause.isPresent(); - } - - @Override - public void describeTo(Description description) { - description - .appendText("Expected failure cause is ") - .appendValue(failureCause.getSimpleName()); - } - } - - private static final class ContainsCauseAndMessageMatcher - extends TypeSafeDiagnosingMatcher { - - private final Throwable failureCause; - - private ContainsCauseAndMessageMatcher(Throwable failureCause) { - this.failureCause = failureCause; - } - - @Override - protected boolean matchesSafely(Throwable throwable, Description description) { - final Optional optionalCause = - findThrowable( - throwable, - cause -> - cause.getClass() == failureCause.getClass() - && cause.getMessage() - .equals(failureCause.getMessage())); - - if (!optionalCause.isPresent()) { - description - .appendText("The throwable ") - .appendValue(throwable) - .appendText(" does not contain the expected failure cause ") - .appendValue(failureCause); - } - - return optionalCause.isPresent(); - } - - @Override - public void describeTo(Description description) { - description.appendText("Expected failure cause is ").appendValue(failureCause); - } - } - - private static final class ContainsMessageMatcher extends TypeSafeDiagnosingMatcher { - - private final String errorMessage; - - private ContainsMessageMatcher(String errorMessage) { - this.errorMessage = errorMessage; - } - - @Override - protected boolean matchesSafely(Throwable throwable, Description description) { - final Optional optionalCause = - findThrowable(throwable, this::containsErrorMessage); - - if (!optionalCause.isPresent()) { - description - .appendText("The throwable ") - .appendValue(throwable) - .appendText(" does not contain the expected error message ") - .appendValue(errorMessage); - } - - return optionalCause.isPresent(); - } - - @Override - public void describeTo(Description description) { - description.appendText("Expected error message is ").appendValue(errorMessage); - } - - private boolean containsErrorMessage(Throwable t) { - return t.getMessage() != null && t.getMessage().contains(errorMessage); - } - } - - // copied from flink-core to not mess up the dependency design too much, just for a little - // utility method - private static Optional findThrowable( - Throwable throwable, Predicate predicate) { - if (throwable == null || predicate == null) { - return Optional.empty(); - } - - Throwable t = throwable; - while (t != null) { - if (predicate.test(t)) { - return Optional.of(t); - } else { - t = t.getCause(); - } - } - - return Optional.empty(); - } - - private static final class WillNotCompleteMatcher - extends TypeSafeDiagnosingMatcher> { - - private final Duration timeout; - - private WillNotCompleteMatcher(Duration timeout) { - this.timeout = timeout; - } - - @Override - protected boolean matchesSafely( - CompletableFuture item, Description mismatchDescription) { - - try { - final Object value = item.get(timeout.toMillis(), TimeUnit.MILLISECONDS); - mismatchDescription - .appendText("The given future completed with ") - .appendValue(value); - } catch (TimeoutException timeoutException) { - return true; - } catch (InterruptedException e) { - mismatchDescription.appendText("The waiting thread was interrupted."); - } catch (ExecutionException e) { - mismatchDescription - .appendText("The given future was completed exceptionally: ") - .appendValue(e); - } - - return false; - } - - @Override - public void describeTo(Description description) { - description - .appendText("The given future should not complete within ") - .appendValue(timeout.toMillis()) - .appendText(" ms."); - } - } -} diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterITCase.java index 9e5638ccc4ef5..965d1ebef6bbe 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterITCase.java @@ -36,11 +36,11 @@ import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; import org.apache.flink.util.TestLogger; +import org.assertj.core.api.Assertions; import org.junit.Test; -import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Integration tests for the {@link JobMaster}. */ public class JobMasterITCase extends TestLogger { @@ -58,9 +58,9 @@ public void testRejectionOfEmptyJobGraphs() throws Exception { try { miniCluster.getMiniCluster().submitJob(jobGraph).get(); - fail("Expect failure"); + Assertions.fail("Expect failure"); } catch (Throwable t) { - assertThat(t, containsMessage("The given job is empty")); + assertThat(t).hasRootCauseMessage("The given job is empty"); } finally { miniCluster.after(); } @@ -85,11 +85,7 @@ public void testJobManagerInitializationExceptionsAreForwardedToTheUser() { see.fromSource(mySource, WatermarkStrategy.noWatermarks(), "MySourceName"); stream.sinkTo(new DiscardingSink<>()); - try { - see.execute(); - } catch (Exception e) { - assertThat(e, containsMessage("Context was not yet initialized")); - } + assertThatThrownBy(see::execute).hasRootCauseMessage("Context was not yet initialized"); } private static class FailOnInitializationSource implements Source { diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java index f15cb467032c9..e2136169728d7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java @@ -58,12 +58,12 @@ import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction; import org.apache.flink.streaming.util.RestartStrategyUtils; import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; -import org.assertj.core.api.Assertions; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -84,15 +84,9 @@ import java.util.stream.Collectors; import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; -import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; -import static org.apache.flink.util.ExceptionUtils.assertThrowable; -import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.either; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.fail; -import static org.junit.Assume.assumeTrue; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; /** Integration tests for the adaptive scheduler. */ public class AdaptiveSchedulerITCase extends TestLogger { @@ -124,7 +118,7 @@ private static Configuration getConfiguration() { @Before public void ensureAdaptiveSchedulerEnabled() { - assumeTrue(ClusterOptions.isAdaptiveSchedulerEnabled(configuration)); + assumeThat(ClusterOptions.isAdaptiveSchedulerEnabled(configuration)).isTrue(); } @After @@ -172,8 +166,8 @@ public void testStopWithSavepointNoError() throws Exception { savepointDirectory.getAbsolutePath(), SavepointFormatType.CANONICAL) .get(); - assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath())); - assertThat(client.getJobStatus().get(), is(JobStatus.FINISHED)); + assertThat(savepoint).contains(savepointDirectory.getAbsolutePath()); + assertThat(client.getJobStatus().get()).isSameAs(JobStatus.FINISHED); } @Test @@ -187,16 +181,14 @@ public void testStopWithSavepointFailOnCheckpoint() throws Exception { JobClient client = env.executeAsync(); DummySource.awaitRunning(); - try { - client.stopWithSavepoint( - false, - tempFolder.newFolder("savepoint").getAbsolutePath(), - SavepointFormatType.CANONICAL) - .get(); - fail("Expect exception"); - } catch (ExecutionException e) { - assertThat(e, containsCause(FlinkException.class)); - } + assertThatThrownBy( + () -> + client.stopWithSavepoint( + false, + tempFolder.newFolder("savepoint").getAbsolutePath(), + SavepointFormatType.CANONICAL) + .get()) + .hasCauseInstanceOf(FlinkException.class); // expect job to run again (maybe restart) CommonTestUtils.waitUntilCondition(() -> client.getJobStatus().get() == JobStatus.RUNNING); } @@ -217,18 +209,18 @@ public void testStopWithSavepointFailOnStop() throws Throwable { false, tempFolder.newFolder("savepoint").getAbsolutePath(), SavepointFormatType.CANONICAL); - final Throwable savepointException = - assertThrows(ExecutionException.class, savepointCompleted::get).getCause(); - assertThrowable( - savepointException, - throwable -> - throwable instanceof StopWithSavepointStoppingException - && throwable - .getMessage() - .startsWith("A savepoint has been created at: ")); - assertThat( - client.getJobStatus().get(), - either(is(JobStatus.FAILED)).or(is(JobStatus.FAILING))); + assertThatThrownBy(savepointCompleted::get) + .isInstanceOf(ExecutionException.class) + .satisfies( + e -> + assertThat( + ExceptionUtils.findThrowable( + e, + StopWithSavepointStoppingException + .class) + .get()) + .hasMessageContaining("A savepoint has been created at: ")); + assertThat(client.getJobStatus().get()).isIn(JobStatus.FAILED, JobStatus.FAILING); } @Test @@ -248,16 +240,14 @@ public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() throws Ex DummySource.awaitRunning(); DummySource.resetForParallelism(PARALLELISM); final File savepointDirectory = tempFolder.newFolder("savepoint"); - try { - client.stopWithSavepoint( - false, - savepointDirectory.getAbsolutePath(), - SavepointFormatType.CANONICAL) - .get(); - fail("Expect failure of operation"); - } catch (ExecutionException e) { - assertThat(e, containsCause(FlinkException.class)); - } + assertThatThrownBy( + () -> + client.stopWithSavepoint( + false, + savepointDirectory.getAbsolutePath(), + SavepointFormatType.CANONICAL) + .get()) + .hasCauseInstanceOf(FlinkException.class); DummySource.awaitRunning(); @@ -273,7 +263,7 @@ public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() throws Ex savepointDirectory.getAbsolutePath(), SavepointFormatType.CANONICAL) .get(); - assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath())); + assertThat(savepoint).contains(savepointDirectory.getAbsolutePath()); } @Test @@ -335,22 +325,20 @@ public void testGlobalFailureOnRestart() throws Exception { // there should be exactly 1 root exception in the history from the failing vertex, // as the global coordinator failure should be treated as a concurrent exception - Assertions.assertThat(jobExceptions.getExceptionHistory().getEntries()) + assertThat(jobExceptions.getExceptionHistory().getEntries()) .hasSize(1) .allSatisfy( rootExceptionInfo -> - Assertions.assertThat(rootExceptionInfo.getStacktrace()) + assertThat(rootExceptionInfo.getStacktrace()) .contains(FailingInvokable.localExceptionMsg) .doesNotContain( FailingCoordinatorProvider.globalExceptionMsg)) .allSatisfy( rootExceptionInfo -> - Assertions.assertThat(rootExceptionInfo.getConcurrentExceptions()) + assertThat(rootExceptionInfo.getConcurrentExceptions()) .anySatisfy( exceptionInfo -> - Assertions.assertThat( - exceptionInfo - .getStacktrace()) + assertThat(exceptionInfo.getStacktrace()) .contains( FailingCoordinatorProvider .globalExceptionMsg)));