6 min read

ServiceBusPlugin Tricks

Justin Yoo

The legacy Azure Service Bus SDK (WindowsAzure.ServiceBus) uses event handlers for message pre-/post-processing. For example, OnSend is used for synchronous message pre-processing, and both OnBeginSend and OnEndSend are used for asynchronous message pre-processing. There are other event handlers for message post-processing. It's OK to use those event handlers. But, as there is a number of event handlers to implement, the amount of codes gets massively increased, which is not that great.

Is there an elegant way of pre-/post-processing messages?

Fortunately, the new version of Azure Service Bus SDK (Microsoft.Azure.ServiceBus) uses ServiceBusPlugin, instead of lots of event handlers. In other words, as long as we implement the plug-ins, all pre-/post-processing messages are performed within the plug-in automatically.

Throughout this post, I'm going to discuss how to build a custom plug-in for Azure Service Bus, by inheriting ServiceBusPlugin.

Use Case Scenario

AS an application, I WANT to validate whether a specific user property exists in the message, SO THAT I throw an error if the user property doesn't exist.

Extending ServiceBusPlugin

The specific user property mentioned in the user story above is sender. If many information systems share the same Service Bus topic, a subscriber needs to know which one is valid for me or not. Of course, Azure Service Bus already has the filtering feature, so this might not be a good example. However, as at least it shows how the message is handled, it's worth discussing here. Anyway, without using the filtering feature, the plug-in validates the message origin by checking the sender custom property. If there's no sender or unidentified sender, the message will be rejected straightaway.

Because ServiceBusPlugin is an abstract class, it MUST be inherited. Let's create a derived class of SenderValidationPlugin:

public class SenderValidationPlugin : ServiceBusPlugin
{
}

The ServiceBusPlugin class contains an abstract property, Name. It provides a unique name to the plug-in to remove any duplicated registration. Here in this post, it is just the fully-qualified class name.

public class SenderValidationPlugin : ServiceBusPlugin
{
// Overrides abstract property
public override string Name => this.GetType().FullName;
}

The class also contains one virtual property and two virtual methods. Through those properties and methods, we can implement how the plug-in works.

  • ShouldContinueOnException: The default value of this property is false. If any exception arises, the plug-in stops further processing and bubbles up the exception.
  • BeforeMessageSend(Message message): This method validates the message before it is sent to the topic. The validation process includes checking whether the custom property exists or not. If the 'sender` property doesn't exist in the message custom property, it will throw an exception and stop sending the message to the topic.
  • AfterMessageReceive(Message message): This method intercepts the message before it is sent to the subscriber. It will also throw an exception while intercepting the message and validating it.
public class SenderValidationPlugin : ServiceBusPlugin
{
public override string Name => this.GetType().FullName;
// Overrides virtual property
public override bool ShouldContinueOnException { get; }
// Overrides virtual methods
public override async Task<Message> BeforeMessageSend(Message message)
{
throw new NotImplementedException();
}
public override async Task<Message> AfterMessageReceive(Message message)
{
throw new NotImplementedException();
}
}

Let's add a private method, ValidateAsync(Message message). It is invoked by either BeforeMessageSend or AfterMessageReceived.

public class SenderValidationPlugin : ServiceBusPlugin
{
public override string Name => this.GetType().FullName;
public override bool ShouldContinueOnException { get; }
public override async Task<Message> BeforeMessageSend(Message message)
{
return await this.ValidateAsync(message)
.ConfigureAwait(false);
}
public override async Task<Message> AfterMessageReceive(Message message)
{
return await this.ValidateAsync(message)
.ConfigureAwait(false);
}
// Implements private method for both BeforeMessageSend and AfterMessageReceive.
private async Task<Message> ValidateAsync(Message message)
{
throw new NotImplementedException();
}
}

Now all the validation logic sits inside ValidateAsync. It throws an exception and stops further processing, if:

  • There is no message,
  • There is no custom property of sender, or
  • There is no sender value expected.
private async Task<Message> ValidateAsync(Message message)
{
var cloned = message.Clone()
var body = cloned.Body;
if (!body.Any())
{
throw new InvalidOperationException("Message body not exists");
}
var payload = Encoding.UTF8.GetString(cloned.Body);
var sender = cloned.UserProperties["sender"] as string;
if (string.IsNullOrWhiteSpace(sender))
{
throw new InvalidOperationException("Sender not defined");
}
var senders = new List<string>() { "app1", "app2", "app3" };
if (!senders.Contains(sender))
{
throw new InvalidOperationException("Invalid sender");
}
return message;
}

All plug-n implementation has been completed. Let's play around it.

Registering and Using ServiceBusPlugin

When sending a message, register the plug-in to TopicClient. Let's have a look at the code below. First of all, create a new plug-in instance and register it to TopicClient. Then send a message. The code sample below will throw an exception because its sender value is lorem, which is not on the list.

var plugin = new SenderValidationPlugin();
var topic = new TopicClient("/* CONNECTION STRING */", "my-topic");
topic.RegisterPlugin(plugin);
var payload = "{ \"hello\": \"world\" }";
var body = Encoding.UTF8.GetBytes(serialised);
var message = new Message(body);
message.UserProperties.Add("sender", "lorem");
await topic.SendAsync(message)
.ConfigureAwait(false);
view raw topic-send.cs hosted with ❤ by GitHub

When receiving the message, register the plug-in to SubscriptionClient. Then the plug-in automatically validates the message before it arrives at the target system.

var plugin = new SenderValidationPlugin();
var subscription = new SubscriptionClient("/* CONNECTION STRING */", "my-topic", "my-subscription", ReceiveMode.PeekLock);
subscription.RegisterPlugin(plugin);
subscription.RegisterMessageHandler(async (message, token) =>
{
await subscription.CompleteAsync(message.SystemProperties.LockToken)
.ConfigureAwait(false);
},
new MessageHandlerOptions(args => {
Console.WriteLine(args.Exception.Message);
Console.WriteLine(args.ExceptionReceivedContext.EntityPath);
}));

As you can see above, registering the plug-in makes the custom property validation, without additional efforts. As a result, the amount of codes has been decreased. Even better the code itself has been separated from the main TopicClient or SubscriptionClient.


So far, we've walked through how to write an Azure Service Bus plug-in for message validation. This plug-in approach will developers a lot easier and happier due to its ease of use.