From a1cc3075ea7bf26676a59d71bd49ff5de83e1a23 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Wed, 30 Sep 2020 10:56:29 +0200 Subject: [PATCH] Cleanup Streams init Remove handling of "Streams never finished rebalancing on startup" as it's no longer required --- .../exercises/EmailService.java | 18 +----------------- .../exercises/FraudService.java | 17 ----------------- .../exercises/InventoryService.java | 16 ---------------- .../exercises/OrdersService.java | 16 ---------------- .../ValidationsAggregatorService.java | 16 ---------------- 5 files changed, 1 insertion(+), 82 deletions(-) diff --git a/microservices-orders/exercises/EmailService.java b/microservices-orders/exercises/EmailService.java index cd2e4886e..80a6de154 100644 --- a/microservices-orders/exercises/EmailService.java +++ b/microservices-orders/exercises/EmailService.java @@ -21,9 +21,6 @@ import java.io.IOException; import java.util.Optional; import java.util.Properties; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; @@ -58,23 +55,10 @@ public void start(final String bootstrapServers, final Properties defaultConfig) { streams = processStreams(bootstrapServers, stateDir, defaultConfig); streams.cleanUp(); //don't do this in prod as it clears your state stores - final CountDownLatch startLatch = new CountDownLatch(1); - streams.setStateListener((newState, oldState) -> { - if (newState == State.RUNNING && oldState != KafkaStreams.State.RUNNING) { - startLatch.countDown(); - } - }); streams.start(); - try { - if (!startLatch.await(60, TimeUnit.SECONDS)) { - throw new RuntimeException("Streams never finished rebalancing on startup"); - } - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - } - log.info("Started Service " + SERVICE_APP_ID); + log.info("Started Service " + SERVICE_APP_ID); } private KafkaStreams processStreams(final String bootstrapServers, diff --git a/microservices-orders/exercises/FraudService.java b/microservices-orders/exercises/FraudService.java index b72e6266d..5eca62749 100644 --- a/microservices-orders/exercises/FraudService.java +++ b/microservices-orders/exercises/FraudService.java @@ -8,15 +8,12 @@ import java.io.IOException; import java.util.Optional; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import org.apache.commons.cli.*; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Consumed; @@ -61,23 +58,9 @@ public void start(final String bootstrapServers, final Properties defaultConfig) { streams = processStreams(bootstrapServers, stateDir, defaultConfig); streams.cleanUp(); //don't do this in prod as it clears your state stores - final CountDownLatch startLatch = new CountDownLatch(1); - streams.setStateListener((newState, oldState) -> { - if (newState == State.RUNNING && oldState != KafkaStreams.State.RUNNING) { - startLatch.countDown(); - } - }); streams.start(); - try { - if (!startLatch.await(60, TimeUnit.SECONDS)) { - throw new RuntimeException("Streams never finished rebalancing on startup"); - } - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - } - log.info("Started Service " + getClass().getSimpleName()); } diff --git a/microservices-orders/exercises/InventoryService.java b/microservices-orders/exercises/InventoryService.java index 4fbe710b8..6b577094b 100644 --- a/microservices-orders/exercises/InventoryService.java +++ b/microservices-orders/exercises/InventoryService.java @@ -3,15 +3,12 @@ import java.io.IOException; import java.util.Optional; import java.util.Properties; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import io.confluent.examples.streams.microservices.domain.Schemas; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import org.apache.commons.cli.*; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.Consumed; @@ -64,22 +61,9 @@ public void start(final String bootstrapServers, final Properties defaultConfig) { streams = processStreams(bootstrapServers, stateDir, defaultConfig); streams.cleanUp(); //don't do this in prod as it clears your state stores - final CountDownLatch startLatch = new CountDownLatch(1); - streams.setStateListener((newState, oldState) -> { - if (newState == State.RUNNING && oldState != KafkaStreams.State.RUNNING) { - startLatch.countDown(); - } - }); streams.start(); - try { - if (!startLatch.await(60, TimeUnit.SECONDS)) { - throw new RuntimeException("Streams never finished rebalancing on startup"); - } - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - } log.info("Started Service " + getClass().getSimpleName()); } diff --git a/microservices-orders/exercises/OrdersService.java b/microservices-orders/exercises/OrdersService.java index d1a5aafab..8cbe729a0 100644 --- a/microservices-orders/exercises/OrdersService.java +++ b/microservices-orders/exercises/OrdersService.java @@ -2,8 +2,6 @@ import java.io.IOException; import java.util.Optional; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import org.apache.commons.cli.*; @@ -16,7 +14,6 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Predicate; @@ -342,22 +339,9 @@ private KafkaStreams startKStreams(final String bootstrapServers, config(bootstrapServers, defaultConfig)); metadataService = new MetadataService(streams); streams.cleanUp(); //don't do this in prod as it clears your state stores - final CountDownLatch startLatch = new CountDownLatch(1); - streams.setStateListener((newState, oldState) -> { - if (newState == State.RUNNING && oldState != KafkaStreams.State.RUNNING) { - startLatch.countDown(); - } - }); streams.start(); - try { - if (!startLatch.await(60, TimeUnit.SECONDS)) { - throw new RuntimeException("Streams never finished rebalancing on startup"); - } - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - } return streams; } diff --git a/microservices-orders/exercises/ValidationsAggregatorService.java b/microservices-orders/exercises/ValidationsAggregatorService.java index 80e8398cb..94345b432 100644 --- a/microservices-orders/exercises/ValidationsAggregatorService.java +++ b/microservices-orders/exercises/ValidationsAggregatorService.java @@ -32,8 +32,6 @@ import java.time.Duration; import java.util.Optional; import java.util.Properties; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; /** @@ -66,26 +64,12 @@ public class ValidationsAggregatorService implements Service { public void start(final String bootstrapServers, final String stateDir, final Properties defaultConfig) { - final CountDownLatch startLatch = new CountDownLatch(1); streams = aggregateOrderValidations(bootstrapServers, stateDir, defaultConfig); streams.cleanUp(); //don't do this in prod as it clears your state stores - streams.setStateListener((newState, oldState) -> { - if (newState == KafkaStreams.State.RUNNING && oldState != KafkaStreams.State.RUNNING) { - startLatch.countDown(); - } - }); streams.start(); - try { - if (!startLatch.await(60, TimeUnit.SECONDS)) { - throw new RuntimeException("Streams never finished rebalancing on startup"); - } - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - } - log.info("Started Service " + getClass().getSimpleName()); }