Skip to content
Leroux Romain edited this page Jun 5, 2019 · 31 revisions

Introduction

We have talked a lot about data on the Streamroot tech blog, and for good reason. Powering millions of video sessions online every day, we have a lot of material to work with. Delivering video content - especially with hybrid CDN and peer-to-peer systems - is a highly complex undertaking that requires us to continuously collect and process feedback events to gauge its performance and make sure everything is working properly.

As we discussed in previous posts, Streamroot uses its data pipeline to AB test new features, to understand the effects of external factors on our delivery solution, and to improve the overall quality of our products. It is also used to expose certain traffic data to our customers via our graphic user interface, which is what I’d like to talk about today.

Put simply, we send feedback metrics as json payloads to a Kafka broker from where various consumers process those metrics for different purposes. One of our critical consumers is responsible for feeding our customer-facing time series database InfluxDB. This database is accessed by our customer dashboard, which displays a variety of peer-to-peer delivery and quality of service data for the broadcaster’s streams, platforms, ISPs, etc. In this post I'd like to focus on this critical consumer whose job is to pre-aggregate time series data (hence its name, the aggregator) so that InfluxDB isn't overloaded and can respond smoothly.

This aggregator is actually a Flink job that:

  • Reads data from Kafka
  • Groups data by some key of interest (for instance a stream id)
  • Aggregates metrics for each key (for instance counting unique users on a given stream)
  • Does all that on time windows of 5 minutes
  • Sends the results to InfluxDB

Flink has really good built-in windowing and aggregation support so most steps weren't too difficult to implement. However, the last step - that is, sending results to InfluxDB - was tricky to get right as many things can go wrong. For instance, InfluxDB can be down, a Flink machine can fail, etc. And since the resulting data will be exposed to our customers we needed to have strong guarantees such as exactly-once processing and no data loss.

To ensure that stateful computations are resilient, Flink provides a checkpoint based-mechanism. At regular intervals, it snapshots the states of the stateful components of a given job and saves it to an external backend (for instance files on a distributed storage such as HDFS). As an end user, what we have to do is essentially implement interfaces for our custom components and Flink will do the rest!

In this post we'll implement a fully resilient InfluxDB sink (Flink's abstraction for outputting data); we'll start with a bare-bones implementation and improve it gradually. All of the code is available on Github and the tests can be run locally through a docker-compose setup that is also provided (it will spin up Flink and InfluxDB).

Basic implementation - InfluxSinkV1

First of all, let's write a simple implementation of a sink writing data points to InfluxDB.

public class InfluxSinkV1 extends RichSinkFunction<Point> {

    private static final String RETENTION_POLICY = "";

    private transient InfluxDB influx;

    private final String connUrl;
    private final String user;
    private final String password;
    private final String database;

    public InfluxSinkV1(String connUrl, String user, String password, String database) {
        this.connUrl = connUrl;
        this.user = user;
        this.password = password;
        this.database = database;
    }

    @Override
    public void open(Configuration parameters) {
        influx = InfluxDBFactory.connect(connUrl, user, password);
    }

    @Override
    public void invoke(Point point, Context context) {
        influx.write(database, RETENTION_POLICY, point);
    }

    @Override
    public void close() {
        influx.close();
    }
}

A sink class must implement SinkFunction<IN>; however, here we choose to extend RichSinkFunction<IN>. The reason for this is that the rich version provides a public void open(...) method that can be used to initialize any kind of complex/non-serializable state. In this case, the transient InfluxDB client is not Serializable which is why we have to create it through this open(...) method.

The main sink method invoke(...) is quite simple: it takes a Point and uses the influx client to send it.

Let's check that everything is working correctly with a quick unit test:

