Alan Cassar edited this page Apr 24, 2015 · 5 revisions

FIFO Queue Connector

A connector that provides a FIFO queue backed by an object store. The queue data is stored in the configured object store, so if the object store is persistent and the application is interrupted, the data reappears on the queue when the application restarts.

Motivation

Why another FIFO queue when Mule provides the VM connector out of the box? The VM connector works very well, no dispute there, but it does not retain its FIFO behaviour when used on CloudHub with persistence turned on.

Some use cases, such as processing update messages, require messages to be processed in order; otherwise an older update might overwrite a newer one. If the flow is asynchronous and message loss is not acceptable, this is not achievable on CloudHub unless some external transport is used.

Technical

The configuration of the connector is as follows. It requires an object store where the queue data is stored. In this example, we use the default object store that is provided with Mule.

<fifo-queue:config name="fifoQueueConnector" objectStore-ref="_defaultUserObjectStore" />

The queue is thread-safe and achieves this without using locks.

Features

Put

As the title suggests, put simply puts the payload on the configured queue.

<fifo-queue:put queue="queue1" config-ref="fifoQueueConnector"/>

Take

Returns and removes the first message of the queue.

<fifo-queue:take queue="queue1" config-ref="fifoQueueConnector"/>

Peek

Returns the first message of the queue but does NOT remove it. Peek is useful when a message must be fully processed before it is removed from the queue: the flow starts with a peek, which returns the message, processes it, and performs a take only when processing is done. If an exception happens, simply never take the message, so it stays at the head of the queue. For this use case to work, however, the flow has to be single threaded.

<fifo-queue:peek queue="queue1" config-ref="fifoQueueConnector"/>
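
The peek-then-take pattern described above could be wired up roughly as follows. This is a minimal sketch, not a tested configuration: the flow name `processInOrder` and the sub-flow `processUpdate` are hypothetical, the trigger for the flow is not shown, and it assumes that `processingStrategy="synchronous"` keeps processing on the calling thread. It also does not handle an empty queue.

```xml
<!-- Sketch only: flow and sub-flow names are hypothetical -->
<flow name="processInOrder" processingStrategy="synchronous">
    <!-- Read the head of the queue without removing it -->
    <fifo-queue:peek queue="queue1" config-ref="fifoQueueConnector"/>

    <!-- Hypothetical sub-flow that does the real work; may throw -->
    <flow-ref name="processUpdate"/>

    <!-- Reached only if processing succeeded: now remove the head -->
    <fifo-queue:take queue="queue1" config-ref="fifoQueueConnector"/>
</flow>
```

If `processUpdate` throws, the take is never reached, so the message should still be at the head of the queue on the next attempt.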

Mark Error & Resolve Error

A queue can be marked as being problematic. While a queue is marked as error, take and peek do not return any messages, but put still adds messages to the queue.

This covers use cases that need human intervention to fix errors before messages continue to be processed. For example, if one message throws an exception, the queue can be marked as error. A person can then fix the problem, and once it is fixed, the queue can be marked as resolved and normal processing resumes.

<fifo-queue:mark-error queue="queue1" config-ref="fifoQueueConnector"/>
<fifo-queue:resolve-error queue="queue1" config-ref="fifoQueueConnector"/>
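
One way to combine these two operations is to mark the queue from an exception strategy and to resolve it from a separate flow. The sketch below assumes Mule 3 flow syntax; the flow names and the `processUpdate` sub-flow are hypothetical, and how the resolve flow gets triggered (for example by an operator) is left out.

```xml
<!-- Sketch only: flow names and the processUpdate sub-flow are hypothetical -->
<flow name="processWithErrorMarking" processingStrategy="synchronous">
    <fifo-queue:peek queue="queue1" config-ref="fifoQueueConnector"/>
    <flow-ref name="processUpdate"/>
    <fifo-queue:take queue="queue1" config-ref="fifoQueueConnector"/>

    <catch-exception-strategy>
        <!-- Processing failed: stop take and peek from returning messages -->
        <logger level="ERROR" message="Processing failed, marking queue1 as error"/>
        <fifo-queue:mark-error queue="queue1" config-ref="fifoQueueConnector"/>
    </catch-exception-strategy>
</flow>

<!-- Invoked (trigger not shown) once the underlying problem has been fixed -->
<flow name="resolveQueue1">
    <fifo-queue:resolve-error queue="queue1" config-ref="fifoQueueConnector"/>
</flow>
```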

Status

Returns the status of the queue. True for OK, false for error.

<fifo-queue:status queue="queue1" config-ref="fifoQueueConnector"/>
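
The status can be used to decide whether to read from the queue at all. The following is only a sketch: it assumes the boolean returned by status becomes the message payload, which the text above does not state explicitly, and the flow name `takeIfHealthy` is hypothetical.

```xml
<!-- Sketch only: assumes the status result replaces the payload -->
<flow name="takeIfHealthy">
    <fifo-queue:status queue="queue1" config-ref="fifoQueueConnector"/>
    <choice>
        <!-- true means the queue is OK -->
        <when expression="#[payload == true]">
            <fifo-queue:take queue="queue1" config-ref="fifoQueueConnector"/>
        </when>
        <!-- false means the queue is marked as error -->
        <otherwise>
            <logger level="WARN" message="queue1 is marked as error, skipping take"/>
        </otherwise>
    </choice>
</flow>
```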

Peek Listener

Inbound endpoint that peeks the message. Returns the head of the queue but does not remove it from the queue.

<fifo-queue:peek-listener queue="queue1" config-ref="fifoQueueConnector"/>

Take Listener

Inbound endpoint that takes the message. Returns and removes the head of the queue.

<fifo-queue:take-listener queue="queue2" config-ref="fifoQueueConnector"/>

Peek All Listener

Peek inbound endpoint for ALL queues. Any message put on any of the queues configured with this connector triggers this listener. The result is a Map: each key is the name of a queue, and each value is the head of that queue.

<fifo-queue:peek-all-listener config-ref="fifoQueueConnector" />
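
Since the listener delivers a Map, a flow would typically iterate over its entries. A minimal sketch, assuming the Map arrives as the payload and that MEL can read `key` and `value` from each map entry; the flow name `logAllHeads` is hypothetical.

```xml
<!-- Sketch only: logs the head of every queue reported by the listener -->
<flow name="logAllHeads">
    <fifo-queue:peek-all-listener config-ref="fifoQueueConnector"/>
    <!-- Each iteration sees one map entry: key is the queue name, value is its head -->
    <foreach collection="#[payload.entrySet()]">
        <logger level="INFO" message="Head of #[payload.key]: #[payload.value]"/>
    </foreach>
</flow>
```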

Take All Listener

Take inbound endpoint for ALL queues. Any message put on any of the queues configured with this connector triggers this listener. The result is a Map: each key is the name of a queue, and each value is the head of that queue.

<fifo-queue:take-all-listener config-ref="fifoQueueConnector" />

Drain

Take all elements of a particular queue. The result is a list of messages in the same order as the messages were put on the queue.

<fifo-queue:drain queue="queue1" config-ref="fifoQueueConnector" />
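
Because drain returns a list, the messages can be handled one by one in their original order. A minimal sketch, assuming the list becomes the payload (which `foreach` iterates by default); the flow name `drainAndLog` is hypothetical and its trigger is not shown.

```xml
<!-- Sketch only: empties queue1 and logs each message in queue order -->
<flow name="drainAndLog" processingStrategy="synchronous">
    <fifo-queue:drain queue="queue1" config-ref="fifoQueueConnector"/>
    <!-- foreach iterates the payload, here the drained list -->
    <foreach>
        <logger level="INFO" message="Drained message: #[payload]"/>
    </foreach>
</flow>
```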

Drain All

Take all elements of all queues. The result is a Map of Lists. The key of the Map is the name of the queue, and each List holds all elements of that queue in the same order as the messages were put on it.

<fifo-queue:drain-all config-ref="fifoQueueConnector" />