Skip to content

Commit

Permalink
[FLINK-36948][tests] Remove deprecated FlinkMatchers
Browse files Browse the repository at this point in the history
  • Loading branch information
snuyanzin authored Dec 26, 2024
1 parent 95f9a16 commit bc9a99f
Show file tree
Hide file tree
Showing 18 changed files with 359 additions and 897 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache;
import org.apache.flink.runtime.rest.handler.async.OperationResult;
Expand All @@ -41,11 +41,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for the {@link DispatcherCachedOperationsHandler} component. */
public class DispatcherCachedOperationsHandlerTest extends TestLogger {
Expand Down Expand Up @@ -124,18 +121,17 @@ public void triggerSavepointRepeatedly() throws ExecutionException, InterruptedE
TriggerSavepointMode.SAVEPOINT,
TIMEOUT);

assertThat(triggerSavepointFunction.getNumberOfInvocations(), is(1));
assertThat(
triggerSavepointFunction.getInvocationParameters().get(0),
is(
assertThat(triggerSavepointFunction.getNumberOfInvocations()).isOne();
assertThat(triggerSavepointFunction.getInvocationParameters().get(0))
.isEqualTo(
new Tuple4<>(
jobID,
targetDirectory,
SavepointFormatType.CANONICAL,
TriggerSavepointMode.SAVEPOINT)));
TriggerSavepointMode.SAVEPOINT));

assertThat(firstAcknowledge.get(), is(Acknowledge.get()));
assertThat(secondAcknowledge.get(), is(Acknowledge.get()));
assertThat(firstAcknowledge.get()).isEqualTo(Acknowledge.get());
assertThat(secondAcknowledge.get()).isEqualTo(Acknowledge.get());
}

@Test
Expand All @@ -155,18 +151,17 @@ public void stopWithSavepointRepeatedly() throws ExecutionException, Interrupted
TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT,
TIMEOUT);

assertThat(stopWithSavepointFunction.getNumberOfInvocations(), is(1));
assertThat(
stopWithSavepointFunction.getInvocationParameters().get(0),
is(
assertThat(stopWithSavepointFunction.getNumberOfInvocations()).isOne();
assertThat(stopWithSavepointFunction.getInvocationParameters().get(0))
.isEqualTo(
new Tuple4<>(
jobID,
targetDirectory,
SavepointFormatType.CANONICAL,
TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT)));
TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT));

assertThat(firstAcknowledge.get(), is(Acknowledge.get()));
assertThat(secondAcknowledge.get(), is(Acknowledge.get()));
assertThat(firstAcknowledge.get()).isEqualTo(Acknowledge.get());
assertThat(secondAcknowledge.get()).isEqualTo(Acknowledge.get());
}

@Test
Expand All @@ -190,23 +185,22 @@ public void retryingCompletedOperationDoesNotMarkCacheEntryAsAccessed()
.get();

// should not complete because we wait for the result to be accessed
assertThat(
savepointTriggerCache.closeAsync(),
FlinkMatchers.willNotComplete(Duration.ofMillis(10)));
FlinkAssertions.assertThatFuture(savepointTriggerCache.closeAsync())
.willNotCompleteWithin(Duration.ofMillis(10));
}

@Test
public void throwsIfCacheIsShuttingDown() {
savepointTriggerCache.closeAsync();
assertThrows(
IllegalStateException.class,
() ->
handler.triggerSavepoint(
operationKey,
targetDirectory,
SavepointFormatType.CANONICAL,
TriggerSavepointMode.SAVEPOINT,
TIMEOUT));
assertThatThrownBy(
() ->
handler.triggerSavepoint(
operationKey,
targetDirectory,
SavepointFormatType.CANONICAL,
TriggerSavepointMode.SAVEPOINT,
TIMEOUT))
.isInstanceOf(IllegalStateException.class);
}

@Test
Expand All @@ -224,15 +218,17 @@ public void getStatus() throws ExecutionException, InterruptedException {
CompletableFuture<OperationResult<String>> statusFuture =
handler.getSavepointStatus(operationKey);

assertEquals(statusFuture.get(), OperationResult.success(savepointLocation));
assertThat(statusFuture.get()).isEqualTo(OperationResult.success(savepointLocation));
}

