This application simulates a simple indexing application to show the Flusswerk framework in action.
The data processing logic is split in three components:
The Reader
receives a message from RabbitMQ and loads the corresponding document (as instance of Document
).
@Component
public class Reader implements Function<IndexMessage, Document> {
@Override
public Document apply(IndexMessage indexMessage) {
Document document;
try {
document = loadDocument(indexMessage.getItemId());
} catch (IOException exception) {
throw new StopProcessingException(
"Could not load document for id {}", indexMessage.getItemId())
.causedBy(exception);
}
return document;
}
private Document loadDocument(String itemId) throws IOException {
// ...
}
}
The Transformer
then takes the document and builds the required data for the Indexing API (an IndexDocument
):
@Component
public class Transformer implements Function<Document, IndexDocument> {
@Override
public IndexDocument apply(Document document) {
IndexDocument indexDocument = new IndexDocument();
// ...
return indexDocument;
}
}
The Writer
finally takes the processed data, writes it to the Indexing API and sends messages to notify the next processing application.
@Component
public class Writer implements Function<IndexDocument, Message> {
private static final Logger LOGGER = LoggerFactory.getLogger(Writer.class);
@Override
public Message apply(IndexDocument indexDocument) {
String id = (String) indexDocument.get("id");
try {
sendToSearchService(indexDocument);
} catch (Exception exception) {
throw new RetryProcessingException(
"Could not index document for id %s, will try again later", id)
.causedBy(exception);
}
return new RefreshWebsiteMessage(id, "search");
}
}
To try yourself, get the repository and RabbitMQ-Server:
$ git clone https://github.com/dbmdz/flusswerk-example.git
$ cd flusswerk-example
$ docker-compose up
Then start the flusswerk-example
Application from your IDE and open the RabbitMQ-Management UI at http://localhost:15672 (Login in as guest
/guest
).
Drop the following message into the queue search.index
:
{ "itemId": "42", "tracingId": "12345" }
In the queue search.publish
, you will find the outgoing message send by the Writer
:
{
"envelope":{},
"tracingId":"12345",
"itemId":"42",
"source":"search"
}
The field envelope
contains Flusswerk specific metadata and is usually only used by the Framework itself.