Skip to content

Commit

Permalink
Fix an issue in configuring idleTimeBetweenReadsInMillis in MultiLang…
Browse files Browse the repository at this point in the history
…Daemon (#1230)

Fix an issue where the idleTimeBetweenReadInMillis configured
via MultiLangDaemon was not taking effect because it used
the auto-generated setter from Lombok to set the configured value,
while there is a custom setter that must be invoked to set the
value correctly.

There is also a general confusion between using Lombok's setter vs
custom setter in java.

Unifying the approach to use the custom Lombok-fluent-style setter
and deprecating the previously added custom setIdleTimeBetweenReadsInMillis

Correct way to configure idleTimeBetweenReadsInMillis for MultiLang is
to add this in the properties file:
idleTimeBetweenReadsInMillis = 10000 # 10 seconds

Correct way to configure for java:
configsBuilder.retrievalConfig().retrievalSpecificConfig(
    new PollingConfig(streamName, kinesisClient)
        .idleTimeBetweenReadsInMillis(Duration.ofSeconds(10).toMillis())

Issues: #999, #950, #515
  • Loading branch information
akidambisrinivasan authored Nov 23, 2023
1 parent a48f543 commit 44837b7
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.beanutils.ConvertUtilsBean;
Expand Down Expand Up @@ -103,12 +105,20 @@ public void testDefaultRetrievalConfig() {
public void testDefaultRetrievalConfigWithPollingConfigSet() {
MultiLangDaemonConfiguration configuration = baseConfiguration();
configuration.setMaxRecords(10);
configuration.setIdleTimeBetweenReadsInMillis(60000);

MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = configuration
.resolvedConfiguration(shardRecordProcessorFactory);

assertThat(resolvedConfiguration.getRetrievalConfig().retrievalSpecificConfig(),
instanceOf(PollingConfig.class));
assertEquals(10,
((PollingConfig) resolvedConfiguration.getRetrievalConfig().retrievalSpecificConfig()).maxRecords());
assertEquals(60000,
((PollingConfig) resolvedConfiguration.getRetrievalConfig().retrievalSpecificConfig())
.idleTimeBetweenReadsInMillis());
assertTrue(((PollingConfig) resolvedConfiguration.getRetrievalConfig().retrievalSpecificConfig())
.usePollingConfigIdleTimeValue());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.time.Duration;
import java.util.Optional;
import java.util.function.Function;

import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
Expand Down Expand Up @@ -86,11 +88,15 @@ public PollingConfig(String streamName, KinesisAsyncClient kinesisClient) {
* The value for how long the ShardConsumer should sleep in between calls to
* {@link KinesisAsyncClient#getRecords(GetRecordsRequest)}.
*
* If this is not set using {@link PollingConfig#idleTimeBetweenReadsInMillis},
* it defaults to 1500 ms.
*
* <p>
* Default value: 1000L
* Default value: 1500L
* </p>
*/
private long idleTimeBetweenReadsInMillis = 1000L;
@Setter(AccessLevel.NONE)
private long idleTimeBetweenReadsInMillis = 1500L;

/**
* Time to wait in seconds before the worker retries to get a record.
Expand Down Expand Up @@ -119,14 +125,23 @@ public PollingConfig(String streamName, KinesisAsyncClient kinesisClient) {
*/
private RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory();

/**
* @Deprecated Use {@link PollingConfig#idleTimeBetweenReadsInMillis} instead
*/
@Deprecated
public void setIdleTimeBetweenReadsInMillis(long idleTimeBetweenReadsInMillis) {
idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
}

/**
* Set the value for how long the ShardConsumer should sleep in between calls to
* {@link KinesisAsyncClient#getRecords(GetRecordsRequest)}. If this is not specified here the value provided in
* {@link RecordsFetcherFactory} will be used.
*/
public void setIdleTimeBetweenReadsInMillis(long idleTimeBetweenReadsInMillis) {
public PollingConfig idleTimeBetweenReadsInMillis(long idleTimeBetweenReadsInMillis) {
usePollingConfigIdleTimeValue = true;
this.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis;
return this;
}

/**
Expand Down

0 comments on commit 44837b7

Please sign in to comment.