@Test
public void getStatusFailsIfKeyUnknown() throws InterruptedException {
public void getStatusFailsIfKeyUnknown() {
CompletableFuture<OperationResult<String>> statusFuture =
handler.getSavepointStatus(operationKey);

assertThat(statusFuture, futureFailedWith(UnknownOperationKeyException.class));
FlinkAssertions.assertThatFuture(statusFuture)
.eventuallyFails()
.withCauseOfType(UnknownOperationKeyException.class);
}

private abstract static class TriggerCheckpointSpyFunction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.CleanupOptions;
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
Expand Down Expand Up @@ -53,10 +52,7 @@
import org.apache.flink.runtime.testutils.TestingJobResultStore;
import org.apache.flink.util.concurrent.FutureUtils;

import org.hamcrest.CoreMatchers;
import org.hamcrest.collection.IsEmptyCollection;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -65,16 +61,14 @@
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** An integration test for various fail-over scenarios of the {@link Dispatcher} component. */
public class DispatcherCleanupITCase extends AbstractDispatcherTest {
Expand All @@ -95,8 +89,8 @@ public void setUp() throws Exception {
// First job cleanup still succeeded for the
// CompletedCheckpointStore because the JobGraph cleanup happens
// after the JobManagerRunner closing
assertTrue(previous.getShutdownStatus().isPresent());
assertTrue(previous.getAllCheckpoints().isEmpty());
assertThat(previous.getShutdownStatus()).isPresent();
assertThat(previous.getAllCheckpoints()).isEmpty();
return new EmbeddedCompletedCheckpointStore(
maxCheckpoints,
previous.getAllCheckpoints(),
Expand Down Expand Up @@ -191,12 +185,11 @@ public void testCleanupThroughRetries() throws Exception {

successfulCleanupLatch.await();

assertThat(actualGlobalCleanupCallCount.get(), equalTo(numberOfErrors + 1));
assertThat(actualGlobalCleanupCallCount.get()).isEqualTo(numberOfErrors + 1);

assertThat(
"The JobGraph should be removed from ExecutionPlanStore.",
haServices.getExecutionPlanStore().getJobIds(),
IsEmptyCollection.empty());
assertThat(haServices.getExecutionPlanStore().getJobIds())
.as("The JobGraph should be removed from ExecutionPlanStore.")
.isEmpty();

CommonTestUtils.waitUntilCondition(
() -> haServices.getJobResultStore().hasJobResultEntryAsync(jobId).get());
Expand Down Expand Up @@ -233,20 +226,15 @@ public void testCleanupNotCancellable() throws Exception {

CommonTestUtils.waitUntilCondition(() -> jobManagerRunnerEntry.get() != null);

assertThat(
"The JobResultStore should have this job still marked as dirty.",
haServices.getJobResultStore().hasDirtyJobResultEntryAsync(jobId).get(),
CoreMatchers.is(true));
assertThat(haServices.getJobResultStore().hasDirtyJobResultEntryAsync(jobId).get())
.as("The JobResultStore should have this job still marked as dirty.")
.isTrue();

final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);

try {
dispatcherGateway.cancelJob(jobId, TIMEOUT).get();
Assert.fail("Should fail because cancelling the cleanup is not allowed.");
} catch (ExecutionException e) {
assertThat(e, FlinkMatchers.containsCause(JobCancellationFailedException.class));
}
assertThatThrownBy(() -> dispatcherGateway.cancelJob(jobId, TIMEOUT).get())
.hasCauseInstanceOf(JobCancellationFailedException.class);
jobManagerRunnerCleanupFuture.complete(null);

CommonTestUtils.waitUntilCondition(
Expand Down Expand Up @@ -305,25 +293,22 @@ public void testCleanupAfterLeadershipChange() throws Exception {
waitForJobToFinish(confirmedLeaderInformation, dispatcherGateway, jobId);
firstCleanupTriggered.await();

assertThat(
"The cleanup should have been triggered only once.",
actualGlobalCleanupCallCount.get(),
equalTo(1));
assertThat(
"The cleanup should not have reached the successful cleanup code path.",
successfulJobGraphCleanup.isDone(),
equalTo(false));
assertThat(actualGlobalCleanupCallCount.get())
.as("The cleanup should have been triggered only once.")
.isOne();
assertThat(successfulJobGraphCleanup.isDone())
.as("The cleanup should not have reached the successful cleanup code path.")
.isFalse();

assertThat(haServices.getExecutionPlanStore().getJobIds())
.as("The JobGraph is still stored in the ExecutionPlanStore.")
.containsExactly(jobId);
assertThat(
"The JobGraph is still stored in the ExecutionPlanStore.",
haServices.getExecutionPlanStore().getJobIds(),
equalTo(Collections.singleton(jobId)));
assertThat(
"The JobResultStore should have this job marked as dirty.",
haServices.getJobResultStore().getDirtyResults().stream()
.map(JobResult::getJobId)
.collect(Collectors.toSet()),
equalTo(Collections.singleton(jobId)));
haServices.getJobResultStore().getDirtyResults().stream()
.map(JobResult::getJobId)
.collect(Collectors.toSet()))
.as("The JobResultStore should have this job marked as dirty.")
.containsExactly(jobId);

// Run a second dispatcher, that restores our finished job.
final Dispatcher secondDispatcher =
Expand All @@ -338,17 +323,16 @@ public void testCleanupAfterLeadershipChange() throws Exception {
CommonTestUtils.waitUntilCondition(
() -> haServices.getJobResultStore().getDirtyResults().isEmpty());

assertThat(
"The JobGraph is not stored in the ExecutionPlanStore.",
haServices.getExecutionPlanStore().getJobIds(),
IsEmptyCollection.empty());
assertTrue(
"The JobResultStore has the job listed as clean.",
haServices.getJobResultStore().hasJobResultEntryAsync(jobId).get());
assertThat(haServices.getExecutionPlanStore().getJobIds())
.as("The JobGraph is not stored in the ExecutionPlanStore.")
.isEmpty();
assertThat(haServices.getJobResultStore().hasJobResultEntryAsync(jobId).get())
.as("The JobResultStore has the job listed as clean.")
.isTrue();

assertThat(successfulJobGraphCleanup.get(), equalTo(jobId));
assertThat(successfulJobGraphCleanup.get()).isEqualTo(jobId);

assertThat(actualGlobalCleanupCallCount.get(), equalTo(2));
assertThat(actualGlobalCleanupCallCount.get()).isEqualTo(2);
}

private void waitForJobToFinish(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,8 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
import static org.apache.flink.runtime.dispatcher.AbstractDispatcherTest.awaitStatus;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -272,12 +271,7 @@ public void testGlobalCleanupWhenJobSubmissionFails() throws Exception {
startDispatcher(new FailingJobManagerRunnerFactory(new FlinkException("Test exception")));
final CompletableFuture<Acknowledge> submissionFuture = submitJob();

try {
submissionFuture.get();
fail("Job submission was expected to fail.");
} catch (ExecutionException ee) {
assertThat(ee, containsCause(JobSubmissionException.class));
}
assertThatThrownBy(submissionFuture::get).hasCauseInstanceOf(JobSubmissionException.class);

assertGlobalCleanupTriggered(jobId);
}
Expand Down Expand Up @@ -647,15 +641,12 @@ public void testFailingJobManagerRunnerCleanup() throws Exception {

// submit and fail during job master runner construction
queue.offer(Optional.of(testException));
try {
dispatcherGateway.submitJob(jobGraph, Duration.ofMinutes(1)).get();
fail("A FlinkException is expected");
} catch (Throwable expectedException) {
assertThat(expectedException, containsCause(FlinkException.class));
assertThat(expectedException, containsMessage(testException.getMessage()));
// make sure we've cleaned up in correct order (including HA)
assertGlobalCleanupTriggered(jobId);
}
assertThatThrownBy(() -> dispatcherGateway.submitJob(jobGraph, Duration.ofMinutes(1)).get())
.hasCauseInstanceOf(FlinkException.class)
.hasRootCauseMessage(testException.getMessage());

// make sure we've cleaned up in correct order (including HA)
assertGlobalCleanupTriggered(jobId);

// don't fail this time
queue.offer(Optional.empty());
Expand Down
Loading

0 comments on commit bc9a99f

Please sign in to comment.