Produce messages to and consume messages from [Confluent Cloud](https://www.confluent.io/confluent-cloud/?utm_source=github&utm_medium=demo&utm_campaign=ch.examples_type.community_content.clients-ccloud) using the Kotlin versions of the Java Producer and Consumer, and the Kafka Streams API.
- Java 1.8 or higher to run the demo application
- Access to a [Confluent Cloud](https://www.confluent.io/confluent-cloud/?utm_source=github&utm_medium=demo&utm_campaign=ch.examples_type.community_content.clients-ccloud) cluster
- Initialize a properties file at `$HOME/.ccloud/config` with the configuration for your Confluent Cloud cluster:

  ```shell
  $ cat $HOME/.ccloud/config
  bootstrap.servers=<BROKER ENDPOINT>
  ssl.endpoint.identification.algorithm=https
  security.protocol=SASL_SSL
  sasl.mechanism=PLAIN
  sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<API KEY>" password\="<API SECRET>";
  ```

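
A file in this format can be read with the JDK's `java.util.Properties`. The sketch below shows one way to do that. The helper name `loadConfig` is hypothetical and not taken from the example sources, and the broker endpoint and credentials are made-up placeholders written to a temporary file so the snippet runs without a real cluster.

```kotlin
import java.io.File
import java.io.FileInputStream
import java.util.Properties

// Hypothetical helper (not from the example sources): read a Java-properties
// file such as $HOME/.ccloud/config into a Properties object.
fun loadConfig(path: String): Properties {
    val file = File(path)
    require(file.exists()) { "$path not found" }
    return Properties().apply {
        FileInputStream(file).use { load(it) }
    }
}

fun main() {
    // Throwaway file with placeholder values; nothing here is a real endpoint.
    val tmp = File.createTempFile("ccloud", ".config").apply {
        deleteOnExit()
        writeText(
            """
            bootstrap.servers=broker.example.com:9092
            security.protocol=SASL_SSL
            sasl.mechanism=PLAIN
            sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="KEY" password\="SECRET";
            """.trimIndent()
        )
    }
    val props = loadConfig(tmp.path)
    println(props.getProperty("security.protocol"))
    // The properties parser drops the backslash in front of each '='.
    println(props.getProperty("sasl.jaas.config"))
}
```

The escaped `\=` in the `sasl.jaas.config` line is unescaped by the properties parser, so the loaded value contains plain `username="KEY"`.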
In this example, the producer writes Kafka data to a topic in Confluent Cloud.
Each record has a key representing a username (e.g. `alice`) and a value of a count, formatted as JSON (e.g. `{"count": 0}`).
The consumer reads the same topic from Confluent Cloud and keeps a rolling sum of the counts as it processes each record.
The Kafka Streams application reads the same topic from Confluent Cloud and performs a stateful sum aggregation, which likewise yields a rolling sum of the counts as it processes each record.
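
The rolling sum described above can be illustrated without a Kafka cluster. The following is a minimal sketch, not the code of the example applications: `Record`, `parseCount`, and `rollingTotals` are hypothetical names, and the value is parsed with a regular expression only to keep the snippet free of dependencies (a real client would more likely use a JSON library).

```kotlin
// Hypothetical stand-in for the key and value of a Kafka record.
data class Record(val key: String, val value: String)

private val countPattern = Regex("""\{\s*"count"\s*:\s*(-?\d+)\s*\}""")

// Extract the integer from a value such as {"count": 3}.
fun parseCount(json: String): Long =
    countPattern.matchEntire(json.trim())?.groupValues?.get(1)?.toLong()
        ?: throw IllegalArgumentException("unexpected value: $json")

// Return the total after each record, in processing order.
fun rollingTotals(records: List<Record>): List<Long> {
    var total = 0L
    return records.map { record ->
        total += parseCount(record.value)
        total
    }
}

fun main() {
    // Same shape as the demo data: key "alice", counts 0 through 9.
    val records = (0..9).map { Record("alice", """{"count":$it}""") }
    records.zip(rollingTotals(records)).forEach { (record, total) ->
        println("Consumed record with key ${record.key} and value ${record.value}, and updated total count to $total")
    }
}
```

For counts 0 through 9 the totals are 0, 1, 3, 6, 10, 15, 21, 28, 36, 45.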
- Run the producer, passing in arguments for (a) the local file with configuration parameters to connect to your Confluent Cloud instance and (b) the topic name:

  ```shell
  # Build the client examples
  $ ./gradlew clean build

  # Run the producer
  $ ./gradlew runApp -PmainClass="io.confluent.examples.clients.cloud.ProducerExample" \
       -PconfigPath="$HOME/.ccloud/config" \
       -Ptopic="test1"
  ```

  You should see:

  ```
  ...
  Producing record: alice {"count":0}
  Producing record: alice {"count":1}
  Producing record: alice {"count":2}
  Producing record: alice {"count":3}
  Producing record: alice {"count":4}
  Producing record: alice {"count":5}
  Producing record: alice {"count":6}
  Producing record: alice {"count":7}
  Producing record: alice {"count":8}
  Producing record: alice {"count":9}
  Produced record to topic test1 partition [0] @ offset 0
  Produced record to topic test1 partition [0] @ offset 1
  Produced record to topic test1 partition [0] @ offset 2
  Produced record to topic test1 partition [0] @ offset 3
  Produced record to topic test1 partition [0] @ offset 4
  Produced record to topic test1 partition [0] @ offset 5
  Produced record to topic test1 partition [0] @ offset 6
  Produced record to topic test1 partition [0] @ offset 7
  Produced record to topic test1 partition [0] @ offset 8
  Produced record to topic test1 partition [0] @ offset 9
  10 messages were produced to topic test1
  ...
  ```

- Run the consumer, passing in arguments for (a) the local file with configuration parameters to connect to your Confluent Cloud instance and (b) the same topic name as used above. Verify that the consumer received all the messages:

  ```shell
  # Build the client examples
  $ ./gradlew clean build

  # Run the consumer
  $ ./gradlew runApp -PmainClass="io.confluent.examples.clients.cloud.ConsumerExample" \
       -PconfigPath="$HOME/.ccloud/config" \
       -Ptopic="test1"
  ```

  You should see:

  ```
  ...
  Consumed record with key alice and value {"count":0}, and updated total count to 0
  Consumed record with key alice and value {"count":1}, and updated total count to 1
  Consumed record with key alice and value {"count":2}, and updated total count to 3
  Consumed record with key alice and value {"count":3}, and updated total count to 6
  Consumed record with key alice and value {"count":4}, and updated total count to 10
  Consumed record with key alice and value {"count":5}, and updated total count to 15
  Consumed record with key alice and value {"count":6}, and updated total count to 21
  Consumed record with key alice and value {"count":7}, and updated total count to 28
  Consumed record with key alice and value {"count":8}, and updated total count to 36
  Consumed record with key alice and value {"count":9}, and updated total count to 45
  ```

  When you are done, press `<ctrl>-c`.

- Run the Kafka Streams application, passing in arguments for (a) the local file with configuration parameters to connect to your Confluent Cloud instance and (b) the same topic name as used above. Verify that the application received all the messages:

  ```shell
  # Build the client examples
  $ ./gradlew clean build

  # Run the Kafka Streams application
  $ ./gradlew runApp -PmainClass="io.confluent.examples.clients.cloud.StreamsExample" \
       -PconfigPath="$HOME/.ccloud/config" \
       -Ptopic="test1"
  ```

  You should see:

  ```
  ...
  [Consumed record]: alice, 0
  [Consumed record]: alice, 1
  [Consumed record]: alice, 2
  [Consumed record]: alice, 3
  [Consumed record]: alice, 4
  [Consumed record]: alice, 5
  [Consumed record]: alice, 6
  [Consumed record]: alice, 7
  [Consumed record]: alice, 8
  [Consumed record]: alice, 9
  ...
  [Running count]: alice, 0
  [Running count]: alice, 1
  [Running count]: alice, 3
  [Running count]: alice, 6
  [Running count]: alice, 10
  [Running count]: alice, 15
  [Running count]: alice, 21
  [Running count]: alice, 28
  [Running count]: alice, 36
  [Running count]: alice, 45
  ...
  ```

  When you are done, press `<ctrl>-c`.
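
To see why a stateful aggregation produces one running count per key, the logic can be modeled in plain Kotlin. This is only a sketch of the idea and does not use the Kafka Streams API: the `KeyedSum` class is hypothetical, its map merely plays the role that a state store plays in a Streams application, and the second key `bob` is invented to show that each key has its own total.

```kotlin
// Plain-Kotlin model of a keyed, stateful sum; this is not the Kafka Streams API.
class KeyedSum {
    // Per-key state, standing in for a state store.
    private val totals = mutableMapOf<String, Long>()

    // Fold one record into the state and return the updated total for its key.
    fun update(key: String, count: Long): Long {
        val updated = (totals[key] ?: 0L) + count
        totals[key] = updated
        return updated
    }
}

fun main() {
    val sums = KeyedSum()
    // Made-up input with two interleaved keys.
    val input = listOf("alice" to 0L, "bob" to 5L, "alice" to 1L, "alice" to 2L, "bob" to 1L)
    for ((key, count) in input) {
        println("[Running count]: $key, ${sums.update(key, count)}")
    }
}
```

Each call emits the new total for that record's key only, which matches the shape of the `[Running count]` lines: one output per input record, with the sum carried forward between records.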