Patterns with Spring and RabbitMQ Hero Image

Patterns with Spring and RabbitMQ Part 1: RabbitHandler and its Usages

At Celonis, we utilize a service-oriented architecture where RabbitMQ plays an important role for asynchronous communication between services. Messages can be expressed as a command about what the receiving services need to do or an event about what has happened with a service that a receiver is interested in. In most cases, an event is a result of the command that a publisher has sent before. Since most of the services at Celonis are implemented as Spring Boot applications we use Spring AMQP which makes it easier to publish and consume messages, for example by taking care of serialization and deserialization. While working with messaging we experienced two issues that needed to be addressed:

  1. Supporting multiple types of messages within a queue

  2. Breaking changes in the message structure.

Let us have a look at these two issues and how RabbitHandler has helped us to overcome them.

Supporting Multiple Types within a queue

To make it clear, there is no restriction on RabbitMQ side or Spring AMQP to have a message with multiple types in a single RabbitMQ queue. However, the challenge is dealing with multiple types in a clean and maintainable way. Let us have a look couple of examples:

        @RabbitListener(queues = "operator_queue")
    public void receiveMessage(@Payload OperatorMessage message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        if (message instanceOf CancelMessage) {
            // do cancel
        } else if (message instanceOf CleanupMessage)
            // do cleanup
        } else if (message instanceOf StartProcessMessage){
            // start processing
        }
        // ack message if acknowledgment is manual    
    }
  

In this example, the "operator queue" contains commands describing an operation that the receiver needs to do: starting a process, canceling a process, or doing a cleanup. To facilitate maintenance, we decided to use a single queue for these operations by having a base class "OperatorMessage" which contains a number of fields common to all sub-types. For serialization/deserialization Jackson is used which supports inheritance with the @JsonSubTypes annotation. The RabbitListener can recognize the base class and with an underlying type attribute, the message can be deserialized to the correct class. Another alternative is to have a type field as enum and do a switch statement.

        @RabbitListener(queues = "operator_queue")
    public void receiveMessage(@Payload OperatorMessage message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        var type = message.getType()
        switch(type) {
            case CANCEL:
               // do cancel
               break;
            case CLEANUP:
              // do cleanup
              break;
            case START:
              // start processing
              break;
        }
        // ack message if acknowledgment is manual    
    }
  

Even though the above code snippets serve as examples an engineer might be thinking: "what if I want to introduce a new type or what if we forgot the break statement"? Can this code be described as maintainable? As engineers, we always seek to improve our code base and look for better ways to implement it. In this case, we can leverage the RabbitHandler annotation provided by Spring AMQP to simplify our code. Here is the result:

        @RabbitListener(queues = "operator_queue")
    public class OperatorQueueListener {
        
        @RabbitHandler
        public void receiveCancelMessage(@Payload CancelMessage message) {
            // do cancel
        }

        @RabbitHandler
        public void receiveCleanupMessage(@Payload CleanupMessage message) {
            // do cleanup
        }

        @RabbitHandler
        public void receiveStartMessage(@Payload StartProcessMessage message) {
            // start processing 
        }
        
    }
  

RabbitHandler gives us the capability to define methods based on the type without requiring us to do a type check and casting. A listener with RabbitHandler can deserialize and forward to the correct method as long as the method signature is unique and corresponds to the single type. For a new type, only a new method with a unique signature needs to be defined. The same signature rules apply as @RabbitListener is used on the method level. Compared to the previous two examples the code with RabbitHandler is much more readable and maintainable.

Breaking changes in the message structure

The message structure inevitably needs to be extended to support new features. Having an additional field on the message payload is not a problem. However, in some cases, the old message structure might not satisfy the new requirements so we're forced to change it. Here is an example case: A service is responsible for exporting data to the given location in a file system when consuming a command message. Export Command Message

    {
    "exportLocation": "/usr/share/export" // type string
}
  

Later, a new requirement comes up to enable the service to export files to an AWS S3 bucket, and as part of the design decision the location should be expressed as "java.lang.URI" type. This would mean the message structure needs a change. Let us look at the two approaches.

Option 1: Non-Breaking Change

A non-breaking can be done by introducing a new field with the type URI and marking the old field as deprecated. Export Command Message with Non-Breaking Change

    {
    "exportLocation": "/usr/share/export", // type string deprecated
    "exportLocationURI": "s3://region/export" // type java.lang.URI
}
  

The consumer needs to handle the incoming message by processing the new field. However, in this stage, the old field will stay null, and removing it will require to do the deployment in multiple stages: one for adding the new field and one for removing the old field once we make sure all consumers are updated. In case of a fanout or topic exchange, this would require coordination of consumer updates.

Option 2: New Version of Message and RabbitHandler

The other option is to create a new message as V2 with exportLocation having URI type. Export Command Message V2

    {
    "exportLocation": "s3://region/export" // type java.lang.URI
}
  

With the help of RabbitHandler, we can send this message to the same queue as the previous version. In this case, the listener code would look like this:

        @RabbitListener(queues = "queue")
    public class Listener {
        
        @RabbitHandler
        public void receive(@Payload ExportMessage message) {
            // do v1 logic
        }

        @RabbitHandler
        public void receiveURI(@Payload ExportMessageV2 message) {
            // do v2 logic
        }
        
    }
  

The code above works automatically because Spring AMQP transfers the type information from the consumer to the provider. This is achieved under the hood by adding a custom header "_TypeId_" whose value is the fully qualified name of the class. Thus, even if you use the same class name for the evolution of the Message Type, it will be able to discriminate if they are in different packages.

Conclusion

RabbitHandler from Spring AMQP is very powerful and enables us to implement consumers capable of handling queues with multiple types and allows for the seamless evolution of messages without sacrificing readability and maintainability.

Fatih Güclü Akkaya Profile Picture
Fatih Güclü Akkaya
Senior Software Engineer

As Senior Software Engineer in Data Pipeline team, Fatih and his teammates specialize in building fast and reliable pipelines to make data ready to be analyzed. He has a passion for continuous improvement on the projects he is working. Outside of work, he aspires to be a powerlifter and plans to travel around the world.

Dear visitor, you're using an outdated browser. Parts of this website will not work correctly. For a better experience, update or change your browser.