Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add config to enable PITR #1365

Merged
merged 2 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ public class KinesisClientLibConfiguration {
private AwsCredentialsProvider cloudWatchCredentialsProvider;
private long failoverTimeMillis;
private boolean enablePriorityLeaseAssignment;
private boolean leaseTableDeletionProtectionEnabled;
private boolean leaseTablePitrEnabled;
private String workerIdentifier;
private long shardSyncIntervalMillis;
private int maxRecords;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ public void setWorkerId(String workerId) {
@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
private Boolean enablePriorityLeaseAssignment;

@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
private Boolean leaseTableDeletionProtectionEnabled;

@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
private Boolean leaseTablePitrEnabled;

@ConfigurationSettable(configurationClass = LeaseManagementConfig.class)
private long shardSyncIntervalMillis;

Expand Down
Copy link

@dcpavel dcpavel Jul 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We check that leaseTableDeletionProtectionEnabled() and leaseTablePitrEnabled() equal true, but this will give a false positive if the default changes to true and setLeaseTable...Enabled() methods break.

This is a bit of a nit since it appears this methodology was repeated with other tests. Just including it for consideration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I've added two more tests to set the false value as well

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -99,6 +100,50 @@ public void testSetEnablePriorityLeaseAssignment() {
assertThat(resolvedConfiguration.leaseManagementConfig.enablePriorityLeaseAssignment(), equalTo(false));
}

@Test
public void testSetLeaseTableDeletionProtectionEnabledToTrue() {
MultiLangDaemonConfiguration configuration = baseConfiguration();
configuration.setLeaseTableDeletionProtectionEnabled(true);

MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
configuration.resolvedConfiguration(shardRecordProcessorFactory);

assertTrue(resolvedConfiguration.leaseManagementConfig.leaseTableDeletionProtectionEnabled());
}

@Test
public void testSetLeaseTablePitrEnabledToTrue() {
MultiLangDaemonConfiguration configuration = baseConfiguration();
configuration.setLeaseTablePitrEnabled(true);

MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
configuration.resolvedConfiguration(shardRecordProcessorFactory);

assertTrue(resolvedConfiguration.leaseManagementConfig.leaseTablePitrEnabled());
}

@Test
public void testSetLeaseTableDeletionProtectionEnabledToFalse() {
MultiLangDaemonConfiguration configuration = baseConfiguration();
configuration.setLeaseTableDeletionProtectionEnabled(false);

MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
configuration.resolvedConfiguration(shardRecordProcessorFactory);

assertFalse(resolvedConfiguration.leaseManagementConfig.leaseTableDeletionProtectionEnabled());
}

@Test
public void testSetLeaseTablePitrEnabledToFalse() {
MultiLangDaemonConfiguration configuration = baseConfiguration();
configuration.setLeaseTablePitrEnabled(false);

MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration =
configuration.resolvedConfiguration(shardRecordProcessorFactory);

assertFalse(resolvedConfiguration.leaseManagementConfig.leaseTablePitrEnabled());
}

@Test
public void testDefaultRetrievalConfig() {
MultiLangDaemonConfiguration configuration = baseConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public class LeaseManagementConfig {
public static final long DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS =
Duration.ofMinutes(30).toMillis();
public static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L;
public static final boolean DEFAULT_LEASE_TABLE_DELETION_PROTECTION_ENABLED = false;
public static final boolean DEFAULT_LEASE_TABLE_PITR_ENABLED = false;
public static final boolean DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT = true;
public static final int DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY = 3;

Expand Down Expand Up @@ -208,11 +210,20 @@ public class LeaseManagementConfig {
private BillingMode billingMode = BillingMode.PAY_PER_REQUEST;

/**
* Whether to enabled deletion protection on the DynamoDB lease table created by KCL.
* Whether to enable deletion protection on the DynamoDB lease table created by KCL. This does not update
* already existing tables.
*
* <p>Default value: false
*/
private boolean leaseTableDeletionProtectionEnabled = false;
private boolean leaseTableDeletionProtectionEnabled = DEFAULT_LEASE_TABLE_DELETION_PROTECTION_ENABLED;

/**
* Whether to enable PITR (point in time recovery) on the DynamoDB lease table created by KCL. If true, this can
* update existing table's PITR.
*
* <p>Default value: false
*/
private boolean leaseTablePitrEnabled = DEFAULT_LEASE_TABLE_PITR_ENABLED;

/**
* The list of tags to be applied to the DynamoDB table created for lease management.
Expand Down Expand Up @@ -424,6 +435,7 @@ public LeaseManagementFactory leaseManagementFactory(
dynamoDbRequestTimeout(),
billingMode(),
leaseTableDeletionProtectionEnabled(),
leaseTablePitrEnabled(),
tags(),
leaseSerializer,
customShardDetectorProvider(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final Duration dynamoDbRequestTimeout;
private final BillingMode billingMode;
private final boolean leaseTableDeletionProtectionEnabled;
private final boolean leaseTablePitrEnabled;
private final Collection<Tag> tags;
private final boolean isMultiStreamMode;
private final LeaseCleanupConfig leaseCleanupConfig;
Expand Down Expand Up @@ -707,7 +708,7 @@ private DynamoDBLeaseManagementFactory(
tableCreatorCallback,
dynamoDbRequestTimeout,
billingMode,
false,
LeaseManagementConfig.DEFAULT_LEASE_TABLE_DELETION_PROTECTION_ENABLED,
DefaultSdkAutoConstructList.getInstance(),
leaseSerializer);
}
Expand Down Expand Up @@ -945,6 +946,76 @@ public DynamoDBLeaseManagementFactory(
* @param isMultiStreamMode
* @param leaseCleanupConfig
*/
@Deprecated
public DynamoDBLeaseManagementFactory(
final KinesisAsyncClient kinesisClient,
final DynamoDbAsyncClient dynamoDBClient,
final String tableName,
final String workerIdentifier,
final ExecutorService executorService,
final long failoverTimeMillis,
final boolean enablePriorityLeaseAssignment,
final long epsilonMillis,
final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime,
final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion,
final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis,
final boolean consistentReads,
final long listShardsBackoffTimeMillis,
final int maxListShardsRetryAttempts,
final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds,
final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity,
final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer,
final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout,
BillingMode billingMode,
final boolean leaseTableDeletionProtectionEnabled,
Collection<Tag> tags,
LeaseSerializer leaseSerializer,
Function<StreamConfig, ShardDetector> customShardDetectorProvider,
boolean isMultiStreamMode,
LeaseCleanupConfig leaseCleanupConfig) {
this(
kinesisClient,
dynamoDBClient,
tableName,
workerIdentifier,
executorService,
failoverTimeMillis,
enablePriorityLeaseAssignment,
epsilonMillis,
maxLeasesForWorker,
maxLeasesToStealAtOneTime,
maxLeaseRenewalThreads,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
shardSyncIntervalMillis,
consistentReads,
listShardsBackoffTimeMillis,
maxListShardsRetryAttempts,
maxCacheMissesBeforeReload,
listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus,
initialLeaseTableReadCapacity,
initialLeaseTableWriteCapacity,
deprecatedHierarchicalShardSyncer,
tableCreatorCallback,
dynamoDbRequestTimeout,
billingMode,
leaseTableDeletionProtectionEnabled,
LeaseManagementConfig.DEFAULT_LEASE_TABLE_PITR_ENABLED,
tags,
leaseSerializer,
customShardDetectorProvider,
isMultiStreamMode,
leaseCleanupConfig);
}

public DynamoDBLeaseManagementFactory(
final KinesisAsyncClient kinesisClient,
final DynamoDbAsyncClient dynamoDBClient,
Expand Down Expand Up @@ -973,6 +1044,7 @@ public DynamoDBLeaseManagementFactory(
Duration dynamoDbRequestTimeout,
BillingMode billingMode,
final boolean leaseTableDeletionProtectionEnabled,
final boolean leaseTablePitrEnabled,
Collection<Tag> tags,
LeaseSerializer leaseSerializer,
Function<StreamConfig, ShardDetector> customShardDetectorProvider,
Expand Down Expand Up @@ -1005,6 +1077,7 @@ public DynamoDBLeaseManagementFactory(
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
this.leaseTableDeletionProtectionEnabled = leaseTableDeletionProtectionEnabled;
this.leaseTablePitrEnabled = leaseTablePitrEnabled;
this.leaseSerializer = leaseSerializer;
this.customShardDetectorProvider = customShardDetectorProvider;
this.isMultiStreamMode = isMultiStreamMode;
Expand Down Expand Up @@ -1091,6 +1164,7 @@ public DynamoDBLeaseRefresher createLeaseRefresher() {
dynamoDbRequestTimeout,
billingMode,
leaseTableDeletionProtectionEnabled,
leaseTablePitrEnabled,
tags);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.awssdk.services.dynamodb.model.Tag;
import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsRequest;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
Expand Down Expand Up @@ -81,6 +82,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
private final Duration dynamoDbRequestTimeout;
private final BillingMode billingMode;
private final boolean leaseTableDeletionProtectionEnabled;
private final boolean leaseTablePitrEnabled;
private final Collection<Tag> tags;

private boolean newTableCreated = false;
Expand Down Expand Up @@ -159,7 +161,7 @@ public DynamoDBLeaseRefresher(
tableCreatorCallback,
dynamoDbRequestTimeout,
BillingMode.PAY_PER_REQUEST,
false);
LeaseManagementConfig.DEFAULT_LEASE_TABLE_DELETION_PROTECTION_ENABLED);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this.

}

/**
Expand Down Expand Up @@ -207,6 +209,41 @@ public DynamoDBLeaseRefresher(
* @param leaseTableDeletionProtectionEnabled
* @param tags
*/
@Deprecated
public DynamoDBLeaseRefresher(
final String table,
final DynamoDbAsyncClient dynamoDBClient,
final LeaseSerializer serializer,
final boolean consistentReads,
@NonNull final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout,
final BillingMode billingMode,
final boolean leaseTableDeletionProtectionEnabled,
final Collection<Tag> tags) {
this(
table,
dynamoDBClient,
serializer,
consistentReads,
tableCreatorCallback,
dynamoDbRequestTimeout,
billingMode,
leaseTableDeletionProtectionEnabled,
LeaseManagementConfig.DEFAULT_LEASE_TABLE_PITR_ENABLED,
tags);
}

/**
* Constructor.
* @param table
* @param dynamoDBClient
* @param serializer
* @param consistentReads
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
* @param leaseTableDeletionProtectionEnabled
*/
public DynamoDBLeaseRefresher(
final String table,
final DynamoDbAsyncClient dynamoDBClient,
Expand All @@ -216,6 +253,7 @@ public DynamoDBLeaseRefresher(
Duration dynamoDbRequestTimeout,
final BillingMode billingMode,
final boolean leaseTableDeletionProtectionEnabled,
final boolean leaseTablePitrEnabled,
final Collection<Tag> tags) {
this.table = table;
this.dynamoDBClient = dynamoDBClient;
Expand All @@ -225,6 +263,7 @@ public DynamoDBLeaseRefresher(
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
this.leaseTableDeletionProtectionEnabled = leaseTableDeletionProtectionEnabled;
this.leaseTablePitrEnabled = leaseTablePitrEnabled;
this.tags = tags;
}

Expand Down Expand Up @@ -252,7 +291,33 @@ public boolean createLeaseTableIfNotExists(@NonNull final Long readCapacity, @No
public boolean createLeaseTableIfNotExists() throws ProvisionedThroughputException, DependencyException {
final CreateTableRequest request = createTableRequestBuilder().build();

return createTableIfNotExists(request);
boolean tableExists = createTableIfNotExists(request);

if (leaseTablePitrEnabled) {
enablePitr();
log.info("Enabled PITR on table {}", table);
}

return tableExists;
}

private void enablePitr() throws DependencyException {
final UpdateContinuousBackupsRequest request = UpdateContinuousBackupsRequest.builder()
.tableName(table)
.pointInTimeRecoverySpecification(builder -> builder.pointInTimeRecoveryEnabled(true))
.build();

final AWSExceptionManager exceptionManager = createExceptionManager();
exceptionManager.add(ResourceNotFoundException.class, t -> t);
exceptionManager.add(ProvisionedThroughputExceededException.class, t -> t);

try {
FutureUtils.resolveOrCancelFuture(dynamoDBClient.updateContinuousBackups(request), dynamoDbRequestTimeout);
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
} catch (InterruptedException | DynamoDbException | TimeoutException e) {
throw new DependencyException(e);
}
}

private boolean createTableIfNotExists(CreateTableRequest request)
Expand Down
Loading
Loading