The legacy Azure Service Bus SDK (WindowsAzure.ServiceBus) uses event handlers for message pre-/post-processing. For example, OnSend
is used for synchronous message pre-processing, and both OnBeginSend
and OnEndSend
are used for asynchronous message pre-processing. There are other event handlers for message post-processing. It's OK to use those event handlers. But, as there is a number of event handlers to implement, the amount of codes gets massively increased, which is not that great.
Is there an elegant way of pre-/post-processing messages?
Fortunately, the new version of Azure Service Bus SDK (Microsoft.Azure.ServiceBus) uses ServiceBusPlugin
, instead of lots of event handlers. In other words, as long as we implement the plug-ins, all pre-/post-processing messages are performed within the plug-in automatically.
Throughout this post, I'm going to discuss how to build a custom plug-in for Azure Service Bus, by inheriting ServiceBusPlugin
.
Use Case Scenario
AS an application, I WANT to validate whether a specific user property exists in the message, SO THAT I throw an error if the user property doesn't exist.
ServiceBusPlugin
Extending The specific user property mentioned in the user story above is sender
. If many information systems share the same Service Bus topic, a subscriber needs to know which one is valid for me or not. Of course, Azure Service Bus already has the filtering feature, so this might not be a good example. However, as at least it shows how the message is handled, it's worth discussing here. Anyway, without using the filtering feature, the plug-in validates the message origin by checking the sender
custom property. If there's no sender or unidentified sender, the message will be rejected straightaway.
Because ServiceBusPlugin
is an abstract class, it MUST be inherited. Let's create a derived class of SenderValidationPlugin
:
public class SenderValidationPlugin : ServiceBusPlugin | |
{ | |
} |
The ServiceBusPlugin
class contains an abstract property, Name
. It provides a unique name to the plug-in to remove any duplicated registration. Here in this post, it is just the fully-qualified class name.
public class SenderValidationPlugin : ServiceBusPlugin | |
{ | |
// Overrides abstract property | |
public override string Name => this.GetType().FullName; | |
} |
The class also contains one virtual property and two virtual methods. Through those properties and methods, we can implement how the plug-in works.
ShouldContinueOnException
: The default value of this property isfalse
. If any exception arises, the plug-in stops further processing and bubbles up the exception.BeforeMessageSend(Message message)
: This method validates the message before it is sent to the topic. The validation process includes checking whether the custom property exists or not. If the 'sender` property doesn't exist in the message custom property, it will throw an exception and stop sending the message to the topic.AfterMessageReceive(Message message)
: This method intercepts the message before it is sent to the subscriber. It will also throw an exception while intercepting the message and validating it.
public class SenderValidationPlugin : ServiceBusPlugin | |
{ | |
public override string Name => this.GetType().FullName; | |
// Overrides virtual property | |
public override bool ShouldContinueOnException { get; } | |
// Overrides virtual methods | |
public override async Task<Message> BeforeMessageSend(Message message) | |
{ | |
throw new NotImplementedException(); | |
} | |
public override async Task<Message> AfterMessageReceive(Message message) | |
{ | |
throw new NotImplementedException(); | |
} | |
} |
Let's add a private method, ValidateAsync(Message message)
. It is invoked by either BeforeMessageSend
or AfterMessageReceived
.
public class SenderValidationPlugin : ServiceBusPlugin | |
{ | |
public override string Name => this.GetType().FullName; | |
public override bool ShouldContinueOnException { get; } | |
public override async Task<Message> BeforeMessageSend(Message message) | |
{ | |
return await this.ValidateAsync(message) | |
.ConfigureAwait(false); | |
} | |
public override async Task<Message> AfterMessageReceive(Message message) | |
{ | |
return await this.ValidateAsync(message) | |
.ConfigureAwait(false); | |
} | |
// Implements private method for both BeforeMessageSend and AfterMessageReceive. | |
private async Task<Message> ValidateAsync(Message message) | |
{ | |
throw new NotImplementedException(); | |
} | |
} |
Now all the validation logic sits inside ValidateAsync
. It throws an exception and stops further processing, if:
- There is no message,
- There is no custom property of
sender
, or - There is no sender value expected.
private async Task<Message> ValidateAsync(Message message) | |
{ | |
var cloned = message.Clone() | |
var body = cloned.Body; | |
if (!body.Any()) | |
{ | |
throw new InvalidOperationException("Message body not exists"); | |
} | |
var payload = Encoding.UTF8.GetString(cloned.Body); | |
var sender = cloned.UserProperties["sender"] as string; | |
if (string.IsNullOrWhiteSpace(sender)) | |
{ | |
throw new InvalidOperationException("Sender not defined"); | |
} | |
var senders = new List<string>() { "app1", "app2", "app3" }; | |
if (!senders.Contains(sender)) | |
{ | |
throw new InvalidOperationException("Invalid sender"); | |
} | |
return message; | |
} |
All plug-n implementation has been completed. Let's play around it.
ServiceBusPlugin
Registering and Using When sending a message, register the plug-in to TopicClient
. Let's have a look at the code below. First of all, create a new plug-in instance and register it to TopicClient
. Then send a message. The code sample below will throw an exception because its sender
value is lorem
, which is not on the list.
var plugin = new SenderValidationPlugin(); | |
var topic = new TopicClient("/* CONNECTION STRING */", "my-topic"); | |
topic.RegisterPlugin(plugin); | |
var payload = "{ \"hello\": \"world\" }"; | |
var body = Encoding.UTF8.GetBytes(serialised); | |
var message = new Message(body); | |
message.UserProperties.Add("sender", "lorem"); | |
await topic.SendAsync(message) | |
.ConfigureAwait(false); |
When receiving the message, register the plug-in to SubscriptionClient
. Then the plug-in automatically validates the message before it arrives at the target system.
var plugin = new SenderValidationPlugin(); | |
var subscription = new SubscriptionClient("/* CONNECTION STRING */", "my-topic", "my-subscription", ReceiveMode.PeekLock); | |
subscription.RegisterPlugin(plugin); | |
subscription.RegisterMessageHandler(async (message, token) => | |
{ | |
await subscription.CompleteAsync(message.SystemProperties.LockToken) | |
.ConfigureAwait(false); | |
}, | |
new MessageHandlerOptions(args => { | |
Console.WriteLine(args.Exception.Message); | |
Console.WriteLine(args.ExceptionReceivedContext.EntityPath); | |
})); |
As you can see above, registering the plug-in makes the custom property validation, without additional efforts. As a result, the amount of codes has been decreased. Even better the code itself has been separated from the main TopicClient
or SubscriptionClient
.
So far, we've walked through how to write an Azure Service Bus plug-in for message validation. This plug-in approach will developers a lot easier and happier due to its ease of use.