From 103961c105523ac1a1aa09f79415eb8639f5dd8b Mon Sep 17 00:00:00 2001 From: Lucien Luc Date: Thu, 11 Jul 2024 16:13:47 -0700 Subject: [PATCH 1/2] Add config to enable PITR --- .../KinesisClientLibConfiguration.java | 2 + .../config/MultiLangDaemonConfiguration.java | 6 ++ .../MultiLangDaemonConfigurationTest.java | 22 ++++++ .../kinesis/leases/LeaseManagementConfig.java | 16 +++- .../DynamoDBLeaseManagementFactory.java | 76 ++++++++++++++++++- .../dynamodb/DynamoDBLeaseRefresher.java | 72 +++++++++++++++++- .../dynamodb/DynamoDBLeaseRefresherTest.java | 69 +++++++++++++++-- 7 files changed, 251 insertions(+), 12 deletions(-) diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java index 179eb9e49..95c82569b 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java @@ -218,6 +218,8 @@ public class KinesisClientLibConfiguration { private AwsCredentialsProvider cloudWatchCredentialsProvider; private long failoverTimeMillis; private boolean enablePriorityLeaseAssignment; + private boolean leaseTableDeletionProtectionEnabled; + private boolean leaseTablePitrEnabled; private String workerIdentifier; private long shardSyncIntervalMillis; private int maxRecords; diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java index 08c11d26b..3336be887 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java @@ -88,6 +88,12 @@ public void setWorkerId(String workerId) { @ConfigurationSettable(configurationClass = LeaseManagementConfig.class) private Boolean enablePriorityLeaseAssignment; + @ConfigurationSettable(configurationClass = LeaseManagementConfig.class) + private Boolean leaseTableDeletionProtectionEnabled; + + @ConfigurationSettable(configurationClass = LeaseManagementConfig.class) + private Boolean leaseTablePitrEnabled; + @ConfigurationSettable(configurationClass = LeaseManagementConfig.class) private long shardSyncIntervalMillis; diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java index b98db83a0..ebc5fb17c 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java @@ -99,6 +99,28 @@ public void testSetEnablePriorityLeaseAssignment() { assertThat(resolvedConfiguration.leaseManagementConfig.enablePriorityLeaseAssignment(), equalTo(false)); } + @Test + public void testSetLeaseTableDeletionProtectionEnabled() { + MultiLangDaemonConfiguration configuration = baseConfiguration(); + configuration.setLeaseTableDeletionProtectionEnabled(true); + + MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = + configuration.resolvedConfiguration(shardRecordProcessorFactory); + + assertThat(resolvedConfiguration.leaseManagementConfig.leaseTableDeletionProtectionEnabled(), equalTo(true)); + } + + @Test + public void testSetLeaseTablePitrEnabled() { + MultiLangDaemonConfiguration configuration = baseConfiguration(); + configuration.setLeaseTablePitrEnabled(true); + + MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = + configuration.resolvedConfiguration(shardRecordProcessorFactory); + + assertThat(resolvedConfiguration.leaseManagementConfig.leaseTablePitrEnabled(), equalTo(true)); + } + @Test public void testDefaultRetrievalConfig() { MultiLangDaemonConfiguration configuration = baseConfiguration(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index c8c49a19b..2d4e041c0 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -59,6 +59,8 @@ public class LeaseManagementConfig { public static final long DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(30).toMillis(); public static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L; + public static final boolean DEFAULT_LEASE_TABLE_DELETION_PROTECTION_ENABLED = false; + public static final boolean DEFAULT_LEASE_TABLE_PITR_ENABLED = false; public static final boolean DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT = true; public static final int DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY = 3; @@ -208,11 +210,20 @@ public class LeaseManagementConfig { private BillingMode billingMode = BillingMode.PAY_PER_REQUEST; /** - * Whether to enabled deletion protection on the DynamoDB lease table created by KCL. + * Whether to enable deletion protection on the DynamoDB lease table created by KCL. This does not update + * already existing tables. * *

Default value: false */ - private boolean leaseTableDeletionProtectionEnabled = false; + private boolean leaseTableDeletionProtectionEnabled = DEFAULT_LEASE_TABLE_DELETION_PROTECTION_ENABLED; + + /** + * Whether to enable PITR (point in time recovery) on the DynamoDB lease table created by KCL. If true, this can + * update existing table's PITR. + * + *

Default value: false + */ + private boolean leaseTablePitrEnabled = DEFAULT_LEASE_TABLE_PITR_ENABLED; /** * The list of tags to be applied to the DynamoDB table created for lease management. @@ -424,6 +435,7 @@ public LeaseManagementFactory leaseManagementFactory( dynamoDbRequestTimeout(), billingMode(), leaseTableDeletionProtectionEnabled(), + leaseTablePitrEnabled(), tags(), leaseSerializer, customShardDetectorProvider(), diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index 2eb3e707f..e5435bfc7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -99,6 +99,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { private final Duration dynamoDbRequestTimeout; private final BillingMode billingMode; private final boolean leaseTableDeletionProtectionEnabled; + private final boolean leaseTablePitrEnabled; private final Collection tags; private final boolean isMultiStreamMode; private final LeaseCleanupConfig leaseCleanupConfig; @@ -707,7 +708,7 @@ private DynamoDBLeaseManagementFactory( tableCreatorCallback, dynamoDbRequestTimeout, billingMode, - false, + LeaseManagementConfig.DEFAULT_LEASE_TABLE_DELETION_PROTECTION_ENABLED, DefaultSdkAutoConstructList.getInstance(), leaseSerializer); } @@ -945,6 +946,76 @@ public DynamoDBLeaseManagementFactory( * @param isMultiStreamMode * @param leaseCleanupConfig */ + @Deprecated + public DynamoDBLeaseManagementFactory( + final KinesisAsyncClient kinesisClient, + final DynamoDbAsyncClient dynamoDBClient, + final String tableName, + final String workerIdentifier, + final ExecutorService executorService, + final long failoverTimeMillis, + final boolean enablePriorityLeaseAssignment, + final long epsilonMillis, + final int maxLeasesForWorker, + final int maxLeasesToStealAtOneTime, + final int maxLeaseRenewalThreads, + final boolean cleanupLeasesUponShardCompletion, + final boolean ignoreUnexpectedChildShards, + final long shardSyncIntervalMillis, + final boolean consistentReads, + final long listShardsBackoffTimeMillis, + final int maxListShardsRetryAttempts, + final int maxCacheMissesBeforeReload, + final long listShardsCacheAllowedAgeInSeconds, + final int cacheMissWarningModulus, + final long initialLeaseTableReadCapacity, + final long initialLeaseTableWriteCapacity, + final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, + final TableCreatorCallback tableCreatorCallback, + Duration dynamoDbRequestTimeout, + BillingMode billingMode, + final boolean leaseTableDeletionProtectionEnabled, + Collection tags, + LeaseSerializer leaseSerializer, + Function customShardDetectorProvider, + boolean isMultiStreamMode, + LeaseCleanupConfig leaseCleanupConfig) { + this( + kinesisClient, + dynamoDBClient, + tableName, + workerIdentifier, + executorService, + failoverTimeMillis, + enablePriorityLeaseAssignment, + epsilonMillis, + maxLeasesForWorker, + maxLeasesToStealAtOneTime, + maxLeaseRenewalThreads, + cleanupLeasesUponShardCompletion, + ignoreUnexpectedChildShards, + shardSyncIntervalMillis, + consistentReads, + listShardsBackoffTimeMillis, + maxListShardsRetryAttempts, + maxCacheMissesBeforeReload, + listShardsCacheAllowedAgeInSeconds, + cacheMissWarningModulus, + initialLeaseTableReadCapacity, + initialLeaseTableWriteCapacity, + deprecatedHierarchicalShardSyncer, + tableCreatorCallback, + dynamoDbRequestTimeout, + billingMode, + leaseTableDeletionProtectionEnabled, + LeaseManagementConfig.DEFAULT_LEASE_TABLE_PITR_ENABLED, + tags, + leaseSerializer, + customShardDetectorProvider, + isMultiStreamMode, + leaseCleanupConfig); + } + public DynamoDBLeaseManagementFactory( final KinesisAsyncClient kinesisClient, final DynamoDbAsyncClient dynamoDBClient, @@ -973,6 +1044,7 @@ public DynamoDBLeaseManagementFactory( Duration dynamoDbRequestTimeout, BillingMode billingMode, final boolean leaseTableDeletionProtectionEnabled, + final boolean leaseTablePitrEnabled, Collection tags, LeaseSerializer leaseSerializer, Function customShardDetectorProvider, @@ -1005,6 +1077,7 @@ public DynamoDBLeaseManagementFactory( this.dynamoDbRequestTimeout = dynamoDbRequestTimeout; this.billingMode = billingMode; this.leaseTableDeletionProtectionEnabled = leaseTableDeletionProtectionEnabled; + this.leaseTablePitrEnabled = leaseTablePitrEnabled; this.leaseSerializer = leaseSerializer; this.customShardDetectorProvider = customShardDetectorProvider; this.isMultiStreamMode = isMultiStreamMode; @@ -1091,6 +1164,7 @@ public DynamoDBLeaseRefresher createLeaseRefresher() { dynamoDbRequestTimeout, billingMode, leaseTableDeletionProtectionEnabled, + leaseTablePitrEnabled, tags); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 838d2d154..c380468ea 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -40,6 +40,7 @@ import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; import software.amazon.awssdk.services.dynamodb.model.LimitExceededException; +import software.amazon.awssdk.services.dynamodb.model.PointInTimeRecoverySpecification; import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException; import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; @@ -49,6 +50,7 @@ import software.amazon.awssdk.services.dynamodb.model.ScanResponse; import software.amazon.awssdk.services.dynamodb.model.TableStatus; import software.amazon.awssdk.services.dynamodb.model.Tag; +import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsRequest; import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; @@ -81,6 +83,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { private final Duration dynamoDbRequestTimeout; private final BillingMode billingMode; private final boolean leaseTableDeletionProtectionEnabled; + private final boolean leaseTablePitrEnabled; private final Collection tags; private boolean newTableCreated = false; @@ -159,7 +162,7 @@ public DynamoDBLeaseRefresher( tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PAY_PER_REQUEST, - false); + LeaseManagementConfig.DEFAULT_LEASE_TABLE_DELETION_PROTECTION_ENABLED); } /** @@ -207,6 +210,41 @@ public DynamoDBLeaseRefresher( * @param leaseTableDeletionProtectionEnabled * @param tags */ + @Deprecated + public DynamoDBLeaseRefresher( + final String table, + final DynamoDbAsyncClient dynamoDBClient, + final LeaseSerializer serializer, + final boolean consistentReads, + @NonNull final TableCreatorCallback tableCreatorCallback, + Duration dynamoDbRequestTimeout, + final BillingMode billingMode, + final boolean leaseTableDeletionProtectionEnabled, + final Collection tags) { + this( + table, + dynamoDBClient, + serializer, + consistentReads, + tableCreatorCallback, + dynamoDbRequestTimeout, + billingMode, + leaseTableDeletionProtectionEnabled, + LeaseManagementConfig.DEFAULT_LEASE_TABLE_PITR_ENABLED, + tags); + } + + /** + * Constructor. + * @param table + * @param dynamoDBClient + * @param serializer + * @param consistentReads + * @param tableCreatorCallback + * @param dynamoDbRequestTimeout + * @param billingMode + * @param leaseTableDeletionProtectionEnabled + */ public DynamoDBLeaseRefresher( final String table, final DynamoDbAsyncClient dynamoDBClient, @@ -216,6 +254,7 @@ public DynamoDBLeaseRefresher( Duration dynamoDbRequestTimeout, final BillingMode billingMode, final boolean leaseTableDeletionProtectionEnabled, + final boolean leaseTablePitrEnabled, final Collection tags) { this.table = table; this.dynamoDBClient = dynamoDBClient; @@ -225,6 +264,7 @@ public DynamoDBLeaseRefresher( this.dynamoDbRequestTimeout = dynamoDbRequestTimeout; this.billingMode = billingMode; this.leaseTableDeletionProtectionEnabled = leaseTableDeletionProtectionEnabled; + this.leaseTablePitrEnabled = leaseTablePitrEnabled; this.tags = tags; } @@ -252,7 +292,35 @@ public boolean createLeaseTableIfNotExists(@NonNull final Long readCapacity, @No public boolean createLeaseTableIfNotExists() throws ProvisionedThroughputException, DependencyException { final CreateTableRequest request = createTableRequestBuilder().build(); - return createTableIfNotExists(request); + boolean tableExists = createTableIfNotExists(request); + + if (leaseTablePitrEnabled) { + enablePitr(); + log.info("Enabled PITR on table {}", table); + } + + return tableExists; + } + + private void enablePitr() throws DependencyException { + final UpdateContinuousBackupsRequest request = UpdateContinuousBackupsRequest.builder() + .tableName(table) + .pointInTimeRecoverySpecification(PointInTimeRecoverySpecification.builder() + .pointInTimeRecoveryEnabled(true) + .build()) + .build(); + + final AWSExceptionManager exceptionManager = createExceptionManager(); + exceptionManager.add(ResourceNotFoundException.class, t -> t); + exceptionManager.add(ProvisionedThroughputExceededException.class, t -> t); + + try { + FutureUtils.resolveOrCancelFuture(dynamoDBClient.updateContinuousBackups(request), dynamoDbRequestTimeout); + } catch (ExecutionException e) { + throw exceptionManager.apply(e.getCause()); + } catch (InterruptedException | DynamoDbException | TimeoutException e) { + throw new DependencyException(e); + } } private boolean createTableIfNotExists(CreateTableRequest request) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java index e757c93b0..f385cf301 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java @@ -31,6 +31,7 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.awssdk.core.util.DefaultSdkAutoConstructList; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.BillingMode; @@ -44,6 +45,7 @@ import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; import software.amazon.awssdk.services.dynamodb.model.LimitExceededException; +import software.amazon.awssdk.services.dynamodb.model.PointInTimeRecoverySpecification; import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; import software.amazon.awssdk.services.dynamodb.model.PutItemResponse; @@ -54,6 +56,8 @@ import software.amazon.awssdk.services.dynamodb.model.TableDescription; import software.amazon.awssdk.services.dynamodb.model.TableStatus; import software.amazon.awssdk.services.dynamodb.model.Tag; +import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsRequest; +import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsResponse; import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse; import software.amazon.kinesis.leases.Lease; @@ -80,6 +84,10 @@ public class DynamoDBLeaseRefresherTest { private static final String TABLE_NAME = "test"; private static final boolean CONSISTENT_READS = true; private static final boolean DELETION_PROTECTION_ENABLED = false; + private static final boolean PITR_ENABLED = true; + private static final Collection EMPTY_TAGS = DefaultSdkAutoConstructList.getInstance(); + private static final Collection TAGS = + Collections.singletonList(Tag.builder().key("foo").value("bar").build()); @Mock private DynamoDbAsyncClient dynamoDbClient; @@ -111,6 +119,9 @@ public class DynamoDBLeaseRefresherTest { @Mock private CompletableFuture mockCreateTableFuture; + @Mock + private CompletableFuture mockUpdateContinuousBackupsFuture; + @Mock private Lease lease; @@ -120,8 +131,7 @@ public class DynamoDBLeaseRefresherTest { private DynamoDBLeaseRefresher leaseRefresher; private DescribeTableRequest describeTableRequest; private CreateTableRequest createTableRequest; - private Collection tags; - + private UpdateContinuousBackupsRequest updateContinuousBackupsRequest; private Map serializedLease; @Before @@ -139,6 +149,12 @@ public void setup() throws Exception { .billingMode(BillingMode.PAY_PER_REQUEST) .deletionProtectionEnabled(DELETION_PROTECTION_ENABLED) .build(); + updateContinuousBackupsRequest = UpdateContinuousBackupsRequest.builder() + .tableName(TABLE_NAME) + .pointInTimeRecoverySpecification(PointInTimeRecoverySpecification.builder() + .pointInTimeRecoveryEnabled(PITR_ENABLED) + .build()) + .build(); } @Test @@ -353,7 +369,6 @@ public void testCreateLeaseTableProvisionedBillingModeIfNotExists() throws Excep @Test public void testCreateLeaseTableWithTagsIfNotExists() throws Exception { - tags = Collections.singletonList(Tag.builder().key("foo").value("bar").build()); leaseRefresher = new DynamoDBLeaseRefresher( TABLE_NAME, dynamoDbClient, @@ -363,7 +378,7 @@ public void testCreateLeaseTableWithTagsIfNotExists() throws Exception { LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, BillingMode.PROVISIONED, DELETION_PROTECTION_ENABLED, - tags); + TAGS); when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture); when(mockDescribeTableFuture.get( @@ -382,7 +397,7 @@ public void testCreateLeaseTableWithTagsIfNotExists() throws Exception { .attributeDefinitions(leaseSerializer.getAttributeDefinitions()) .provisionedThroughput(throughput) .deletionProtectionEnabled(DELETION_PROTECTION_ENABLED) - .tags(tags) + .tags(TAGS) .build(); when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture); when(mockCreateTableFuture.get(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) @@ -423,9 +438,49 @@ public void testCreateLeaseTableIfNotExists() throws Exception { Assert.assertTrue(result); } + @Test + public void testCreateLeaseTableIfNotExistsWithPitrEnabled() throws Exception { + DynamoDBLeaseRefresher leaseRefresherWithEnabledPitr = new DynamoDBLeaseRefresher( + TABLE_NAME, + dynamoDbClient, + leaseSerializer, + CONSISTENT_READS, + tableCreatorCallback, + LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, + BillingMode.PAY_PER_REQUEST, + DELETION_PROTECTION_ENABLED, + PITR_ENABLED, + EMPTY_TAGS); + when(dynamoDbClient.describeTable(describeTableRequest)).thenReturn(mockDescribeTableFuture); + when(mockDescribeTableFuture.get( + eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenThrow(ResourceNotFoundException.builder() + .message("Table doesn't exist") + .build()); + when(dynamoDbClient.createTable(createTableRequest)).thenReturn(mockCreateTableFuture); + when(mockCreateTableFuture.get( + eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenReturn(null); + when(dynamoDbClient.updateContinuousBackups(updateContinuousBackupsRequest)) + .thenReturn(mockUpdateContinuousBackupsFuture); + when(mockUpdateContinuousBackupsFuture.get( + eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) + .thenReturn(null); + final boolean result = leaseRefresherWithEnabledPitr.createLeaseTableIfNotExists(); + + verify(dynamoDbClient, times(1)).describeTable(describeTableRequest); + verify(dynamoDbClient, times(1)).createTable(createTableRequest); + verify(dynamoDbClient, times(1)).updateContinuousBackups(updateContinuousBackupsRequest); + verify(mockDescribeTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + verify(mockCreateTableFuture, times(1)) + .get(eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS)); + Assert.assertTrue(result); + } + @Test public void testCreateLeaseTableProvisionedWithDeletionProtectionIfNotExists() throws Exception { - leaseRefresher = new DynamoDBLeaseRefresher( + DynamoDBLeaseRefresher leaseRefresherWithEnabledDeletionProtection = new DynamoDBLeaseRefresher( TABLE_NAME, dynamoDbClient, leaseSerializer, @@ -458,7 +513,7 @@ public void testCreateLeaseTableProvisionedWithDeletionProtectionIfNotExists() t eq(LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT.toMillis()), eq(TimeUnit.MILLISECONDS))) .thenReturn(null); - final boolean result = leaseRefresher.createLeaseTableIfNotExists(10L, 10L); + final boolean result = leaseRefresherWithEnabledDeletionProtection.createLeaseTableIfNotExists(10L, 10L); verify(dynamoDbClient, times(1)).describeTable(describeTableRequest); verify(dynamoDbClient, times(1)).createTable(createTableRequest); From 2729eddd6f4c2a086358fdfdaa3252425bea8881 Mon Sep 17 00:00:00 2001 From: Lucien Luc Date: Mon, 15 Jul 2024 14:02:48 -0700 Subject: [PATCH 2/2] Add more unit tests and shorthand builder --- .../MultiLangDaemonConfigurationTest.java | 31 ++++++++++++++++--- .../dynamodb/DynamoDBLeaseRefresher.java | 5 +-- .../dynamodb/DynamoDBLeaseRefresherTest.java | 5 +-- 3 files changed, 29 insertions(+), 12 deletions(-) diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java index ebc5fb17c..1c45eb6e8 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java @@ -33,6 +33,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -100,25 +101,47 @@ public void testSetEnablePriorityLeaseAssignment() { } @Test - public void testSetLeaseTableDeletionProtectionEnabled() { + public void testSetLeaseTableDeletionProtectionEnabledToTrue() { MultiLangDaemonConfiguration configuration = baseConfiguration(); configuration.setLeaseTableDeletionProtectionEnabled(true); MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = configuration.resolvedConfiguration(shardRecordProcessorFactory); - assertThat(resolvedConfiguration.leaseManagementConfig.leaseTableDeletionProtectionEnabled(), equalTo(true)); + assertTrue(resolvedConfiguration.leaseManagementConfig.leaseTableDeletionProtectionEnabled()); } @Test - public void testSetLeaseTablePitrEnabled() { + public void testSetLeaseTablePitrEnabledToTrue() { MultiLangDaemonConfiguration configuration = baseConfiguration(); configuration.setLeaseTablePitrEnabled(true); MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = configuration.resolvedConfiguration(shardRecordProcessorFactory); - assertThat(resolvedConfiguration.leaseManagementConfig.leaseTablePitrEnabled(), equalTo(true)); + assertTrue(resolvedConfiguration.leaseManagementConfig.leaseTablePitrEnabled()); + } + + @Test + public void testSetLeaseTableDeletionProtectionEnabledToFalse() { + MultiLangDaemonConfiguration configuration = baseConfiguration(); + configuration.setLeaseTableDeletionProtectionEnabled(false); + + MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = + configuration.resolvedConfiguration(shardRecordProcessorFactory); + + assertFalse(resolvedConfiguration.leaseManagementConfig.leaseTableDeletionProtectionEnabled()); + } + + @Test + public void testSetLeaseTablePitrEnabledToFalse() { + MultiLangDaemonConfiguration configuration = baseConfiguration(); + configuration.setLeaseTablePitrEnabled(false); + + MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = + configuration.resolvedConfiguration(shardRecordProcessorFactory); + + assertFalse(resolvedConfiguration.leaseManagementConfig.leaseTablePitrEnabled()); } @Test diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index c380468ea..123f4068d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -40,7 +40,6 @@ import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; import software.amazon.awssdk.services.dynamodb.model.LimitExceededException; -import software.amazon.awssdk.services.dynamodb.model.PointInTimeRecoverySpecification; import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException; import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; @@ -305,9 +304,7 @@ public boolean createLeaseTableIfNotExists() throws ProvisionedThroughputExcepti private void enablePitr() throws DependencyException { final UpdateContinuousBackupsRequest request = UpdateContinuousBackupsRequest.builder() .tableName(table) - .pointInTimeRecoverySpecification(PointInTimeRecoverySpecification.builder() - .pointInTimeRecoveryEnabled(true) - .build()) + .pointInTimeRecoverySpecification(builder -> builder.pointInTimeRecoveryEnabled(true)) .build(); final AWSExceptionManager exceptionManager = createExceptionManager(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java index f385cf301..2668918c9 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java @@ -45,7 +45,6 @@ import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; import software.amazon.awssdk.services.dynamodb.model.LimitExceededException; -import software.amazon.awssdk.services.dynamodb.model.PointInTimeRecoverySpecification; import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; import software.amazon.awssdk.services.dynamodb.model.PutItemResponse; @@ -151,9 +150,7 @@ public void setup() throws Exception { .build(); updateContinuousBackupsRequest = UpdateContinuousBackupsRequest.builder() .tableName(TABLE_NAME) - .pointInTimeRecoverySpecification(PointInTimeRecoverySpecification.builder() - .pointInTimeRecoveryEnabled(PITR_ENABLED) - .build()) + .pointInTimeRecoverySpecification(builder -> builder.pointInTimeRecoveryEnabled(PITR_ENABLED)) .build(); }