Skip to content

Commit

Permalink
Adding an ElementCollector to create batches to process elements
Browse files Browse the repository at this point in the history
  • Loading branch information
Sebastian Just authored and Sebastian Just committed Apr 2, 2016
1 parent 7f834a2 commit 98c17bf
Showing 1 changed file with 56 additions and 0 deletions.
56 changes: 56 additions & 0 deletions src/main/java/com/sungard/dataflow/ElementCollector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.sungard.dataflow;

import java.util.ArrayList;
import java.util.List;

import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.Sum;

public class ElementCollector<T> extends DoFn<T, List<T>> {
private static final long serialVersionUID = 1L;

public static interface ElementCollectorOptions {
int getBatchSize();
void setBatchSize(int batchSize);
}

private final Aggregator<Integer, Integer> collectedElements;
private final Aggregator<Integer, Integer> flushes;

private final int batchSize;
private ArrayList<T> buffer;

public ElementCollector(ElementCollectorOptions options) {
this.batchSize = options.getBatchSize();

this.collectedElements = createAggregator("Collected elements", new Sum.SumIntegerFn());
this.flushes = createAggregator("Buffer flushes", new Sum.SumIntegerFn());
}

@Override
public void startBundle(Context context) {
buffer = new ArrayList<>(batchSize);
}

@Override
public void processElement(ProcessContext context) {
collectedElements.addValue(1);
buffer.add(context.element());

if (buffer.size() >= batchSize) {
flushBuffer(context);
}
}

@Override
public void finishBundle(Context context) {
flushBuffer(context);
}

private void flushBuffer(Context context) {
flushes.addValue(1);
context.output(buffer);
buffer.clear();
}
}

0 comments on commit 98c17bf

Please sign in to comment.