Flusswerk makes it easy to create multithreaded workers for read-transform-write chains (aka ETL jobs). Workflows are coordinated via RabbitMQ, so it's easy to create chains of independent workflow jobs (each a new Java Application).
Maven:
<dependency>
<groupId>com.github.dbmdz</groupId>
<artifactId>framework</artifactId>
<version>4.0.0</version>
</dependency>
Gradle:
dependencies {
  implementation group: 'de.digitalcollections.flusswerk', name: 'dc-flusswerk-engine', version: '2.2.1'
}
Required libraries are Jackson and the RabbitMQ Java API. The minimum Java version is 8 (this will move to 9 in a few months). The Flusswerk engine itself never requires Spring, but there are examples of how to integrate the engine into a Spring Boot application.
Starting with version 2.0.0, the interface for flows sending many messages has been simplified. The writer now can use the message class generics directly:
class Writer implements java.util.function.Function<T, Collection<? extends Message>> {
  @Override
  public Collection<? extends Message> apply(T value) {
    // ...
  }
}
now becomes
class Writer implements java.util.function.Function<T, Collection<Message>> {
  @Override
  public Collection<Message> apply(T value) {
    // ...
  }
}
If you have a local RabbitMQ instance with the default configuration up and running, creating your first worker is as easy as
class Application {
  public static void main(String[] args) {
    MessageBroker messageBroker = new MessageBrokerBuilder()
        .readFrom("your.input.queue")
        .writeTo("your.output.queue")
        .build();
    Flow flow = new FlowBuilder<DefaultMessage, String, String>()
        .read(message -> message.get("value"))
        .transform(String::toUpperCase)
        .writeAndSend(value -> new DefaultMessage("your.id").put("value", value))
        .build();
    Engine engine = new Engine(messageBroker, flow);
    engine.start();
  }
}
For more complex read, transform or write operations it is recommended to implement these as classes. Please keep in mind that these classes should not keep any state (or do so in a thread-safe way), as they are used from several worker threads at the same time:
import java.util.function.Function;

class Reader implements Function<DefaultMessage, String> {
  @Override
  public String apply(DefaultMessage message) { // must be public to implement Function
    return message.get("value");
  }
}

class Transformer implements Function<String, String> {
  @Override
  public String apply(String input) {
    return input.toUpperCase();
  }
}

class Writer implements Function<String, Message> {
  @Override
  public Message apply(String output) {
    return new DefaultMessage("your.id").put("value", output);
  }
}
class Application {
  public static void main(String[] args) {
    MessageBroker messageBroker = new MessageBrokerBuilder()
        .readFrom("your.input.queue")
        .writeTo("your.output.queue")
        .build();
    Flow flow = new FlowBuilder<DefaultMessage, String, String>()
        .read(new Reader())
        .transform(new Transformer())
        .writeAndSend(new Writer())
        .build();
    Engine engine = new Engine(messageBroker, flow);
    engine.start();
  }
}
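If a step does need state, the thread-safety requirement above can be met with the atomic types from the JDK. The following is a minimal sketch that uses only the standard library, not Flusswerk itself; the names ThreadSafeTransformerSketch, CountingTransformer and runConcurrently are made up for illustration, and the thread pool merely imitates several worker threads sharing one transformer instance.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

class ThreadSafeTransformerSketch {

  /** A transformer with shared state; AtomicLong keeps the counter correct under concurrency. */
  static class CountingTransformer implements Function<String, String> {
    private final AtomicLong processed = new AtomicLong();

    @Override
    public String apply(String input) {
      processed.incrementAndGet(); // atomic update, so no increments are lost
      return input.toUpperCase();
    }

    long processedCount() {
      return processed.get();
    }
  }

  /** Calls one shared transformer from several threads and returns the final counter value. */
  static long runConcurrently(int threads, int callsPerThread) {
    CountingTransformer transformer = new CountingTransformer();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      pool.execute(() -> {
        for (int i = 0; i < callsPerThread; i++) {
          transformer.apply("value");
        }
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return transformer.processedCount();
  }

  public static void main(String[] args) {
    System.out.println(runConcurrently(4, 10_000)); // prints 40000
  }
}
```

With a plain long field instead of AtomicLong, concurrent increments could be lost and the final count could come out lower than expected.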
Depending on whether you want to send one message, multiple messages or no message at all, the FlowBuilder offers a suitable API method:
- flowBuilder.write(Consumer<T>) processes values of type T, but does not send any messages.
- flowBuilder.writeAndSend(Function<T, Message>) processes values of type T and sends the message returned by the writer to the default output queue.
- flowBuilder.writeAndSendMany(Function<T, List<Message>>) processes values of type T and sends all messages in the list returned by the writer to the default output queue.
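The three writer shapes differ only in their functional type. The sketch below shows them side by side with plain JDK interfaces; SketchMessage is a hypothetical stand-in for a Flusswerk message, and the factory method names are invented for this example, so nothing here is Flusswerk API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class WriterShapesSketch {

  /** Hypothetical stand-in for a Flusswerk message; not part of the library. */
  static class SketchMessage {
    final String id;

    SketchMessage(String id) {
      this.id = id;
    }
  }

  /** Shape for write(...): consumes the value and produces nothing to send. */
  static Consumer<String> storingWriter(List<String> store) {
    return store::add;
  }

  /** Shape for writeAndSend(...): exactly one outgoing message per value. */
  static Function<String, SketchMessage> singleMessageWriter() {
    return SketchMessage::new;
  }

  /** Shape for writeAndSendMany(...): here one message per comma-separated part. */
  static Function<String, List<SketchMessage>> manyMessagesWriter() {
    return value -> {
      List<SketchMessage> messages = new ArrayList<>();
      for (String part : value.split(",")) {
        messages.add(new SketchMessage(part.trim()));
      }
      return messages;
    };
  }

  public static void main(String[] args) {
    List<String> store = new ArrayList<>();
    storingWriter(store).accept("a");
    System.out.println(store.size()); // prints 1
    System.out.println(singleMessageWriter().apply("b").id); // prints b
    System.out.println(manyMessagesWriter().apply("c, d, e").size()); // prints 3
  }
}
```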
It is always possible to use MessageBroker.send("some.queue", Message) anywhere to manually send messages to arbitrary queues.
The Writer always sends a message to the defined output queue, which satisfies most use cases. For more complex workflows, the MessageBroker can be used to send messages to any queue you like:
import static java.util.Objects.requireNonNull;

class Writer implements Function<String, Message> {
  private final MessageBroker messageBroker;

  public Writer(MessageBroker messageBroker) {
    this.messageBroker = requireNonNull(messageBroker);
  }

  @Override
  public Message apply(String value) {
    // ...
    // Notify other workflow jobs
    messageBroker.send("ocr", new DefaultMessage("1000001"));
    messageBroker.send("iiif", new DefaultMessage("1000001"));
    messageBroker.send("import", new DefaultMessage("1000001"));
    // ...
  }
}
If you want to clean up after a message has been processed, e.g. to trigger a garbage collection, you can use the cleanup() method of the FlowBuilder:
class Application {
  public static void main(String[] args) {
    MessageBroker messageBroker = new MessageBrokerBuilder()
        .readFrom("your.input.queue")
        .writeTo("your.output.queue")
        .build();
    Flow flow = new FlowBuilder<DefaultMessage, String, String>()
        .read(new Reader())
        .transform(new Transformer())
        .writeAndSend(new Writer())
        .cleanup(() -> Runtime.getRuntime().gc())
        .build();
    Engine engine = new Engine(messageBroker, flow);
    engine.start();
  }
}
If incoming and outgoing message classes implement HasFlowId, automated propagation of flow ids is available. If the option flowBuilder.propagateFlowId(true) is set, Flusswerk copies flow ids from incoming to all outgoing messages.
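Conceptually, propagation means that every outgoing message ends up with the flow id of the message that triggered it. The sketch below imitates that behaviour with plain Java. The CarriesFlowId interface is a hypothetical stand-in for HasFlowId (the real interface may declare different methods), and propagate is an illustrative helper, not how the engine is actually implemented.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

class FlowIdPropagationSketch {

  /** Hypothetical stand-in for HasFlowId; the real interface may look different. */
  interface CarriesFlowId {
    String getFlowId();

    void setFlowId(String flowId);
  }

  /** Minimal message type for this sketch only. */
  static class SketchMessage implements CarriesFlowId {
    private final String id;
    private String flowId;

    SketchMessage(String id) {
      this.id = id;
    }

    @Override
    public String getFlowId() {
      return flowId;
    }

    @Override
    public void setFlowId(String flowId) {
      this.flowId = flowId;
    }
  }

  /** Copies the flow id of the incoming message to every outgoing message. */
  static void propagate(CarriesFlowId incoming, Collection<? extends CarriesFlowId> outgoing) {
    for (CarriesFlowId message : outgoing) {
      message.setFlowId(incoming.getFlowId());
    }
  }

  public static void main(String[] args) {
    SketchMessage incoming = new SketchMessage("in");
    incoming.setFlowId("flow-42");
    List<SketchMessage> outgoing =
        Arrays.asList(new SketchMessage("out-1"), new SketchMessage("out-2"));
    propagate(incoming, outgoing);
    for (SketchMessage message : outgoing) {
      System.out.println(message.id + " -> " + message.getFlowId());
    }
  }
}
```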
It is recommended to implement your own message class to add type safety and expressiveness and to keep algorithms easy to understand:
import java.util.Arrays;

public class ExampleMessage extends FlusswerkMessage<Integer> {
  private int priority;
  private String[] tags;

  public ExampleMessage(Integer id, int priority, String... tags) {
    super(id); // assumption: FlusswerkMessage offers a constructor that takes the id
    this.priority = priority;
    // plain null check instead of requireNonNullElseGet, which needs Java 9
    this.tags = (tags != null) ? tags : new String[] {};
  }

  public int getPriority() {
    return priority;
  }

  public String[] getTags() {
    return tags;
  }

  @Override
  public String toString() {
    return String.format("ExampleMessage{%d, %d, %s}", id, priority, Arrays.toString(tags));
  }
}
The custom message implementation needs to be registered with the MessageBrokerBuilder:
class Application {
  public static void main(String[] args) {
    MessageBroker messageBroker = new MessageBrokerBuilder()
        .useMessageClass(ExampleMessage.class)
        .build();
    /* ... */
  }
}
For more fine-grained control over serialization, a custom Jackson mixin can also be registered:
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(Include.NON_EMPTY)
public interface ExampleMessageMixin {}

class Application {
  public static void main(String[] args) {
    MessageBroker messageBroker = new MessageBrokerBuilder()
        .useMessageClass(ExampleMessage.class, ExampleMessageMixin.class)
        .build();
    /* ... */
  }
}
By default, the message statuses (success, temporarily failed, finally failed) are just logged by the Logger of DefaultProcessReport. If you want to customize this (e.g. to write structured log messages with only specific information), you can provide a custom implementation of the ProcessReport interface and pass it to the Engine:
class Application {
  public static void main(String[] args) {
    //...
    Engine engine = new Engine(messageBroker, flow, new MyProcessReport());
    engine.start();
  }
}
By default, every input queue gets a retry queue and a failed queue, named after the input queue with the suffixes .retry and .failure. Every message is retried 5 times and then moved to the failed queue.
To customize this behaviour, set FailurePolicies:
class Application {
  public static void main(String[] args) {
    MessageBroker messageBroker = new MessageBrokerBuilder()
        .failurePolicy(new FailurePolicies("inputQueue", "retryRoutingKey", "failureRoutingKey", 42))
        .build();
    /* ... */
  }
}
If messages should not be retried, set retryRoutingKey to null. If permanently failing messages should be discarded, set failureRoutingKey to null.
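The effect of these settings can be pictured as a small decision function. The sketch below is one reading of the rules described above, not Flusswerk's actual implementation: the class FailureRoutingSketch and the method nextRoutingKey are hypothetical, and it assumes that a message whose retries are disabled or used up goes straight to the failure routing key.

```java
import java.util.Optional;

class FailureRoutingSketch {

  /**
   * Returns the routing key a failed message would be sent to next,
   * or an empty Optional if the message would be discarded.
   */
  static Optional<String> nextRoutingKey(
      int retriesSoFar, int maxRetries, String retryRoutingKey, String failureRoutingKey) {
    boolean retryAllowed = retryRoutingKey != null && retriesSoFar < maxRetries;
    if (retryAllowed) {
      return Optional.of(retryRoutingKey);
    }
    // Retries are disabled or used up: hand over to the failure queue, if there is one.
    return Optional.ofNullable(failureRoutingKey);
  }

  public static void main(String[] args) {
    System.out.println(nextRoutingKey(0, 5, "in.retry", "in.failure")); // Optional[in.retry]
    System.out.println(nextRoutingKey(5, 5, "in.retry", "in.failure")); // Optional[in.failure]
    System.out.println(nextRoutingKey(0, 5, null, "in.failure")); // Optional[in.failure]
    System.out.println(nextRoutingKey(5, 5, "in.retry", null)); // Optional.empty
  }
}
```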
In your read/transform/write implementations, you can control the failure handling by failing a message temporarily or finally.
If a message shall fail finally, throw a FinallyFailedProcessException; if you want to fail temporarily and schedule a retry, throw a RetriableProcessException.
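A useful rule of thumb is to fail finally when a retry cannot change the outcome (for example, malformed input) and to fail temporarily when the cause may go away (for example, an unreachable backend). The sketch below illustrates that choice with two hypothetical stand-in exception classes so that it compiles without Flusswerk; in real code you would throw FinallyFailedProcessException and RetriableProcessException instead, and the outcome method only imitates what a caller might do with them.

```java
class FailureClassificationSketch {

  /** Hypothetical stand-in for RetriableProcessException. */
  static class SketchRetriableException extends RuntimeException {
    SketchRetriableException(String message) {
      super(message);
    }
  }

  /** Hypothetical stand-in for FinallyFailedProcessException. */
  static class SketchFinallyFailedException extends RuntimeException {
    SketchFinallyFailedException(String message) {
      super(message);
    }
  }

  /** A read step that decides per failure whether a retry could help. */
  static String read(String payload, boolean backendAvailable) {
    if (payload == null || payload.trim().isEmpty()) {
      // Retrying cannot repair an empty payload, so fail finally.
      throw new SketchFinallyFailedException("empty payload");
    }
    if (!backendAvailable) {
      // The backend may come back, so a later retry can succeed.
      throw new SketchRetriableException("backend unavailable");
    }
    return payload.trim();
  }

  /** Maps the result of one processing attempt to "success", "retry" or "failed". */
  static String outcome(String payload, boolean backendAvailable) {
    try {
      read(payload, backendAvailable);
      return "success";
    } catch (SketchRetriableException e) {
      return "retry";
    } catch (SketchFinallyFailedException e) {
      return "failed";
    }
  }

  public static void main(String[] args) {
    System.out.println(outcome("value", true)); // prints success
    System.out.println(outcome("value", false)); // prints retry
    System.out.println(outcome("", true)); // prints failed
  }
}
```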
If you want to process more than one message at the same time, you can customize how many threads the engine uses:
class Application {
  public static void main(String[] args) {
    int concurrentWorkers = 42;
    //...
    Engine engine = new Engine(messageBroker, flow, concurrentWorkers);
    engine.start();
    //...
  }
}
To collect metrics for a flow, one can register a Consumer<FlowStatus> with the FlowBuilder:
Flow flow = new FlowBuilder<DefaultMessage, String, String>()
    // ...
    .measure(metrics)
    .build();
If you use Spring Boot, there is complete support for Micrometer metrics:
@Configuration
class FlusswerkConfig {
  @Bean
  public Flow<Message, String, String> flow(
      Reader reader,
      Transformer transformer,
      Writer writer,
      Metrics metrics) {
    return FlowBuilder.with(Message.class, String.class, String.class)
        // reader, transformer, writer,...
        .measure(metrics)
        .build();
  }
}
To implement your own Metrics bean, either implement Consumer<FlowStatus> directly
class Metrics implements Consumer<FlowStatus> {
  @Override
  public void accept(FlowStatus flowInfo) {
    // ...
  }
}
or subclass Flusswerk's BaseMetrics bean:
@Component
public class Metrics extends BaseMetrics {
  private final Counter buzz;

  public Metrics(MeterFactory meterFactory) {
    super(meterFactory);
    // your definitions here
    this.buzz = meterFactory.counter("buzz");
  }

  @Override
  public void accept(FlowStatus flowInfo) {
    super.accept(flowInfo);
    // do something with flowInfo (you don't have to
    // override this method if you don't need flowInfo)
    // ...
  }

  public void buzz() {
    this.buzz.increment();
  }
}
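Because the metrics consumer is called from several worker threads, its internal counters should be thread-safe. The sketch below shows one way to aggregate counts and durations with LongAdder from the JDK. SketchStatus is a hypothetical stand-in for FlowStatus, whose real accessors are not shown in this document, and CountingMetrics is an illustrative name.

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;

class MetricsConsumerSketch {

  /** Hypothetical stand-in for FlowStatus; the real class exposes its own accessors. */
  static class SketchStatus {
    final boolean success;
    final long durationMillis;

    SketchStatus(boolean success, long durationMillis) {
      this.success = success;
      this.durationMillis = durationMillis;
    }
  }

  /** Aggregates counts and durations; LongAdder keeps updates safe across worker threads. */
  static class CountingMetrics implements Consumer<SketchStatus> {
    private final LongAdder succeeded = new LongAdder();
    private final LongAdder failed = new LongAdder();
    private final LongAdder totalMillis = new LongAdder();

    @Override
    public void accept(SketchStatus status) {
      (status.success ? succeeded : failed).increment();
      totalMillis.add(status.durationMillis);
    }

    long succeeded() {
      return succeeded.sum();
    }

    long failed() {
      return failed.sum();
    }

    /** Average processing time over all reported messages, or 0.0 if none were reported. */
    double averageMillis() {
      long count = succeeded.sum() + failed.sum();
      return count == 0 ? 0.0 : (double) totalMillis.sum() / count;
    }
  }

  public static void main(String[] args) {
    CountingMetrics metrics = new CountingMetrics();
    metrics.accept(new SketchStatus(true, 10));
    metrics.accept(new SketchStatus(true, 20));
    metrics.accept(new SketchStatus(false, 30));
    System.out.println(metrics.succeeded()); // prints 2
    System.out.println(metrics.failed()); // prints 1
    System.out.println(metrics.averageMillis()); // prints 20.0
  }
}
```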
Property | Meaning |
---|---|
connectTo(String connectionStr) | RabbitMQ host name and port, separated by a colon (default localhost:5672). Can also be a comma-separated list |
username(String username) | Username for authentication (default guest) |
password(String password) | Password for authentication (default guest) |
virtualHost(String virtualHost) | RabbitMQ virtual host |
Everything will be created if it does not exist:
Property | Default | Meaning |
---|---|---|
readFrom(String inputQueue) | - | Queue to read incoming messages from |
writeTo(String outputRoutingKey) | - | Queue to write outgoing messages to |
addFailurePolicy(FailurePolicy policy) | behaviour as in section Failure Policies | Policy for retrying messages |
exchange(String exchange) | workflow | RabbitMQ exchange for routing messages |
deadLetterExchange(String deadLetterExchange) | workflow.retry | RabbitMQ dead letter exchange to reroute failed messages |
maxRetries(int number) | 5 | The number of retries until a message is routed to the failed queue |
Property | Meaning |
---|---|
jacksonModules(Module... modules) | A list of Jackson Modules to configure arbitrary mappings |
messageMapping(Class<? extends Message> messageClass, Class<?> messageMixin) | Set a custom message type and its Jackson mapping |