From 98c17bfd69a7c3983129c8d85b4f89e4624a3b79 Mon Sep 17 00:00:00 2001 From: Sebastian Just Date: Sat, 2 Apr 2016 13:38:41 -0400 Subject: [PATCH] Adding an ElementCollector to create batches to process elements --- .../sungard/dataflow/ElementCollector.java | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 src/main/java/com/sungard/dataflow/ElementCollector.java diff --git a/src/main/java/com/sungard/dataflow/ElementCollector.java b/src/main/java/com/sungard/dataflow/ElementCollector.java new file mode 100644 index 0000000..3b00291 --- /dev/null +++ b/src/main/java/com/sungard/dataflow/ElementCollector.java @@ -0,0 +1,56 @@ +package com.sungard.dataflow; + +import java.util.ArrayList; +import java.util.List; + +import com.google.cloud.dataflow.sdk.transforms.Aggregator; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.Sum; + +public class ElementCollector extends DoFn> { + private static final long serialVersionUID = 1L; + + public static interface ElementCollectorOptions { + int getBatchSize(); + void setBatchSize(int batchSize); + } + + private final Aggregator collectedElements; + private final Aggregator flushes; + + private final int batchSize; + private ArrayList buffer; + + public ElementCollector(ElementCollectorOptions options) { + this.batchSize = options.getBatchSize(); + + this.collectedElements = createAggregator("Collected elements", new Sum.SumIntegerFn()); + this.flushes = createAggregator("Buffer flushes", new Sum.SumIntegerFn()); + } + + @Override + public void startBundle(Context context) { + buffer = new ArrayList<>(batchSize); + } + + @Override + public void processElement(ProcessContext context) { + collectedElements.addValue(1); + buffer.add(context.element()); + + if (buffer.size() >= batchSize) { + flushBuffer(context); + } + } + + @Override + public void finishBundle(Context context) { + flushBuffer(context); + } + + private void flushBuffer(Context context) { + flushes.addValue(1); + context.output(buffer); + buffer.clear(); + } +}