Getting Started with Decaton

Let's start with the most basic usage of the Decaton client/processor. Throughout this guide we use Gradle as the build system and Protocol Buffers for defining tasks, but you can choose any alternatives you prefer.

Defining Task Schema

In this guide we'll use Protocol Buffers since it's our usual choice. While Decaton provides support for Protocol Buffers, you can choose any serialization format you want. All you need to do is implement Serializer and Deserializer and supply them wherever the Decaton API requires them.
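For instance, if you wanted to carry tasks as plain UTF-8 strings instead of Protocol Buffers, a custom serde could look like the minimal sketch below. This assumes the Serializer and Deserializer interfaces in decaton-common are single-method interfaces over byte[] (check their Javadoc for the exact signatures); the class name StringTaskSerde is just for illustration and not part of Decaton.

StringTaskSerde.java
public class StringTaskSerde implements Serializer<String>, Deserializer<String> {
    @Override
    public byte[] serialize(String data) {
        // Encode the task payload as UTF-8 bytes
        return data.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(byte[] bytes) {
        // Decode the task payload from UTF-8 bytes
        return new String(bytes, StandardCharsets.UTF_8);
    }
}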

To make use of Protocol Buffers, you need to configure your build to compile Java classes from protobuf IDLs during the build. We also apply a Gradle plugin to build a shadow jar for ease of execution, but that is completely optional in real projects.

build.gradle
plugins {
    id 'idea'
    id 'com.google.protobuf' version '0.8.18'
    id 'com.github.johnrengelman.shadow' version '7.1.1'
}

dependencies {
    implementation "com.google.protobuf:protobuf-java:$PROTOBUF_VERSION"
    implementation "org.apache.kafka:kafka-clients:$KAFKA_VERSION"
}

protobuf {
    protoc {
        artifact = "com.google.protobuf:protoc:$PROTOBUF_VERSION"
    }
}

idea {
    module {
        generatedSourceDirs += file('build/generated/source/proto/main/java')
    }
}

Now your build is ready to compile .proto files under src/main/proto. Next, add a .proto file containing the task definition - here just one very simple task - PrintMessageTask.

mytasks.proto
syntax = "proto3";
package com.linecorp.decaton.example.protocol;

message PrintMessageTask {
    string name = 1;
    int32 age = 2;
}

Implement the task producer

Add the following required dependencies. decaton-protobuf is optional since it's just a support module providing a few convenient classes for serializing/deserializing Protocol Buffers.

build.gradle
implementation "com.linecorp.decaton:decaton-common:$DECATON_VERSION"
implementation "com.linecorp.decaton:decaton-client:$DECATON_VERSION"
implementation "com.linecorp.decaton:decaton-protobuf:$DECATON_VERSION"

You can use DecatonClient to produce tasks into a Kafka topic. It's just a wrapper around Kafka's Producer interface, providing convenient methods to construct Kafka records in the protocol expected by Decaton. A DecatonClient should be created just once for the whole application lifecycle, and its #close must be called to make sure all tasks are flushed out of the buffer.

ProducerMain.java
public final class ProducerMain {
    public static void main(String[] args) throws Exception {
        try (DecatonClient<PrintMessageTask> client = newClient()) {
            String name = args[0];
            int age = Integer.parseInt(args[1]);
            PrintMessageTask task = PrintMessageTask.newBuilder().setName(name).setAge(age).build();
            CompletableFuture<PutTaskResult> result = client.put(name, task); // (1)

            // Synchronously wait for the result
            result.join();
            // Asynchronously observe the result
            result.whenComplete((r, e) -> {
                if (e != null) {
                    System.err.println("Producing task failed... " + e);
                }
            });
        }
    }

    private static DecatonClient<PrintMessageTask> newClient() {
        Properties producerConfig = new Properties();
        producerConfig.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "my-decaton-client");
        producerConfig.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                                   System.getProperty("bootstrap.servers"));

        return DecatonClient.producing("my-decaton-topic",
                                       new ProtocolBuffersSerializer<PrintMessageTask>())
                            .applicationId("MyApp") // (2)
                            // By default it sets local hostname but here we go explicit
                            .instanceId("localhost")
                            .producerConfig(producerConfig)
                            .build();
    }
}
  1. Use name as the key as well. The key decides which partition the task (record) is routed to.

  2. applicationId is used to distinguish which application the task comes from. Just pick an arbitrary unique identifier.

Implement the processor

Additionally, add the following required dependency.

build.gradle
implementation "com.linecorp.decaton:decaton-processor:$DECATON_VERSION"

Implementing a Decaton processor involves two steps. First, you need to provide an implementation of DecatonProcessor, where the generic type T is the class of your task.

PrintMessageTaskProcessor.java
public class PrintMessageTaskProcessor implements DecatonProcessor<PrintMessageTask> {
    @Override
    public void process(ProcessingContext<PrintMessageTask> context, PrintMessageTask task) throws InterruptedException {
        System.out.printf("Noticed %s is %d years old\n", task.getName(), task.getAge());
    }
}

Second, you need to instantiate a ProcessorSubscription, which represents a running instance of the Decaton processor. In the example below we run it for just 10 seconds and then close the subscription for demonstration purposes. In reality, the subscription is typically terminated along with the application's termination.

ProcessorMain.java
public final class ProcessorMain {
    public static void main(String[] args) throws Exception {
        Properties consumerConfig = new Properties();
        consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my-decaton-processor");
        consumerConfig.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                                   System.getProperty("bootstrap.servers"));
        consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-decaton-processor");

        ProcessorSubscription subscription =
                SubscriptionBuilder.newBuilder("my-decaton-processor") // (1)
                                   .processorsBuilder(
                                           ProcessorsBuilder.consuming(
                                                   "my-decaton-topic",
                                                   new ProtocolBuffersDeserializer<>(PrintMessageTask.parser()))
                                                            .thenProcess(new PrintMessageTaskProcessor())
                                   )
                                   .consumerConfig(consumerConfig)
                                   .buildAndStart();

        Thread.sleep(10000);
        subscription.close();
    }
}
  1. subscriptionId is, as of the current version, an ID used only internally by Decaton for metric labeling, logging and so on.

Test it

Preparing topic

Before we run the processor and producer, we have to prepare the Kafka topic my-decaton-topic. Here we create it with 3 partitions and 3 replicas, which is enough to demonstrate our example.
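For example, you can create the topic with Kafka's kafka-topics CLI, or programmatically via the AdminClient from kafka-clients (org.apache.kafka.clients.admin), as in the minimal sketch below. The class name CreateTopicMain and the reuse of the bootstrap.servers system property are just for illustration.

CreateTopicMain.java
public final class CreateTopicMain {
    public static void main(String[] args) throws Exception {
        Properties adminConfig = new Properties();
        adminConfig.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                                System.getProperty("bootstrap.servers"));
        try (AdminClient adminClient = AdminClient.create(adminConfig)) {
            // Create my-decaton-topic with 3 partitions and replication factor 3
            adminClient.createTopics(
                    Collections.singleton(new NewTopic("my-decaton-topic", 3, (short) 3)))
                       .all()
                       .get();
        }
    }
}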

Running processor and producer

Now everything is ready. Let's try running the processor, putting one task, and seeing what happens.

$ ./gradlew shadowJar

$ java -cp build/libs/example-*-all.jar -Dbootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS example.ProcessorMain &

$ java -cp build/libs/example-*-all.jar -Dbootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS example.ProducerMain motoko 25
Put task succeeded: my-decaton-topic-1-5

Noticed motoko is 25 years old

It worked :) The running Decaton processor got a task from the producer through the Kafka topic and processed it by printing a message. Now we have a working example of a Decaton application, from producer to processor. Let's continue a bit further to demonstrate one of Decaton's most important capabilities - concurrent processing of tasks.

Higher processing concurrency for higher throughput

Decaton supports concurrent processing of tasks from one partition. The reason we need this is described in the README, so here we just walk through an example to show how effective it is.

Simulating high-throughput, high-latency IO processing

To simulate processing that involves IO with an external system, we prepare another processor implementation, PrintMessageTaskProcessor2. It simulates the blocking behavior of IO by sleeping a few tens of milliseconds while processing a task.

PrintMessageTaskProcessor2.java
public class PrintMessageTaskProcessor2 implements DecatonProcessor<PrintMessageTask> {
    @Override
    public void process(ProcessingContext<PrintMessageTask> context, PrintMessageTask task) throws InterruptedException {
        long deliveryLatencyMs = System.currentTimeMillis() - context.metadata().timestampMillis();
        simulateSlowIO();
        System.out.printf("Task for %s delivered in %d ms\n", task.getName(), deliveryLatencyMs);
    }

    private static void simulateSlowIO() throws InterruptedException {
        Thread.sleep(30);
    }
}

We also change ProcessorMain a bit to configure how many threads Decaton uses for processing one partition.

ProcessorMain2.java
        int partitionConcurrency = Integer.parseInt(System.getProperty("concurrency"));
        ProcessorSubscription subscription =
                SubscriptionBuilder.newBuilder("my-decaton-processor")
                                   .processorsBuilder(
                                           ProcessorsBuilder.consuming(
                                                   "my-decaton-topic",
                                                   new ProtocolBuffersDeserializer<>(PrintMessageTask.parser()))
                                                            .thenProcess(new PrintMessageTaskProcessor2())
                                   )
                                   .consumerConfig(consumerConfig)
                                   .properties(
                                           StaticPropertySupplier.of(
                                                   Property.ofStatic(
                                                           ProcessorProperties.CONFIG_PARTITION_CONCURRENCY, // (1)
                                                           partitionConcurrency),
                                                   Property.ofStatic(
                                                           ProcessorProperties.CONFIG_MAX_PENDING_RECORDS, // (2)
                                                           100)))
                                   .buildAndStart();
  1. Use CONFIG_PARTITION_CONCURRENCY to set the number of threads used for processing one partition. If it's set to 10 and an instance gets 3 partitions assigned, 30 threads will be created.

  2. It's a good idea to set CONFIG_MAX_PENDING_RECORDS, which configures how many records (offsets) Decaton is allowed to forward without waiting for the current lowest offset to complete. More pending records potentially consume more memory and increase the number of re-processed records when a fail-over occurs, but they reduce the possibility of your processor getting stuck behind tasks with outlying processing latency.

In the above example we set just a few properties. You can visit ProcessorProperties to view the full list of customizable properties.

We also change ProducerMain to produce many generated tasks, to mimic a realistic workload.

BatchProducerMain.java
public final class BatchProducerMain {
    public static void main(String[] args) throws Exception {
        try (DecatonClient<PrintMessageTask> client = newClient()) {
            for (int i = 0; i < 100; i++) {
                String name = "name:" + i;
                PrintMessageTask task = PrintMessageTask.newBuilder().setName(name).setAge(i).build();
                client.put(name, task)
                      .whenComplete((r, e) -> {
                          if (e != null) {
                              System.err.println("Producing task failed... " + e);
                          }
                      });
            }
        }
    }
...

First we run it with partition concurrency set to 1, which is the default.

$ java -cp build/libs/example-*-all.jar -Dbootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS -Dconcurrency=1 example.ProcessorMain2

$ java -cp build/libs/example-*-all.jar -Dbootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS example.BatchProducerMain

Task for name:1 delivered in 37 ms
Task for name:0 delivered in 199 ms
Task for name:3 delivered in 41 ms
...
Task for name:95 delivered in 1287 ms
Task for name:96 delivered in 1322 ms

As you can see, per-task processing latency accumulates, and the tasks processed later end up with over 1000 ms of latency before they are delivered to the processing logic. This is natural because a Kafka topic-partition is a queue and, by default, records in a partition are processed sequentially, so a preceding task's processing latency adds to the following tasks' delivery latency. Roughly speaking, with 100 tasks spread over 3 partitions, the last task in a partition waits behind about 30 others at ~30 ms each, which already accounts for around a second. This also impacts processing throughput negatively, because throughput gets capped by processing latency, leaving machine resources idle while awaiting IO responses from external systems.

The point here is that those tasks actually have different keys. If all we care about is preserving processing order and sequentiality per key, we should be able to process them in parallel.

Increasing processing concurrency

So next we increase the concurrency to 20, which means 20 threads are used to process tasks coming from one partition.

$ java -cp build/libs/example-*-all.jar -Dbootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS -Dconcurrency=20 example.ProcessorMain2

$ java -cp build/libs/example-*-all.jar -Dbootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS example.BatchProducerMain

Task for name:10 delivered in 41 ms
Task for name:36 delivered in 37 ms
...
Task for name:84 delivered in 160 ms
Task for name:89 delivered in 183 ms

As you can see, the difference in delivery latency between the first two and last two tasks is much smaller than when we were using just 1 thread. Of course this contributes to higher throughput as well.

Note

Even though Decaton processes tasks from one partition concurrently, it preserves an ordering guarantee based on their keys. As long as two tasks share the same key, they're guaranteed to be processed in order and sequentially.
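For example, using the DecatonClient from the producer section, the two puts below share the key "motoko", so the resulting tasks are guaranteed to be processed sequentially and in this order even with CONFIG_PARTITION_CONCURRENCY greater than 1 (the values here are arbitrary, just for illustration):

// Same key, so these two tasks are processed in order even under concurrent processing
client.put("motoko", PrintMessageTask.newBuilder().setName("motoko").setAge(25).build());
client.put("motoko", PrintMessageTask.newBuilder().setName("motoko").setAge(26).build());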

Caution
Do not read anything into the absolute latencies shown in the above output. Most of it comes from the network latency between my laptop and the Kafka servers I used. In reality it could be much shorter or longer depending on various parameters.

Asynchronous processing completion

The Decaton processor API supports asynchronous completion of tasks too. Task "completion" is a Decaton-specific concept expressing that "the task is ready for committing its record offset". By default Decaton considers a task "completed" when the DecatonProcessor#process method returns for the given task. However, you can defer this timing by manually declaring that you want to defer the completion of the task.

This is particularly useful when you're using some middleware's async client, which has its own queue and callback support for reporting the result of the middleware request.

One good example is Kafka's Producer client. It supports an async callback for supplying the result of production. Below is an example of a processor which leverages Decaton's "defer completion" mechanism to complete the task asynchronously.

public class PrintMessageTaskProcessorAsync implements DecatonProcessor<PrintMessageTask> {
    Producer<String, String> producer;

    @Override
    public void process(ProcessingContext<PrintMessageTask> context, PrintMessageTask task) throws InterruptedException {
        Completion completion = context.deferCompletion(); // (1)
        producer.send(new ProducerRecord<>("next-topic", "Hello" + task.getName()),
                      // In real code you would likely check exception before completing
                      (metadata, exception) -> completion.complete());
    }
}
  1. By calling ProcessingContext#deferCompletion, your code takes full responsibility for eventually calling #complete on the returned Completion. If your code misses it for any reason (called a completion leak), your processor will soon get stuck and stop consuming further records.

By leveraging Decaton's deferred completion along with your middleware's async client, which multiplexes IO with servers, your processor works more efficiently, leading to higher throughput, and you will likely need a lower CONFIG_PARTITION_CONCURRENCY.

Where to go from here

Now you know the basics and are ready to start implementing Decaton apps!

If you want to consume an existing topic which contains records in a schema other than Decaton's task protocol, or you want to use a task schema that is understandable even to non-Decaton consumers, visit Consuming Arbitrary Topic to see how.

For those planning to run Decaton in production, Monitoring might help to ensure your Decaton processors are always doing well.

If you're using Spring to run your applications, you might want to take a look at Spring Integration.

Besides its main functionality, Decaton offers a lot of features born from actual requirements of building services. Go back to the Index to find the list.