
Different methods to implement the Message Aggregator pattern using Service Bus Topics – CorrelationId

In one of my recent posts I presented the Aggregator pattern. The main purpose of this pattern is to let the consumer combine (aggregate) related messages. This pattern can be implemented on Windows Azure using Windows Azure Service Bus Queues or Topics.
There are two different implementations of this pattern. The implementations are quite different and can also affect performance in different ways.
The first implementation relies on the session support of BrokeredMessage. What does this mean from the code perspective? We need to set the session id every time we send a message. The consumer then consumes only the messages with that specific session id. This solution is simple and works great when we don't have a lot of messages, for example only 9-10 messages.
The first advantage of this implementation is on the consumer side: we don't need to create the consumer before messages are added to the Service Bus, because the messages are persisted in the Service Bus infrastructure. One downside of this implementation is the number of consumers that we can have for the same session: only one. Because of this, if we want to broadcast a message to more than one consumer, we have a problem. Also, the current session implementation doesn't use any kind of hashing for the id field; because of this, if we want to process thousands of messages per second, we may run into a performance problem.
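To make the single-consumer limitation concrete, here is a minimal sketch (the queue client `qc` and the session id are assumptions, following the snippets below). In the Service Bus SDK, a second receiver that tries to accept a session while another receiver holds the lock on it will typically fail with an exception:

```csharp
// First consumer locks the session and can start receiving its messages.
MessageSession firstConsumer = qc.AcceptMessageSession("session-1");

// A second consumer cannot lock the same session while the first one holds it.
try
{
    MessageSession secondConsumer = qc.AcceptMessageSession("session-1");
}
catch (SessionCannotBeLockedException)
{
    // Only one consumer can hold a given session at a time, so broadcasting
    // the same session messages to multiple consumers is not possible.
}
```

This is exactly why sessions cannot be used when we want to broadcast the same group of messages to more than one consumer.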
Producer
Stream messageStream = message.GetBody&lt;Stream&gt;();
long length = messageStream.Length;
for (long offset = 0; offset &lt; length; offset += 255)
{
    // Read at most 255 bytes for the current chunk.
    long messageLength = Math.Min(255, length - offset);
    byte[] currentMessageContent = new byte[messageLength];
    messageStream.Read(currentMessageContent, 0, (int)messageLength);
    BrokeredMessage currentMessage = new BrokeredMessage(
        new MemoryStream(currentMessageContent),
        true);
    currentMessage.SessionId = currentContentId;
    qc.Send(currentMessage);
}

Consumer

MemoryStream finalStream = new MemoryStream();
MessageSession messageSession = qc.AcceptMessageSession(mySessionId);
while (true)
{
    BrokeredMessage message = messageSession.Receive(TimeSpan.FromSeconds(20));
    if (message == null)
    {
        break;
    }
    message.GetBody&lt;Stream&gt;().CopyTo(finalStream);
    message.Complete();
}
In the above example we split a stream into small parts and send them over the Service Bus.
But what should we do if we want more than this, for example to send messages to more than one consumer? In this case, if we need a way to group messages (like sessions do), we have to think twice. We have two options in this situation: we can use the CorrelationId for this purpose, or we can add a custom property to the BrokeredMessage. CorrelationId matching uses hashing, and because of this it is faster than the custom property option.
Both solutions will work, but we will encounter the same problem in both of them: how can we create a subscription for the given CorrelationId or property value before we start receiving messages? This is the biggest problem that we need to solve.
Before talking about this problem, I would like to say a few words about CorrelationId. This is a field that can be set on each BrokeredMessage sent on the wire. Its advantage lies in how the filter is defined: there is a pre-defined filter for the CorrelationId that can be used when we create a subscription. Also, each id is hashed; because of this, the check is not made using string comparison.
But what is our problem with CorrelationId? We can broadcast a message to more than one subscriber, but... yes, there is a but. We need to know the correlation id at the moment we create the subscription. Why? Because the correlation id has to be specified at the moment the subscriber is created: it needs to be specified for a subscription as a CorrelationFilter.
This is not the end of the road; we can find solutions to this problem. The trick is to notify the consumers about the new group of messages before adding them to the Service Bus Topic. For this purpose we can use the same Service Bus Topic, or a separate one. We can send special messages with a custom property that describes the content of the next flow of messages with the given correlation id. Based on this information, each consumer can decide whether it wants to receive those messages.
The trick here is to register the subscribers before the moment you start broadcasting the messages with a given correlation id. The messages will be delivered only to the subscriptions that already exist. Because of this, if you are not registered from the beginning, there are chances to lose some of the messages.
You need some kind of callback or a timer. Because in a publish/subscribe pattern the producer doesn't know the number of subscribers, we cannot create a mechanism where each subscriber notifies the producer using another topic. Instead, we can have a waiting time (for example 5-10 seconds) on the producer side after it sends the message that announces the new correlation id.

Here is the code that we should have on the producer and on the consumer side.
-on the producer side, we need to create and send the message that contains the new correlation id. After that, we wait a period of time until the consumers have been able to register for it.
BrokeredMessage message = new BrokeredMessage();
message.Properties["NewCorrelationId"] = 1;
message.Properties["Content"] = "new available cars for rent";
topicClient.Send(message);
Thread.Sleep(10000);
-creating the subscription that checks if the message contains the property that specifies the correlation id.
namespaceManager.CreateSubscription(
    "myTopic",
    "listenToNewCorrelationIds",
    new SqlFilter("EXISTS(NewCorrelationId)"));
-create the consumer that processes the notification message and registers a new subscription for that correlation id.
SubscriptionClient client =
    SubscriptionClient.CreateFromConnectionString(
        connectionString,
        "myTopic",
        "listenToNewCorrelationIds");
BrokeredMessage correlationIdMessage = client.Receive();
namespaceManager.CreateSubscription(
    "myTopic",
    "listenToMyCorrelationId",
    new CorrelationFilter(
        correlationIdMessage.Properties["NewCorrelationId"].ToString()));
SubscriptionClient correlationClient =
    SubscriptionClient.CreateFromConnectionString(
        connectionString,
        "myTopic",
        "listenToMyCorrelationId");

... correlationClient.Receive() ...
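One piece that completes the flow is the producer side sending the actual grouped messages after the waiting period. A minimal sketch (the `topicClient`, the message count, and the payloads are assumptions based on the snippets above) - note that the pre-defined correlation filter matches the CorrelationId field of the BrokeredMessage, not a custom property:

```csharp
// The same value that was announced earlier in the NewCorrelationId property.
string correlationId = "1";

// Send the real flow of messages with the CorrelationId set, so that the
// subscriptions created with a CorrelationFilter will match them.
for (int i = 0; i < 10; i++)
{
    BrokeredMessage groupedMessage = new BrokeredMessage("new available car " + i);
    groupedMessage.CorrelationId = correlationId;
    topicClient.Send(groupedMessage);
}
```

Any subscription registered with a CorrelationFilter for this id during the waiting period will receive all of these messages.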
From a performance perspective, it would be better to use a different topic for the messages that announce a new correlation id.
We can imagine a lot of implementations. What we need to remember when using correlation id is that we must create a subscription for the given correlation id before we start sending messages with it. The waiting period that I described in the above paragraphs is the most challenging part.
In conclusion, we should use sessions when we need only one consumer. If we need more than one consumer, then we should use correlation id.
