At Celonis, we use a service-oriented architecture in which RabbitMQ plays an important role in asynchronous communication between services. A message is either a command telling the receiving service what to do, or an event describing something that happened in a service and that a receiver is interested in. In most cases, an event is the result of a command that a publisher sent earlier. Since most of the services at Celonis are implemented as Spring Boot applications, we use Spring AMQP, which makes it easier to publish and consume messages, for example by taking care of serialization and deserialization. While working with messaging, we ran into two issues that needed to be addressed:
Supporting multiple types of messages within a queue
Breaking changes in the message structure
Let us have a look at these two issues and how RabbitHandler has helped us to overcome them.
To be clear, neither RabbitMQ nor Spring AMQP restricts a single RabbitMQ queue from carrying messages of multiple types. The challenge is dealing with multiple types in a clean and maintainable way. Let us have a look at a couple of examples:
@RabbitListener(queues = "operator_queue")
public void receiveMessage(@Payload OperatorMessage message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
    if (message instanceof CancelMessage) {
        // do cancel
    } else if (message instanceof CleanupMessage) {
        // do cleanup
    } else if (message instanceof StartProcessMessage) {
        // start processing
    }
    // ack message if acknowledgment is manual
}
In this example, the "operator_queue" contains commands describing an operation that the receiver needs to perform: starting a process, canceling a process, or doing a cleanup. To facilitate maintenance, we decided to use a single queue for these operations by having a base class "OperatorMessage" which contains the fields common to all sub-types. Serialization and deserialization are handled by Jackson, which supports inheritance through the @JsonSubTypes annotation (used together with @JsonTypeInfo). The listener declares the base class as its payload type, and a type attribute in the payload lets Jackson deserialize the message into the correct subclass. An alternative is to give the message a type field as an enum and use a switch statement:
@RabbitListener(queues = "operator_queue")
public void receiveMessage(@Payload OperatorMessage message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
    var type = message.getType();
    switch (type) {
        case CANCEL:
            // do cancel
            break;
        case CLEANUP:
            // do cleanup
            break;
        case START:
            // start processing
            break;
    }
    // ack message if acknowledgment is manual
}
Even though the snippets above are only examples, an engineer might already be thinking: "What if I want to introduce a new type?" or "What if we forget a break statement?" Can this code be described as maintainable? As engineers, we always seek to improve our code base and look for better ways to implement it. In this case, we can leverage the RabbitHandler annotation provided by Spring AMQP to simplify our code. Here is the result:
@RabbitListener(queues = "operator_queue")
public class OperatorQueueListener {

    @RabbitHandler
    public void receiveCancelMessage(@Payload CancelMessage message) {
        // do cancel
    }

    @RabbitHandler
    public void receiveCleanupMessage(@Payload CleanupMessage message) {
        // do cleanup
    }

    @RabbitHandler
    public void receiveStartMessage(@Payload StartProcessMessage message) {
        // start processing
    }
}
RabbitHandler lets us define one method per message type without doing a type check and a cast ourselves. A listener with RabbitHandler methods deserializes the message and forwards it to the correct method, as long as each method signature is unique and corresponds to a single type. Supporting a new type only requires adding a new method with a unique signature. The same signature rules apply as when @RabbitListener is used at the method level. Compared to the previous two examples, the code with RabbitHandler is much more readable and maintainable.
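To make the dispatch rule described above more concrete, here is a minimal plain-Java sketch of the idea: pick the one annotated method whose parameter type matches the payload. It is not how Spring AMQP is implemented. The `@Handler` annotation, the `dispatch` helper, and the `processId` field are hypothetical stand-ins invented for this illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

final class HandlerDispatchSketch {

    // Hypothetical stand-in for @RabbitHandler; this is NOT the Spring AMQP annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Handler {}

    // Hypothetical message types; the processId field is an assumption for the example.
    record CancelMessage(String processId) {}
    record CleanupMessage(String processId) {}

    static final class OperatorQueueListener {
        @Handler
        public String receiveCancelMessage(CancelMessage message) {
            return "cancel " + message.processId();
        }

        @Handler
        public String receiveCleanupMessage(CleanupMessage message) {
            return "cleanup " + message.processId();
        }
    }

    // Invokes the single annotated method whose only parameter accepts the payload.
    static Object dispatch(Object listener, Object payload) throws ReflectiveOperationException {
        Method match = null;
        for (Method method : listener.getClass().getDeclaredMethods()) {
            if (!method.isAnnotationPresent(Handler.class) || method.getParameterCount() != 1) {
                continue;
            }
            if (method.getParameterTypes()[0].isInstance(payload)) {
                if (match != null) {
                    // Two handlers accept the same payload: the signature is not unique.
                    throw new IllegalStateException("Ambiguous handlers for " + payload.getClass().getName());
                }
                match = method;
            }
        }
        if (match == null) {
            throw new IllegalStateException("No handler for " + payload.getClass().getName());
        }
        return match.invoke(listener, payload);
    }

    public static void main(String[] args) throws Exception {
        OperatorQueueListener listener = new OperatorQueueListener();
        System.out.println(dispatch(listener, new CancelMessage("p-1")));
        System.out.println(dispatch(listener, new CleanupMessage("p-1")));
    }
}
```

The sketch also shows why unique signatures matter: if two handler methods accepted the same payload type, there would be no way to choose between them.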
The message structure inevitably needs to be extended to support new features. Adding a field to the message payload is not a problem. In some cases, however, the old message structure cannot satisfy the new requirements, so we are forced to change it. Here is an example: a service is responsible for exporting data to a given location in a file system when it consumes a command message.
Export Command Message
{
"exportLocation": "/usr/share/export" // type string
}
Later, a new requirement comes up: the service should also be able to export files to an AWS S3 bucket, and as part of the design decision the location should be expressed as a "java.net.URI". This means the message structure needs to change. Let us look at two approaches.
A non-breaking change can be made by introducing a new field of type URI and marking the old field as deprecated.
Export Command Message with Non-Breaking Change
{
  "exportLocation": "/usr/share/export", // type string, deprecated
  "exportLocationURI": "s3://region/export" // type java.net.URI
}
The consumer handles incoming messages by processing the new field. At this stage, however, the old field stays null, and removing it requires deploying in multiple stages: one to add the new field and one to remove the old field once we are sure all consumers have been updated. With a fanout or topic exchange, this requires coordinating the updates of all consumers.
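As an illustration of the consumer side during such a transition, the following sketch prefers the new field and falls back to the deprecated one. The `ExportCommand` record and the `resolveLocation` helper are hypothetical names, and the fallback assumes that legacy values are absolute file system paths without characters that need URI escaping.

```java
import java.net.URI;

final class ExportLocationSketch {

    // Hypothetical payload for the non-breaking variant; either field may be null.
    record ExportCommand(String exportLocation, URI exportLocationURI) {}

    // Prefers the new URI field and falls back to the deprecated string field.
    static URI resolveLocation(ExportCommand command) {
        if (command.exportLocationURI() != null) {
            return command.exportLocationURI();
        }
        if (command.exportLocation() != null) {
            // Assumption: the legacy value is an absolute path such as /usr/share/export.
            return URI.create("file://" + command.exportLocation());
        }
        throw new IllegalArgumentException("Export command carries no location");
    }

    public static void main(String[] args) {
        ExportCommand legacy = new ExportCommand("/usr/share/export", null);
        ExportCommand updated = new ExportCommand(null, URI.create("s3://region/export"));
        System.out.println(resolveLocation(legacy).getScheme());
        System.out.println(resolveLocation(updated).getScheme());
    }
}
```

The fallback branch is exactly the code that has to stay in every consumer until the old field can be removed, which is where the staged deployment comes from.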
The other option is to create a new message version, V2, in which exportLocation has the type URI.
Export Command Message V2
{
  "exportLocation": "s3://region/export" // type java.net.URI
}
With the help of RabbitHandler, we can send this message to the same queue as the previous version. In this case, the listener code would look like this:
@RabbitListener(queues = "queue")
public class Listener {

    @RabbitHandler
    public void receive(@Payload ExportMessage message) {
        // do v1 logic
    }

    @RabbitHandler
    public void receiveURI(@Payload ExportMessageV2 message) {
        // do v2 logic
    }
}
The code above works automatically because Spring AMQP transfers the type information from the producer to the consumer. Under the hood, the JSON message converter adds a custom header "__TypeId__" whose value is the fully qualified name of the class. Thus, even if you reuse the same class name for a new version of the message type, the listener can tell the versions apart as long as they live in different packages.
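The header mechanism can be imitated in a few lines of plain Java. This is a simplified sketch, not Spring AMQP's converter code: the `headersFor` and `targetType` helpers are hypothetical, and the nested holder classes `V1` and `V2` stand in for two different packages that each contain a class named `ExportMessage`.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

final class TypeIdHeaderSketch {

    // Same simple class name in two holders; the nesting stands in for separate packages.
    static final class V1 {
        record ExportMessage(String exportLocation) {}
    }

    static final class V2 {
        record ExportMessage(URI exportLocation) {}
    }

    static final String TYPE_ID_HEADER = "__TypeId__";

    // Producer side: store the payload's class name in a message header.
    static Map<String, Object> headersFor(Object payload) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(TYPE_ID_HEADER, payload.getClass().getName());
        return headers;
    }

    // Consumer side: resolve the class to deserialize into from the header value.
    static Class<?> targetType(Map<String, Object> headers) throws ClassNotFoundException {
        return Class.forName((String) headers.get(TYPE_ID_HEADER));
    }

    public static void main(String[] args) throws Exception {
        Map<String, Object> v1Headers = headersFor(new V1.ExportMessage("/usr/share/export"));
        Map<String, Object> v2Headers = headersFor(new V2.ExportMessage(URI.create("s3://region/export")));
        System.out.println(targetType(v1Headers).getName());
        System.out.println(targetType(v2Headers).getName());
    }
}
```

Because the header carries the full class name rather than the simple name, the two `ExportMessage` versions resolve to different classes, and each can be routed to its own handler method.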
RabbitHandler from Spring AMQP is a powerful tool: it lets us implement consumers that handle queues with multiple message types, and it allows messages to evolve seamlessly without sacrificing readability and maintainability.