@Test
public void testWritingData() {
    // create an InfluxSink
    InfluxSinkV1 sink = new InfluxSinkV1("http://localhost:" + INFLUX_PORT, USER, PASSWORD, DB_DATA);
    sink.open(null); // initialize without any special config

    // create a data point and send it through the sink
    sink.invoke(Point.measurement("telemetry")
            .time(Instant.now().toEpochMilli(), TimeUnit.MILLISECONDS)
            .addField("temperature", 42)
            .tag("location", "Paris")
            .build(), null);

    // query influxdb to select all telemetry data
    QueryResult res = influx.query(new Query("SELECT * FROM telemetry", DB_DATA));
    QueryResult.Series series = res.getResults()
            .get(0)
            .getSeries()
            .stream()
            .filter(s -> s.getName().equals("telemetry"))
            .findFirst()
            .orElseThrow(() -> new AssertionError("No telemetry series"));

    Map<String, Object> data = zip(series.getColumns(), series.getValues().get(0));

    // check that the data point is existing and correct
    assertEquals("Paris", data.get("location"));
    assertEquals(42.0, data.get("temperature"));

    sink.close();
}

Assuming that you have docker-compose running, as mentioned in the introduction, you can run this test with:

mvn clean test -Dtest=InfluxSinkV1Test

...
[INFO] Running io.streamroot.InfluxSinkV1Test
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.58 s - in io.streamroot.InfluxSinkV1Test
[INFO] 
[INFO] Results:
[INFO] 
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO] 
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  4.244 s
[INFO] Finished at: 2019-04-15T16:43:49+02:00
[INFO] ------------------------------------------------------------------------

Everything looks good! However, there's an issue. Writing data points one by one, every time that invoke(...) is called is quite inefficient for InfluxDB. So let's see what we can do about that.

Adding batching support - InfluxSinkV2

It turns out that the influx client has built-in support for batching! To use it all we need to do is to call enableBatch(...) as follows:

@Override
public void open(Configuration parameters) {
    influx = InfluxDBFactory.connect(connUrl, user, password);
    influx.enableBatch(BatchOptions.DEFAULTS.flushDuration(100)); // flush batch every 100 ms
}

As for the unit test, we need to change it a bit to take into account the batching duration:

// ... same as before ...

Thread.sleep(150); // before querying influx, wait 100ms for flushDuration + 50ms for safety
QueryResult res = influx.query(new Query("SELECT * FROM telemetry", DB_DATA));

// ... same as before ...

You can test it with:

mvn clean test -Dtest=InfluxSinkV2Test

But is it really that simple to handle batching properly? What happens if a failure occurs between two batches? It's very likely that data points being buffered will get lost as Flink has no idea that we are buffering them.

So let's see how we can handle a stateful sink properly with Flink.

Fully resilient implementation - InfluxSinkV3

To achieve resiliency we need to have the following mechanisms in place:

  1. Explicitly handle buffered points (leverage Flink's checkpoints)
  2. Control batches of data points sent to InfluxDB
  3. Retry in case of failure (stop processing until recovery succeeds)

Explicitly handle buffered points

Flink's documentation mentions managed operator states for handling this situation. This mechanism requires us to implement CheckpointedFunction to leverage Flink's checkpoints and thus be able to snapshot/recover the data points that are being buffered and batch sent.

Here are the two new methods that we will have to implement:

/**
 * This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
 * ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
 * the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
 *
 * @param context the context for drawing a snapshot of the operator
 * @throws Exception
 */
void snapshotState(FunctionSnapshotContext context) throws Exception;

/**
 * This method is called when the parallel function instance is created during distributed
 * execution. Functions typically set up their state storing data structures in this method.
 *
 * @param context the context for initializing the operator
 * @throws Exception
 */
void initializeState(FunctionInitializationContext context) throws Exception;

So we have to buffer points within our sink implementation. Let's do that with a simple List<Point>:

// Let's buffer data points ourselves now (instead of relying on the influx client built-in mechanism)
private final List<Point> bufferedPoints = new ArrayList<>();
private final int batchSize;

// Let's also add an internal checkpointedState that will be used by Flink
private transient ListState<Point> checkpointedState;
private final String descriptorId;

In this new version our sink's main method invoke(...) is now simply buffering points up to batchSize:

@Override
public void invoke(Point point, Context context) throws Exception {
    bufferedPoints.add(point);
    if (bufferedPoints.size() == batchSize) {
        // This will be detailed just after,
        // but as the name suggests it sends data points to InfluxDB
        batchWrite(bufferedPoints); 
        bufferedPoints.clear();
    }
}

Let's implement snapshotState(...) and initializeState(...) to respectively backup and restore bufferedPoints through checkpointedState:

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    checkpointedState.clear();
    for (Point point : bufferedPoints) {
        checkpointedState.add(point);
    }
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    ListStateDescriptor<Point> descriptor = new ListStateDescriptor<>(descriptorId, TypeInformation.of(Point.class));
    checkpointedState = context.getOperatorStateStore().getListState(descriptor);
    if (context.isRestored()) {
        for (Point point : checkpointedState.get()) {
            bufferedPoints.add(point);
        }
    }
}

