Load-balanced Ordered Message Processing with Apache ActiveMQ

On my previous post (Preserving Message Order with Apache ActiveMQ) we took a look on what would be necessary to preserve message order using Apache ActiveMQ (FUSE Message Broker). We then explored on the ActiveMQ capabilities called Exclusive Consumers.

But, Exclusive Consumers bring one disadvantage that is having active consumers not doing anything (actually not consuming messages) and that may be not the desired behavior. So, let's take a look on how ActiveMQ (FUSE Message Broker) can help to avoid that situation and explore what capabilities and/or techniques are available to process messages in order when multiple consumers are active.

ActiveMQ has a feature called Message Groups which is the way to load balance multiple active consumers listening to the same queue while preserving message order. ActiveMQ will then group messages on the queue and it will guarantee that all messages of a particular group arrive in order on the same consumer.

So, to give you an example, let's say we're processing records about drivers (adding new drivers, updating drivers' information, processing speed tickets, revoking tickets, etc). You definitely don't want to process the revoke information before the ticket record because that would cause a lot of problems when the ticket doesn't exist yet and the application will try to update its information. Then, to solve that problem we could group messages by driver's license number and then let the message broker process them accordingly. Now, from the global sense the messages may not be processed in order but from the important business application sense of related messages they are delivered in the preserved order.

In the picture below, messages addressed with the same JMSXGroupID, in this case "GRP1" and "GRP2", will be pushed to a unique consumer, in other words, Consumer A will get messages that contain the JMSXGroupID equals to GRP1 and Consumer B will get messages that contain the property JMSXGroupID equals to GRP2.


In summary, ActiveMQ will correlate messages based on the property JMSXGroupsID and will deliver those to the same consumer.

On a side note, messages without a group (meaning messages with no value specified in the JMSXGroupID header property) will be distributed (load balanced) among active consumers as they would be in the normal ActiveMQ queue message processing.

But, how to implement message groups?

Very simple… the only thing you have to do is to set the JMSXGroupID property on the client producer before you send the message with whatever value you want to use for correlation. The broker itself will take care of everything else for you.

Here is a snippet of a Java client producer creating a JMS Text Message and then setting the JMSXGroupID property:

Message message = session.createTextMessage(<foo>hey</foo>);
message.setStringProperty(JMSXGroupID, Msg_group_1);


In the example above, all messages in the queue with JMSXGroupID set to Msg_group_1 will be directed exclusively to a single consumer.

How the messages reach the correct consumer then?

Well, the message broker maintains an internal hash map with the Group IDs and the consumers receiving messages for that specific group.

The consumer will then receive messages for that group until it drops the connection or a producer closes the message group by sending a message with a property called JMSXGroupSeq containing the value of -1 (minus one). That's going to be the flag telling the message broker that the group is no longer needed and the reference in the hash table can be removed.

For new message groups, the broker does a random selection of the active consumers and then start placing messages on that consumer.

So, to illustrate how to close a group, the following code shows the Java producer client closing the Msg_Group_1:

Message message = session.createTextMessage(<foo>bar</foo>);
message.setStringProperty(JMSXGroupIDMsg_group_1);
message.setIntProperty(JMSXGroupSeq-1);
producer.send(message);


Then, if another message is sent to the same message group ID, the message broker will handle that as a new message group randomly assigned to a new consumer.




Comments

  1. Hi Marcelo,

    I am facing an issue with implementing message grouping with ActiveMQ 5.4.3. I have addded the JMSXGroupID to the JMS mesage being sent from the producer. Also added below line in the broker-config.xml



    But when i try to send 2 groups of messages each group having 5 messages each from a client, I find that the messages are consumed by differernt consumers(Consumer in my case is an MDB deployed in JBoss server. ActiveMQ started when JBoss starts). There are differernt instances of MDBs created for each message and all are processed in parallel.

    Altho' I expected that the messages in 1 group be processed by one particular consumer .i.e. one instance of MDB.

    Can you please help me out. I cant understand what I might be doing incorrectly here

    Thank You
    Farheen

    ReplyDelete
  2. Hi Farheen,
    I can't see the line you added to the broker-config.xml (for some reason it got truncated).
    Just out of curiosity, have you tried to troubleshoot this issue using a simple java client to make sure that's not related to the JBoss setup/deployment in first place?
    Take one of the samples shipped with ActiveMQ and try to reproduce the issue. If that doesn't happen then you got a hint where the problem may be.
    Hope this helps,
    -Marcelo

    ReplyDelete
  3. Hi Marcelo,

    Thank u for replying.

    I havent tried with a sample tho' but I came across the below JIRA issue which is still unresolved. Would u ve any idea on this?

    https://issues.apache.org/jira/browse/AMQ-1126

    Farheen

    ReplyDelete
  4. Unfortunately, I don't have any additional comments on this and I also saw you commented on that thread.

    ReplyDelete
  5. This comment has been removed by the author.

    ReplyDelete
    Replies
    1. See http://activemq.apache.org/message-groups.html for more information.

      Delete
  6. I have a question pertaining to the particular situation:

    1. AMQ Message Producer(MP) receives the messages from external system.

    2. MP uses one of the field in the message as the message sequence.

    3. Let's say MP puts messages M1, M2, M4. as it did not receive M3.

    4. Assume that there is no consumer listening, so M1, M2, M4 remains in AMQ.

    5. Later MP receives M3, and puts it using same GroupId.

    6. So now, Messages in the AMQ will be M1, M2, M4, M3 order put timestamp put in the queue.

    7. Will AMQ will deliver these messages in order M1, M2, M3, M4 or M1, M2, M4, M3 when consumer is back available?

    ReplyDelete

Post a Comment

Popular posts from this blog

Calling Web Services with Apache Camel

How to Declare Variables in MS-SQL Server Management Studio

Using HTTP-based endpoints with Apache Camel