Message Queue Integration Library for Java
- RabbitMQ
- JMS based Message Brokers
In order to use the library, include the following in the pom.xml
of your project:
<dependencies>
<dependency>
<groupId>com.github.libgraviton</groupId>
<artifactId>messaging</artifactId>
<version>LATEST</version>
</dependency>
</dependencies>
Make sure that version
points to the newest release on maven central (see badge above).
To publish messages you need an instance of QueueConnection
, which represents the connection to the Message Queue.
Once you have a QueueConnection
, you can simply do the following to publish a message:
QueueConnection connection = new RabbitMqConnection.Builder().queueName("your-queue").build();
try {
connection.publish("the message");
} catch (CannotPublishMessage e) {
// Message publishment failed for some reason.
fail(String.format("An exception occurred: '%s'", e.getClass().getName()));
}
To consume messages you need an instance of QueueConnection
, which represents the connection to the Message Queue.
Once you have a QueueConnection
, you can simply do the following to consume a message:
Consumer consumer = new Consumer() {
@Override
public void consume(String messageId, String message) throws CannotConsumeMessage {
System.out.println(String.format("Received message with id '%s': '%s'", messageId, message));
}
};
QueueConnection connection = new RabbitMqConnection.Builder().queueName("your-queue").build();
try {
connection.consume(consumer);
} catch (CannotRegisterConsumer e) {
// Consumer registration failed for some reason.
fail(String.format("An exception occurred: '%s'", e.getClass().getName()));
}
In this case, each message gets automatically acknowledged. If you want to handle message acknowledgment yourself, you need to register an AcknowledgingConsumer
:
Consumer consumer = new AcknowledgingConsumer() {
private MessageAcknowledger acknowledger;
@Override
public void setAcknowledger(MessageAcknowledger acknowledger) {
this.acknowledger = acknowledger;
}
@Override
public void consume(String messageId, String message) throws CannotConsumeMessage {
System.out.println(String.format("Received message with id '%s': '%s'", messageId, message));
try {
acknowledger.acknowledge(messageId);
} catch (CannotAcknowledgeMessage e) {
// Message Acknowledgment failed for some reason
throw new CannotConsumeMessage(messageId, message, e);
}
}
};
QueueConnection connection = new RabbitMqConnection.Builder().queueName("your-queue").build();
try {
connection.consume(consumer);
} catch (CannotRegisterConsumer e) {
// Consumer registration failed for some reason.
fail(String.format("An exception occurred: '%s'", e.getClass().getName()));
}