Control batches of data points sent to InfluxDB

Since we stopped relying on the influx client's built-in batching feature, we will have to roll our own. Luckily we can leverage OkHttpClient, on top of which influx client is built, to achieve batching.

To this end, we'll start by defining a custom InfluxBatchService as follows:

// A helper for http requests
private static final MediaType MEDIA_TYPE_STRING = MediaType.parse("text/plain");

// Our new batching service
private transient InfluxBatchService influx;

@Override
public void open(Configuration parameters) {
    // Create our batching service here
    influx = makeBatchService(connUrl);
}

// Batching service creation using OkHttpClient
private static InfluxBatchService makeBatchService(String url) {
    return new Retrofit.Builder()
            .baseUrl(url)
            .client(new OkHttpClient.Builder().build())
            .addConverterFactory(MoshiConverterFactory.create())
            .build()
            .create(InfluxBatchService.class);
}

// Batching service definition, this is the InfluxDB http requests format
public interface InfluxBatchService {
    String U = "u";
    String P = "p";
    String DB = "db";
    String RP = "rp";
    String PRECISION = "precision";
    String CONSISTENCY = "consistency";

    @POST("/write")
    Call<ResponseBody> writePoints(
            @Query(U) String username,
            @Query(P) String password,
            @Query(DB) String database,
            @Query(RP) String retentionPolicy,
            @Query(PRECISION) String precision,
            @Query(CONSISTENCY) String consistency,
            @Body RequestBody batchPoints);
}

Now let's come back to the mysterious batchWrite(bufferedPoints) that we skipped earlier in the new invoke(...) method and let's see how it uses our new InfluxBatchService:

private Response<ResponseBody> batchWrite(Iterable<Point> points) throws IOException {
    return influx.writePoints(
                user, password, database,
                RETENTION_POLICY,
                TimeUtil.toTimePrecision(TimeUnit.NANOSECONDS),
                InfluxDB.ConsistencyLevel.ONE.value(),
                RequestBody.create(
                        MEDIA_TYPE_STRING,
                        StreamSupport.stream(points.spliterator(), false)
                                .map(Point::lineProtocol)
                                .collect(Collectors.joining("\n"))))
                .execute();
}

It simply iterates through data points and call lineProtocol to format them to the InfluxDB protocol before batch sending them with a POST request.

There has been a lot of changes since InfluxSinkV1 already, but are we done? Not yet! With our new batching approach we're flushing data points only when our buffer is full. What if we buffer points that are the result of 10-minute windowed aggregations and that our buffer isn't full? Well, they won't be flushed until the next 10-minute window ... Not very nice.

So let's add a final piece to our custom batching implementation: a time limit. The idea is that we want to flush our buffered point as soon as one of the two following conditions is met: either the batchSize is reached, or batchFreqMs has elapsed.

To achieve that we can make use of a SingleThreadScheduledExecutor that will regularly flush the buffered points. To make sure that everything is thread-safe we can simply add synchronized to all methods interacting with bufferedPoints.

private transient ScheduledExecutorService scheduledExec;

@Override
public void open(Configuration parameters) {
    influx = makeBatchService(connUrl);

    // Simple flushPoints at batchFreqMs interval
    scheduledExec = Executors.newSingleThreadScheduledExecutor();
    scheduledExec.scheduleAtFixedRate(this::flushPoints, 0, batchFreqMs, TimeUnit.MILLISECONDS);
}

