Skip to content
This repository has been archived by the owner on May 3, 2024. It is now read-only.

Commit

Permalink
Spring Boot: Add configuration to reference a subscription by ID (#440)
Browse files Browse the repository at this point in the history
* feat(config): add subscription id as a config property

* feat(config): update readme with subscription config

* feat(config): revert README.md styling

* feat(config): change subscription-id to subscription-by-id

* feat(config): fix README.md

* feat(config): add default string for subscription by id

* Minor grammar improvement in README

---------

Co-authored-by: Kunal Mangaraj <[email protected]>
  • Loading branch information
otrosien and coolguy1990 authored Nov 7, 2023
1 parent 284e534 commit f36e217
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 134 deletions.
253 changes: 127 additions & 126 deletions fahrschein-spring-boot-starter/README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -130,19 +130,25 @@ private Subscription getSubscription() throws IOException {
}

SubscriptionBuilder sb = nakadiClient
.subscription(consumerConfig.getApplicationName(), new HashSet<>(consumerConfig.getTopics()))
.withConsumerGroup(consumerConfig.getConsumerGroup())
.withAuthorization(authorization()
.withAdmins(adminAttributes)
.withReaders(readerAttributes)
.build());
.subscription(consumerConfig.getApplicationName(), new HashSet<>(consumerConfig.getTopics()));

if (END.equals(consumerConfig.getReadFrom())) {
sb = sb.readFromEnd();
} else {
sb = sb.readFromBegin();
}
return sb.subscribe();

if (consumerConfig.getSubscriptionById() != null && !consumerConfig.getSubscriptionById().isEmpty()) {
return sb.subscribe(consumerConfig.getSubscriptionById());
} else {
sb
.withConsumerGroup(consumerConfig.getConsumerGroup())
.withAuthorization(authorization()
.withAdmins(adminAttributes)
.withReaders(readerAttributes)
.build());
return sb.subscribe();
}
}

protected StreamParameters getStreamParameters() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public abstract class AbstractConfig {

private String objectMapperRef;

private String subscriptionById;

@NestedConfigurationProperty
protected OAuthConfig oauth = OAuthConfig.defaultOAuthConfig();

Expand All @@ -37,5 +39,4 @@ public abstract class AbstractConfig {
protected BackoffConfig backoff = BackoffConfig.defaultBackoffConfig();

protected ThreadConfig threads = new ThreadConfig();

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ void mergeWithDefaultConfig(DefaultConsumerConfig defaultConsumerConfig) {
this.setReadFrom(ofNullable(this.getReadFrom()).orElse(defaultConsumerConfig.getReadFrom()));
this.setRecordMetrics(ofNullable(this.getRecordMetrics()).orElse(defaultConsumerConfig.getRecordMetrics()));
this.setObjectMapperRef(ofNullable(this.getObjectMapperRef()).orElse(defaultConsumerConfig.getObjectMapperRef()));
this.setSubscriptionById(merge(this.getSubscriptionById(), defaultConsumerConfig.getSubscriptionById()));

// oauth
if(defaultConsumerConfig.getOauth().getEnabled() && !this.getOauth().getEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public DefaultConsumerConfig() {
setReadFrom(Position.END);
setRecordMetrics(Boolean.FALSE);
setObjectMapperRef(FAHRSCHEIN_OBJECT_MAPPER_REF_NAME);
setSubscriptionById("");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ public void hasProperlyDeserializedConfigProperties() {
.hasFieldOrPropertyWithValue("nakadiUrl", dc.getNakadiUrl())
.hasFieldOrPropertyWithValue("consumerGroup", dc.getConsumerGroup())
.hasFieldOrPropertyWithValue("readFrom", dc.getReadFrom())
.hasFieldOrPropertyWithValue("subscriptionById", "test-by-id")
.hasNoNullFieldsOrProperties();

// consumer oauth config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class ConsumerConfigTest {
private static final String ACCESS_TOKEN_ID = "testAccessTokenId";
private static final String NAKADI_URI = "http://localhost";
private static final String APPLICATION_NAME = "TestApp";
private static final String SUBSCRIPTION_BY_ID = "test-by-id";

@Test
public void testInitialConsumerConfig() {
Expand Down Expand Up @@ -108,6 +109,7 @@ public void testMergedConsumerConfig() {
assertThat(cc.getStreamParameters().getStreamTimeout()).isEqualTo(STREAM_TIMEOUT);

assertThat(cc.getHttp().getContentEncoding()).isEqualTo(ContentEncoding.GZIP);
assertThat(cc.getSubscriptionById()).isEqualTo(SUBSCRIPTION_BY_ID);
}

private DefaultConsumerConfig getDefaultConsumerConfig() {
Expand Down Expand Up @@ -135,6 +137,7 @@ private DefaultConsumerConfig getDefaultConsumerConfig() {
dc.getStreamParameters().setStreamKeepAliveLimit(STREAM_KEEP_ALIVE_LIMIT);
dc.getStreamParameters().setStreamLimit(STREAM_LIMIT);
dc.getStreamParameters().setStreamTimeout(STREAM_TIMEOUT);
dc.setSubscriptionById(SUBSCRIPTION_BY_ID);

return dc;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,4 @@ fahrschein:
content-encoding: identity
topics:
- first.first-update
subscription-by-id: "test-by-id"

0 comments on commit f36e217

Please sign in to comment.