Azure Service Bus is one of the messaging services in Azure. According to its SLA, it guarantees 99.9% uptime, which allows for roughly 43 minutes of downtime per 30-day month. If your organisation uses Azure Service Bus and can tolerate 43 minutes of service failure a month, that's fine. However, if your system really requires high availability, 43 minutes of downtime might be unacceptable. As part of the business continuity plan (BCP), disaster recovery can be covered by the Azure Service Bus Premium plan.
That said, the Premium plan is far more expensive than the Standard plan. Although the Premium plan brings more benefits, a cost-sensitive organisation may not find them convincing enough to justify the price. Fortunately, we can also implement high availability with the Standard plan. With two instances in separate regions, and assuming they fail independently of each other, the architecture reaches an uptime of 99.9999%, which translates to about 2.6 seconds of downtime per month.
The Premium plan takes care of all that disaster recovery for us, whereas with the Standard plan we MUST take care of it ourselves. There's already sample code on GitHub, but it's for Service Bus Queues, not for Service Bus Topics. Therefore, I'm going to walk through how to implement geo-redundant Azure Service Bus instances for Service Bus Topics with Azure Functions.
The sample code used in this post can be found at this GitHub repository.
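The availability figures quoted above can be checked with a few lines of arithmetic. The sketch below is only an illustration: the AvailabilityMath class and its method names are hypothetical, and the combined figure assumes that the two regions fail independently, which an SLA does not promise.

```csharp
using System;

public static class AvailabilityMath
{
    // Downtime allowed within a period for a given availability (e.g. 0.999 for 99.9%).
    public static TimeSpan Downtime(double availability, TimeSpan period)
        => TimeSpan.FromSeconds(period.TotalSeconds * (1.0 - availability));

    // Combined availability of N redundant instances.
    // ASSUMPTION: failures are independent, so all N must be down at once.
    public static double Combined(double availability, int instances)
        => 1.0 - Math.Pow(1.0 - availability, instances);
}
```

For a 30-day month, Downtime(0.999, TimeSpan.FromDays(30)) comes to roughly 43 minutes, and Downtime(Combined(0.999, 2), TimeSpan.FromDays(30)) comes to roughly 2.6 seconds.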
Creating Two Azure Service Bus Standard Instances
First of all, we need two Azure Service Bus instances with the Standard plan. In this post, I'm going to create one in Australia Southeast as the primary region and the other in Australia East as the secondary region.
Once created, create a topic, my-topic, and a subscription, my-topic-subscription, in both instances.
Now, we've got two Standard instances in different regions. Together they form the geo-redundant Service Bus. Let's implement an Azure Functions app to handle messages for both.
Preparing Azure Functions
It's probably a good time to familiarise yourself with the following techniques. I'm sure you can pick up these concepts pretty quickly, as they are already used in many places.
- Deserialising environment variables
- Managing Inversion of Control (IoC) container
- Constructor injection on Azure Functions
- Message Fan-out/Fan-in
Deserialising Environment Variables
To develop an Azure Functions app on our local machine, we always use the local.settings.json file. It emulates the App Settings blade of the Azure Functions instance, which is basically a list of key/value pairs. Therefore, if we need more structured and strongly-typed configuration, the keys SHOULD follow the format below. The overall format can be found here.
{
  "Values": {
    ...
    "AzureServiceBus__ConnectionStrings__Primary": "[AZURE_SERVICE_BUS_CONNECTION_STRING]",
    "AzureServiceBus__ConnectionStrings__Secondary": "[AZURE_SERVICE_BUS_CONNECTION_STRING]",
    "AzureServiceBus__Topic__Name": "my-topic",
    "AzureServiceBus__Topic__Subscription__Name": "my-topic-subscription"
  }
}
Each nested field is joined with two underscores (__). Those two underscores let the environment variables be converted into strongly-typed objects. Here are the class definitions for those environment variables.
public class AzureServiceBusSettings
{
    public virtual Dictionary<string, string> ConnectionStrings { get; set; }
    public virtual AzureServiceBusTopicSettings Topic { get; set; }
}

public class AzureServiceBusTopicSettings
{
    public virtual string Name { get; set; }
    public virtual AzureServiceBusTopicSubscriptionSettings Subscription { get; set; }
}

public class AzureServiceBusTopicSubscriptionSettings
{
    public virtual string Name { get; set; }
}
The AzureServiceBusSettings class has the ConnectionStrings property of the dictionary type. With this dictionary, any number of Azure Service Bus instances can be declared for geo-redundancy. In this post we only use two instances, but technically we can use more.
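To make the double-underscore convention more concrete, here is a minimal sketch of the idea. It is not the code that the configuration library runs: the FlatSettings class and its methods are hypothetical names, and the sketch only mimics the behaviour of replacing "__" with ":", the hierarchical separator of .NET configuration keys, and then reading the direct children of a section.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class FlatSettings
{
    // Replace the "__" separator with ":", the hierarchical separator of .NET configuration keys.
    public static string Normalise(string key) => key.Replace("__", ":");

    // Collect the direct children of a section, e.g. "AzureServiceBus:ConnectionStrings".
    public static Dictionary<string, string> GetSection(
        IDictionary<string, string> values, string section)
    {
        var prefix = section + ":";
        return values
            .Select(kvp => new KeyValuePair<string, string>(Normalise(kvp.Key), kvp.Value))
            // Keep keys under the section, but skip anything nested deeper.
            .Where(kvp => kvp.Key.StartsWith(prefix, StringComparison.OrdinalIgnoreCase)
                          && kvp.Key.IndexOf(':', prefix.Length) < 0)
            .ToDictionary(kvp => kvp.Key.Substring(prefix.Length), kvp => kvp.Value);
    }
}
```

Given the settings shown earlier, GetSection(values, "AzureServiceBus:ConnectionStrings") would return a dictionary with the keys Primary and Secondary, which is the shape the ConnectionStrings property expects.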
AppSettings instantiates those environment variables. Have a look at the code below. The overall code is here.
public class AppSettings : AppSettingsBase
{
    private const string ServiceBusSettingsKey = "AzureServiceBus";

    public AppSettings()
    {
        // Get the strongly-typed app settings instance.
        this.ServiceBus = this.Config.Get<AzureServiceBusSettings>(ServiceBusSettingsKey);
    }

    public virtual AzureServiceBusSettings ServiceBus { get; }
}
The AppSettings class inherits the AppSettingsBase class, which is included in the Aliencube.AzureFunctions.Extensions.Configuration.AppSettings package. A call like this.Config.Get<AzureServiceBusSettings>(ServiceBusSettingsKey) creates the strongly-typed object that represents the whole app settings configuration. Then register the AppSettings instance as a singleton in the IoC container. Once this step is complete, we don't need to worry about the configuration any longer.
Creating IoC Container
Until recently, earlier this year to be precise, we had to use the static modifier for Azure Functions. With this restriction, we had to use the service locator pattern for dependency injection. However, as we are now able to use the IoC container, dependency management has become really easy. Have a look at the code below.
[assembly: FunctionsStartup(typeof(GeoRedundant.FunctionApp.StartUp))]
namespace GeoRedundant.FunctionApp
{
    public class StartUp : FunctionsStartup
    {
        public override void Configure(IFunctionsHostBuilder builder)
        {
            this.ConfigureAppSettings(builder.Services);
            this.ConfigureServices(builder.Services);
        }

        private void ConfigureAppSettings(IServiceCollection services)
        {
            services.AddSingleton<AppSettings>();
        }

        private void ConfigureServices(IServiceCollection services)
        {
            services.AddTransient<IMessageService, MessageService>();
        }
    }
}
In the IoC container, we register the AppSettings instance as a singleton. There's another registration, IMessageService. We'll discuss it later in this post.
Constructor Injection to Function Class
Let's create a function class. Unfortunately, the Azure Functions Service Bus binding doesn't natively support multiple service instances for geo-replication. Therefore, we MUST use the SDK to send messages to multiple instances. The IMessageService interface looks after this concern.
Therefore, this IMessageService instance SHOULD be injected into the function class. Here's a code snippet showing how the constructor injection works. The complete code can be found here.
public class MessageSendHttpTrigger
{
    private readonly IMessageService _service;

    public MessageSendHttpTrigger(IMessageService service)
    {
        this._service = service
                        ?? throw new ArgumentNullException(nameof(service));
    }
    ...
}
There's no static modifier any longer, as you can see. In addition to this, the IMessageService is injected through the constructor.
Implementing Message Fan-Out
It's time to implement sending messages. Have a look at the class, MessageService, that implements IMessageService. The AppSettings instance is injected for use. Two methods are involved in sending messages: WithTopicClients() and SendAsync(). The actual implementation is here.
public IMessageService WithTopicClients()
{
    this._topicClients.Clear();

    foreach (var kvp in this._settings.ConnectionStrings)
    {
        var client = new TopicClient(kvp.Value, this._settings.Topic.Name);
        this._topicClients.Add(client);
    }

    return this;
}
The WithTopicClients() method registers the TopicClient instances based on the connection strings from AppSettings. It's also worth noting that, by returning this, the method supports method chaining, using the fluent interface approach.
Let's have a look at the method, SendAsync(). It sends the same message to both Service Bus Topics, which is called fan-out. Here's the actual code.
public async Task<string> SendAsync(string value)
{
    var body = Encoding.UTF8.GetBytes(value);
    var message = new Message(body) { MessageId = Guid.NewGuid().ToString() };
    var exceptions = new ConcurrentQueue<Exception>();

    if (!this._topicClients.Any())
    {
        throw new InvalidOperationException("No TopicClient exist");
    }

    // Fan-out messages.
    foreach (var client in this._topicClients)
    {
        try
        {
            await client.SendAsync(message.Clone());
        }
        catch (Exception ex)
        {
            exceptions.Enqueue(ex);
        }
    }

    // Throw the exception if all clients fail sending the message.
    if (exceptions.Count == this._topicClients.Count)
    {
        throw new AggregateException(exceptions);
    }

    return message.MessageId;
}
In this method, as we've already got the list of TopicClient instances, if one Service Bus instance is temporarily unavailable, the message still reaches the other one. Therefore, unless every instance throws an exception, we consider the message sent successfully. With only about 2.6 seconds of expected combined downtime per month, both instances failing together is unlikely enough not to be an issue. As you can see, there's no fancy technique here; we simply send the same message to every instance.
However, when we send a message, we MUST use the same MessageId value for every copy. Otherwise, we can't recognise the copies as the same message when we consume them.
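The loop shown earlier awaits each send in turn. If sending to all instances truly concurrently matters, the same "fail only when everything fails" rule can be sketched with Task.WhenAll. This is a hypothetical variant, not the code in the repository: the FanOut class and the SendToAllAsync name are made up for illustration, and each sender is assumed to be a delegate that wraps one client's send call.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class FanOut
{
    // Starts every sender before awaiting any of them and returns how many succeeded.
    // Throws AggregateException only when every sender failed.
    public static async Task<int> SendToAllAsync(IReadOnlyList<Func<Task>> senders)
    {
        if (senders.Count == 0)
        {
            throw new InvalidOperationException("No sender registered");
        }

        // Each wrapper task captures its own failure instead of throwing.
        var tasks = senders.Select(async send =>
        {
            try
            {
                await send().ConfigureAwait(false);
                return (Exception)null;
            }
            catch (Exception ex)
            {
                return ex;
            }
        }).ToList();

        var results = await Task.WhenAll(tasks).ConfigureAwait(false);
        var exceptions = results.Where(ex => ex != null).ToList();

        if (exceptions.Count == senders.Count)
        {
            throw new AggregateException(exceptions);
        }

        return senders.Count - exceptions.Count;
    }
}
```

Each delegate would typically send a clone of the same message, so that every copy keeps the same MessageId.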
The function method SHOULD now call the SendAsync() method. Have a look at the code below. There's no static modifier on the method any longer either. Here's the complete code.
[FunctionName(nameof(MessageSendHttpTrigger))]
public async Task<IActionResult> Run(
    [HttpTrigger(AuthorizationLevel.Function, "post", Route = "messages/send")] HttpRequest req,
    ILogger log)
{
    log.LogInformation("C# HTTP trigger function processed a request.");

    var payload = new SamplePayload() { Message = "Hello World" };

    // SendAsync() takes a string, so the payload is serialised first
    // (assumes Newtonsoft.Json's JsonConvert is available).
    var result = await this._service
                           .WithTopicClients()
                           .SendAsync(JsonConvert.SerializeObject(payload))
                           .ConfigureAwait(false);

    return new OkObjectResult(result);
}
As we implemented the fluent interface, both .WithTopicClients() and .SendAsync() are neatly chained one after the other. Let's send five messages for this experiment, and we'll find the same five messages on both instances.
Fan-out has been successfully implemented. Let's move on.
Implementing Message Fan-In
We need to generate SubscriptionClient instances from the connection strings. Here's the code.
public IMessageService WithSubscriptionClients()
{
    this._subscriptionClients.Clear();

    foreach (var kvp in this._settings.ConnectionStrings)
    {
        var client = new SubscriptionClient(
            kvp.Value,
            this._settings.Topic.Name,
            this._settings.Topic.Subscription.Name,
            ReceiveMode.PeekLock);
        this._subscriptionClients.Add(client);
    }

    return this;
}
Like the other method, this method also returns this, which implements the fluent interface.
The core part of this implementation is that, even though we pick up messages from multiple instances, we only take one copy for further processing, and the other copies are automatically marked as complete. In general, the concept of fan-in is slightly different, but the overall approach of taking multiple messages and processing them at once is the same.
In my previous post, I mentioned the use of callbacks to handle messages when receiving them. This part might be slightly confusing, but following the code snippet should reduce the confusion. The actual implementation can be found here.
public async Task ReceiveAsync(Func<ISubscriptionClient, Message, Task> callbackToProcess)
{
    var messageIds = new List<string>();
    var msglock = new object();

    // Local function: Handles messages.
    async Task onMessageReceived(ISubscriptionClient client, Message message, int maxMessageDeduplicationCount = 20)
    {
        var duplicated = false;
        lock (msglock)
        {
            // If the ID has been seen already, this is the redundant copy.
            duplicated = messageIds.Remove(message.MessageId);
            if (!duplicated)
            {
                messageIds.Add(message.MessageId);
                if (messageIds.Count > maxMessageDeduplicationCount)
                {
                    messageIds.RemoveAt(0);
                }
            }
        }

        if (!duplicated)
        {
            await callbackToProcess(client, message).ConfigureAwait(false);
        }
    }

    var exceptions = new ConcurrentQueue<Exception>();

    // Local function: Handles exceptions.
    async Task onExceptionReceived(ExceptionReceivedEventArgs args)
    {
        exceptions.Enqueue(args.Exception);
        await Task.CompletedTask.ConfigureAwait(false);
    }

    if (!this._subscriptionClients.Any())
    {
        throw new InvalidOperationException("No SubscriptionClient exist");
    }

    foreach (var client in this._subscriptionClients)
    {
        // Pass the arguments in the order the local function declares them.
        client.RegisterMessageHandler(
            (msg, token) => onMessageReceived(client, msg),
            new MessageHandlerOptions(onExceptionReceived) { AutoComplete = true, MaxConcurrentCalls = 1 });
    }

    if (exceptions.Count == this._subscriptionClients.Count)
    {
        throw new AggregateException(exceptions);
    }

    await Task.CompletedTask.ConfigureAwait(false);
}
- onMessageReceived: Starting from C# 7.0, we can use local functions within a method. As they improve readability, use local functions instead of lambda functions.
  - As everything works asynchronously, we have no idea which SubscriptionClient instance picks up the message first.
  - However, we MUST process only one message for the same MessageId. Therefore, the lock block prevents the same message from being taken twice.
  - Once the message is picked up, process it with the lambda expression passed through the callbackToProcess parameter.
  - The default value of maxMessageDeduplicationCount is 20, but it's just an arbitrary number. Any number can be set, as long as it's big enough that no message is taken twice.
- onExceptionReceived: Another local function for exception handling.
- MessageHandlerOptions: Its AutoComplete property is set to true, because all the redundant copies that are not picked need to be completed automatically.
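The de-duplication step described in the list above can also be isolated into a small helper. The sketch below is one possible shape, not the code from the repository: DeduplicationWindow and TryAccept are hypothetical names. Unlike the list-based version, it keeps an ID until it is evicted from the window, so it would also cope with more than two redundant instances.

```csharp
using System.Collections.Generic;

public sealed class DeduplicationWindow
{
    private readonly object _gate = new object();
    private readonly HashSet<string> _seen = new HashSet<string>();
    private readonly Queue<string> _order = new Queue<string>();
    private readonly int _capacity;

    public DeduplicationWindow(int capacity)
    {
        _capacity = capacity;
    }

    // Returns true the first time an ID is offered,
    // and false for a repeat that is still inside the window.
    public bool TryAccept(string messageId)
    {
        lock (_gate)
        {
            if (!_seen.Add(messageId))
            {
                return false;
            }

            _order.Enqueue(messageId);

            // Evict the oldest ID once the window is over capacity.
            if (_order.Count > _capacity)
            {
                _seen.Remove(_order.Dequeue());
            }

            return true;
        }
    }
}
```

A message handler would call TryAccept(message.MessageId) and invoke the processing callback only when it returns true. As with maxMessageDeduplicationCount, the capacity has to be large enough to cover the gap between the arrival of the first and the last copy of a message.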
This method now needs to be called within the function method. Here's the implementation.
[FunctionName(nameof(MessageReceiveHttpTrigger))]
public async Task<IActionResult> Run(
    [HttpTrigger(AuthorizationLevel.Function, "post", Route = "messages/receive")] HttpRequest req,
    ILogger log)
{
    log.LogInformation("C# HTTP trigger function processed a request.");

    await this._service
              .WithSubscriptionClients()
              .ReceiveAsync(async (client, message) =>
              {
                  log.LogInformation($"Processed: {message.MessageId} at {client.ServiceBusConnection.Endpoint}");
                  await Task.CompletedTask.ConfigureAwait(false);
              })
              .ConfigureAwait(false);

    return new OkResult();
}
The code above passes a lambda expression to the ReceiveAsync() method. The lambda expression is the actual logic to process the message. This example only logs the message ID, but the real business logic SHOULD go here.
Let's run this function. Only one copy of each message is picked, from either the primary region or the secondary region, and all the other copies are completed automatically. The screenshot below shows which region was picked for each message.
In the end, all messages have been processed, and neither instance has any messages left.
Considerations
So far, we've walked through the implementation of geo-redundant Service Bus instances for high availability. As mentioned at the start, the Premium plan takes care of this behind the scenes. However, if we really want to achieve this with the Standard plan, please bear in mind that:
- The implementation here is active/active. In other words, we use two Service Bus instances, and messages are duplicated. We pay double.
- Therefore, we need to carefully monitor the usage of the two Standard plan instances and compare it with using a single Premium plan instance. At the end of the day, the Premium plan might be cheaper.
- We should also consider the development and maintenance costs, as we're doing it all ourselves.
After considering all those things, if your organisation decides to use this approach, it would be worth trying.