Let's not forget to clean up the scheduled executor when the sink is being stopped:

@Override
public void close() {
    boolean isStopped = false;
    try {
        isStopped = scheduledExec.awaitTermination(batchFreqMs, TimeUnit.MILLISECONDS);
    } catch (Throwable e) {
        // slurp timeout
    } finally {
        if(!isStopped) {
            scheduledExec.shutdownNow();
        }
    }
}

With this new mechanism, invoke(...) will also have to make use of flushPoints():

@Override
public synchronized void invoke(Point point, Context context) {
    bufferedPoints.add(point);
    if (bufferedPoints.size() == batchSize) {
        flushPoints(); // delegates flushing
    }
}

And now flushPoints() is responsible for triggering batchWrite(bufferedPoints) and clearing the buffer:

private synchronized void flushPoints()  {
    if (bufferedPoints.size() > 0) {
        try {
            batchWrite(bufferedPoints); // May throw IOException ...
            bufferedPoints.clear();
        } catch (IOException e) {
            LOG.error("We will handle that correctly in the next section ...");
        }
    }
}

We finally have a complete batching mechanism in place! But as you can see from the last line in flushPoints(), we're not doing much when an exception occurs... This brings us to our last feature: proper error handling and retries.

Retry in case of failure

The last piece of this InfluxSinkV3 is the retry mechanism that will make use of the Failsafe library. What we want is to stop processing when an InfluxDB related error occurs, and have a retry policy to help recover the situation.

Let's define a retry policy that will just keep on retrying to send the current batch of data at regular intervals:

private transient RetryPolicy<Response<ResponseBody>> retryPolicy;
private final int retryFreqMs;
private final AtomicLong retrying = new AtomicLong(0);

@Override
public void open(Configuration parameters) {
    // ... same as before ...

    retryPolicy = new RetryPolicy<Response<ResponseBody>>()
            .withMaxRetries(-1) // inifinite retries
            .handle(IOException.class)
            .handleResultIf((Response<ResponseBody> r) -> {
                if (!r.isSuccessful()) {
                    String errMessage = "";
                    try (ResponseBody errorBody = r.errorBody()) {
                        if (null != errorBody) errMessage = errorBody.string();
                    } catch (IOException e) {
                        LOG.error("Couldn't read response errorBody: ", e.getMessage());
                    }
                    LOG.error("Error {} from Influx: {}", r.code(), errMessage);
                    return true; // will retry
                } else {
                    return false; // don't retry
                }
            })
            .withDelay(Duration.ofMillis(retryFreqMs));
}

Now we wrap our call to batchWrite(bufferedPoints) with a Failsafe.with(retryPolicy) call. It will handle IOExceptions and errors returned by InfluxDB by retrying infinitely.

private synchronized void flushPoints()  {
    if (bufferedPoints.size() > 0) {
        retrying.getAndSet(0);
        Failsafe.with(retryPolicy).get(batchWrite(bufferedPoints)); 
        if (retrying.get() > 1) {
            LOG.info("Batch successfully recovered");
        }
        bufferedPoints.clear();
    }
}

Last but not least, let's modify batchWrite(...) so that it can be used by Failsafe functions. Let's also keep track of the number of retries.

private CheckedSupplier<Response<ResponseBody>> batchWrite(Iterable<Point> points) {
    // Wraps the call to influx.writePoints(...) in a CheckedSupplier for Failsafe
    return () -> {
        // Keep track of the number of retries
        long retryNb = retrying.getAndIncrement();
        if (retryNb > 0) {
            LOG.warn("Retrying batch (" + retryNb + ")");
        }
        return influx.writePoints(
                user, password, database,
                RETENTION_POLICY,
                TimeUtil.toTimePrecision(TimeUnit.NANOSECONDS),
                InfluxDB.ConsistencyLevel.ONE.value(),
                RequestBody.create(
                        MEDIA_TYPE_STRING,
                        StreamSupport.stream(points.spliterator(), false)
                                .map(Point::lineProtocol)
                                .collect(Collectors.joining("\n"))))
                .execute();
    };
}

