In this post we will cover:
- About Streamroot
- The problem at hand and why we need a data pipeline: raw data -> group by a dimension (stream, ISP, ...) -> aggregate over 5-minute windows (a sketch of this shape follows the list)
- How the pipeline is built, detailing the Flink mechanisms behind resilient sinks
- How the code is organised: available on GitHub, with Maven dependencies, testable with a local docker-compose
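To make the pipeline shape concrete before we dive into the sink, here is a minimal, illustrative Flink sketch. Everything in it is made up for the example: the (streamId, bytesServed) pairs, the values, and the job name are placeholders, not Streamroot's actual data model.

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class PipelineShapeDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Hypothetical raw data: (streamId, bytesServed) pairs.
            env.fromElements(Tuple2.of("stream-1", 1024.0),
                             Tuple2.of("stream-1", 2048.0),
                             Tuple2.of("stream-2", 512.0))
                    .keyBy(0)                                                  // group by stream (or ISP, ...)
                    .window(TumblingProcessingTimeWindows.of(Time.minutes(5))) // 5-minute tumbling windows
                    .sum(1)                                                    // aggregate over each window
                    .print();
            env.execute("pipeline-shape-demo");
        }
    }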
First of all, let's write a simple implementation of a sink that writes data points to InfluxDB.
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.influxdb.InfluxDB;
    import org.influxdb.InfluxDBFactory;
    import org.influxdb.dto.Point;

    public class InfluxSinkV1 extends RichSinkFunction<Point> {

        // Empty string: write to the database's default retention policy.
        private static final String RETENTION_POLICY = "";

        private transient InfluxDB influx;
        private final String connUrl;
        private final String user;
        private final String password;
        private final String database;

        public InfluxSinkV1(String connUrl, String user, String password, String database) {
            this.connUrl = connUrl;
            this.user = user;
            this.password = password;
            this.database = database;
        }

        @Override
        public void open(Configuration parameters) {
            // Create the non-serializable client once the sink is deployed.
            influx = InfluxDBFactory.connect(connUrl, user, password);
        }

        @Override
        public void invoke(Point point, Context context) {
            influx.write(database, RETENTION_POLICY, point);
        }

        @Override
        public void close() {
            influx.close();
        }
    }
A sink class must implement SinkFunction<IN>; here, however, we chose to extend RichSinkFunction<IN>, because the rich version provides a public void open(...) method that can be used to initialize any kind of complex or non-serializable state. In this case the InfluxDB client is not Serializable, which is why it is declared transient and created inside open rather than in the constructor. The main sink method, invoke, is quite simple: it takes a Point and uses the influx client to send it.
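To see how this sink plugs into an actual Flink job, here is a minimal, illustrative sketch; the URL, credentials, database name, and the fake temperature readings are placeholders for whatever your local docker-compose setup exposes:

    import java.time.Instant;
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.influxdb.dto.Point;

    public class InfluxSinkV1Demo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements(21.5, 23.0, 19.8)
                    // Build an InfluxDB point for each raw temperature reading.
                    .map(t -> Point.measurement("telemetry")
                            .time(Instant.now().toEpochMilli(), TimeUnit.MILLISECONDS)
                            .addField("temperature", t)
                            .tag("location", "Paris")
                            .build())
                    .addSink(new InfluxSinkV1("http://localhost:8086", "root", "root", "data"));
            env.execute("influx-sink-v1-demo");
        }
    }

Note that the sink instance itself is serialized and shipped to the task managers, which is exactly why the InfluxDB client cannot be a regular (non-transient) field.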
Let's check that everything is alright with a quick unit test:
    @Test
    public void testWritingData() {
        // INFLUX_PORT, USER, PASSWORD, DB_DATA and the `influx` query client
        // are provided by the test fixture.
        InfluxSinkV1 sink = new InfluxSinkV1("http://localhost:" + INFLUX_PORT, USER, PASSWORD, DB_DATA);
        sink.open(null); // initialize without any special config

        // create a data point and send it through the sink
        sink.invoke(Point.measurement("telemetry")
                .time(Instant.now().toEpochMilli(), TimeUnit.MILLISECONDS)
                .addField("temperature", 42)
                .tag("location", "Paris")
                .build(), null);

        // query InfluxDB to select all telemetry data
        QueryResult res = influx.query(new Query("SELECT * FROM telemetry", DB_DATA));
        QueryResult.Series series = res.getResults()
                .get(0)
                .getSeries()
                .stream()
                .filter(s -> s.getName().equals("telemetry"))
                .findFirst()
                .orElseThrow(() -> new AssertionError("No telemetry series"));
        Map<String, Object> data = zip(series.getColumns(), series.getValues().get(0));

        // check that the data point exists and is correct
        assertEquals("Paris", data.get("location"));
        assertEquals(42.0, data.get("temperature"));

        sink.close();
    }
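The zip helper used above is not part of the sink; it simply pairs each column name returned by the query with the corresponding value of one result row. A possible implementation, assuming that is all it does:

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical helper: pairs result columns with the values of one row.
    static Map<String, Object> zip(List<String> columns, List<Object> values) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < columns.size(); i++) {
            row.put(columns.get(i), values.get(i));
        }
        return row;
    }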
Assuming you have docker-compose running, as mentioned in the introduction, run this test with:
mvn clean test -Dtest=InfluxSinkV1Test
...
[INFO] Running io.streamroot.InfluxSinkV1Test
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.58 s - in io.streamroot.InfluxSinkV1Test
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 4.244 s
[INFO] Finished at: 2019-04-15T16:43:49+02:00
[INFO] ------------------------------------------------------------------------
Everything looks good! However, there's an issue: writing data points one by one, at each call of invoke, is quite inefficient for InfluxDB, so let's see what we can do about that.
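One direction worth knowing about (whether or not it is the one we take next) is the batching support built into the influxdb-java client itself. A minimal sketch of how open could enable it; the thresholds are illustrative, not tuned values:

    @Override
    public void open(Configuration parameters) {
        influx = InfluxDBFactory.connect(connUrl, user, password);
        // Buffer up to 1000 points and flush at least every 100 ms
        // (requires java.util.concurrent.TimeUnit).
        influx.enableBatch(1000, 100, TimeUnit.MILLISECONDS);
    }

The catch is that this buffers points in memory inside the client, so a crash can silently drop a buffered batch; making the sink resilient to exactly that kind of failure is what the Flink mechanisms mentioned in the outline are for.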