And at last we're all done! For completeness sake here is the final version of InfluxSinkV3.

Testing resiliency

Let's summarize all the features we implemented:

  • Back up and restore state with bufferedPoints / checkpointedState
  • Custom InfluxBatchService with configurable flushing of batchSize / batchFreqMs
  • Error handling and retries with Failsafe

Our features look good; now we'd like to verify them with some unit tests. Let's at least check that our retry mechanism is working well.

For that we'll use a library called NetCrusher that can simulate a network failure. Let's modify our test setup so as to proxy connections to our local InfluxDB through a TcpCrusher so that we can stop and restart TCP traffic at will.

@BeforeClass
public static void setup() throws Exception {
    influx = makeInfluxConn();
    initInflux(influx);

    reactor = new NioReactor(10); // tick im ms
    tcpCrusher = TcpCrusherBuilder.builder()
            .withReactor(reactor)
            .withBindAddress("localhost", PROXIED_PORT)
            .withConnectAddress("localhost", INFLUX_PORT)
            .buildAndOpen();
}

Our new resiliency test looks like this:

@Test
public void testResilience() throws Exception {
    // Now connect to InfluxDB through the TcpCrusher proxy
    InfluxSinkV3 sink = new InfluxSinkV3("http://localhost:" + PROXIED_PORT, USER, PASSWORD, DB_DATA);
    sink.open(null);

    // stop proxying to influx
    tcpCrusher.freeze();

    // try writing point
    Instant now = Instant.now();
    sink.invoke(Point.measurement("monitoring")
            .time(now.toEpochMilli(), TimeUnit.MILLISECONDS)
            .addField("warnings", 12)
            .tag("datacenter", "abc")
            .build(), null);
    Thread.sleep(150); // wait for batch to complete (a bit more than batchFreqMs)

    // simulate influx/network failure
    tcpCrusher.close();
    Thread.sleep(300);
    tcpCrusher.open();

    Thread.sleep(200); // wait for retry to complete (a bit more than batchFreqMs + retryFreqMs)
    QueryResult res = influx.query(new Query("SELECT * FROM monitoring", DB_DATA));
    QueryResult.Series series = res.getResults()
            .get(0)
            .getSeries()
            .stream()
            .filter(s -> s.getName().equals("monitoring"))
            .findFirst()
            .orElseThrow(() -> new AssertionError("No monitoring series"));

    Map<String, Object> data = zip(series.getColumns(), series.getValues().get(0));

    assertEquals("abc", data.get("datacenter"));
    assertEquals(12.0, data.get("warnings"));
    assertEquals(now, Instant.parse((String) data.get("time")));

    sink.close();
}

Again assuming that you have docker-compose running, you can run this test with:

mvn clean test -Dtest=InfluxSinkV3Test

The test should pass and we can see in the output that it has retried many times before successfully sending the current batch:

[INFO] Running io.streamroot.InfluxSinkV3Test
18:36:19,286 INFO  org.netcrusher.tcp.TcpCrusher   - TcpCrusher <localhost/127.0.0.1:8085>-<localhost/127.0.0.1:8086> is open
18:36:19,784 INFO  org.netcrusher.tcp.TcpCrusher   - TcpCrusher <localhost/127.0.0.1:8085>-<localhost/127.0.0.1:8086> is closed
18:36:19,811 WARN  io.streamroot.InfluxSinkV3      - Retrying batch (1)
18:36:19,840 WARN  io.streamroot.InfluxSinkV3      - Retrying batch (2)
18:36:19,869 WARN  io.streamroot.InfluxSinkV3      - Retrying batch (3)
18:36:19,902 WARN  io.streamroot.InfluxSinkV3      - Retrying batch (4)
18:36:19,934 WARN  io.streamroot.InfluxSinkV3      - Retrying batch (5)
18:36:19,966 WARN  io.streamroot.InfluxSinkV3      - Retrying batch (6)
18:36:19,994 WARN  io.streamroot.InfluxSinkV3      - Retrying batch (7)
18:36:20,024 WARN  io.streamroot.InfluxSinkV3      - Retrying batch (8)
18:36:20,054 WARN  io.streamroot.InfluxSinkV3      - Retrying batch (9)
18:36:20,081 WARN  io.streamroot.InfluxSinkV3      - Retrying batch (10)
18:36:20,085 INFO  org.netcrusher.tcp.TcpCrusher   - TcpCrusher <localhost/127.0.0.1:8085>-<localhost/127.0.0.1:8086> is open
18:36:20,110 WARN  io.streamroot.InfluxSinkV3      - Retrying batch (11)
18:36:20,199 INFO  io.streamroot.InfluxSinkV3      - Batch successfully recovered
18:36:20,431 INFO  org.netcrusher.tcp.TcpCrusher   - TcpCrusher <localhost/127.0.0.1:8085>-<localhost/127.0.0.1:8086> is closed

Improving usability with a builder - InfluxSinkV4

We ended up with a lot of parameters for our InfluxSink which make it a bit cumbersome to use. A simple solution for that is to use a builder to set up all parameters in a clean and fluent fashion.

public static class Builder implements Serializable {
    private String descriptorId;
    private String connUrl;
    private String user;
    private String password;
    private String database;
    private int batchSize;
    private int batchFreqMs;
    private int retryFreqMs;

    private Builder() {}

    public Builder connUrl(String connUrl) {
        this.connUrl = connUrl;
        return this;
    }

    // Similar methods setting some field and returning this

    public InfluxSinkV4 build() {
        return new InfluxSinkV4(this);
    }
}

// Factory method for the Builder
public static Builder builder() {
    return new Builder();
}

// Now InfluxSinkV4 contructor is private and uses a Builder
private InfluxSinkV4(Builder b) {
    this.connUrl = b.connUrl;
    this.user = b.user;
    this.password = b.password;
    this.database = b.database;
    this.descriptorId = b.descriptorId;
    this.batchSize = b.batchSize;
    this.batchFreqMs = b.batchFreqMs;
    this.retryFreqMs = b.retryFreqMs;
}

Let's see it in action with a complete Flink job that will:

  • Read and parse integer data points from a network socket
  • Sum data over 10-second windows
  • Send the results to a local InfluxDB (running in docker)

Complete instructions to run this job can be found on Github.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.socketTextStream("netcat", Integer.parseInt(args[0]))
    .map(Ints::tryParse)
    .filter(Objects::nonNull)
    .timeWindowAll(Time.seconds(10))
    .reduce((d1, d2) -> d1 + d2,
            (AllWindowFunction<Integer, Tuple2<Long, Integer>, TimeWindow>)
            (window, res, out) -> out.collect(
                    Tuple2.of(window.getEnd(), res.iterator().next())))
    .filter(Objects::nonNull)
    .map(d -> Point.measurement("records")
            .time(d.f0, TimeUnit.MILLISECONDS) // d.f0 = window.getEnd()
            .addField("sum", d.f1)             // d.f1 = sum(data) within 10 sec
            .tag("window", "10-sec")
            .build())
    .addSink(InfluxSinkV4.builder()
            .descriptorId("influx-sink")
            .connUrl("http://influxdb:8086")
            .user("root")
            .password("root")
            .database("data")
            .batchSize(100)      // batch send every 100 points
            .batchFreqMs(1000)   // or when 1 second has elapsed
            .retryFreqMs(5000)
            .build())
    .setParallelism(1);

Conclusion

It's been quite a journey from the naive InfluxSinkV1 to the fully resilient InfluxSinkV4 but it was worth it! We've addressed efficiency issues and stateful computation pitfalls when working with Flink and InfluxDB:

  • Batch write data points to InfluxDB
  • Regularly back up buffered points with Flink's checkpoints
  • Handle errors with a fine-tuned retry policy

These mechanisms and the resulting exactly-once processing guarantees have helped us build reliable and scalable data pipelines that give our customers valuable insights about their video traffic. I hope the local demo application highlighted it and that you can use this as a basis for your own Flink applications. Stay tuned for more insights on how we fine tune our data pipeline, and if you are interested in working on these types of challenges, drop us a line at [email protected].