6 min read

Introducing Schema Registry for Azure Messaging Services

Justin Yoo

In my previous post, we discussed the schema registry as a tool for message validation. When designing an asynchronous, event- or message-driven system architecture in the cloud, a schema registry should really be considered to check the validity of messages. Unfortunately, none of the Azure messaging services (Queue Storage, Service Bus, Event Hubs, Event Grid) currently supports a schema registry feature, so we have to implement one ourselves.

Throughout this post, I'm going to build a schema registry using Azure Blob Storage and register schemas there, with sample code.

Sample Code and NuGet Libraries

All sample code shown in this post uses C# libraries from NuGet, based on .NET Core. Here are the links to the libraries and their documentation in the GitHub repository.

Package Document
Aliencube.AzureMessaging.SchemaRegistry Document
Aliencube.AzureMessaging.SchemaRegistry.Sinks Document
Aliencube.AzureMessaging.SchemaRegistry.Sinks.Blob Document
Aliencube.AzureMessaging.SchemaRegistry.Sinks.FileSystem Document
Aliencube.AzureMessaging.SchemaRegistry.Sinks.Http Document
Aliencube.AzureMessaging.SchemaValidation Document
Aliencube.AzureMessaging.SchemaValidation.HttpClient Document
Aliencube.AzureMessaging.SchemaValidation.ServiceBus Document

Publisher/Subscriber Architecture Pattern

The Pub/Sub pattern introduced in the previous post now includes a schema registry, and here's the updated architecture diagram.

We're going to implement these three parts:

  • Azure Blob Storage: This works as a schema registry.
  • Azure Logic Apps: This is used for both publisher and subscriber. It'll be further discussed in the next post.
  • Azure Functions: This is used for message validation. It'll be further discussed in the next post.

Implementing Schema Registry

By declaring a container in Azure Blob Storage, we can use it as a schema registry. If high availability is a concern, provision another Blob Storage instance and store schemas in both. For convenience, however, we're going to create two containers in one Blob Storage account, called schemas and backups, which emulates two separate Azure Storage accounts. From the resource management perspective, we need three resources to use Blob Storage as the schema registry:

  1. Storage Account instance
  2. Blob Service
  3. Blob Container
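If you just want to experiment without an ARM template, the same three resources can be created with Azure CLI; the blob service comes implicitly with the storage account, and the names in brackets below are placeholders:

```shell
# Create the StorageV2 account (the blob service is created implicitly)
az storage account create \
    -n [STORAGE_ACCOUNT_NAME] \
    -g [RESOURCE_GROUP_NAME] \
    --sku Standard_LRS \
    --kind StorageV2

# Create the two containers emulating the main and backup registries
az storage container create -n schemas --account-name [STORAGE_ACCOUNT_NAME]
az storage container create -n backups --account-name [STORAGE_ACCOUNT_NAME]
```

For anything beyond a quick experiment, the ARM template route below is the repeatable option.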

Here's an over-simplified version of the ARM template for Blob Storage. If you're interested in the whole template structure, check out this GitHub page.

...
resources:
# Declare Azure Storage Account
- comments: '### RESOURCE - STORAGE ACCOUNT ###'
  apiVersion: "[variables('storageAccount').apiVersion]"
  type: Microsoft.Storage/storageAccounts
  name: "[variables('storageAccount').name]"
  location: "[variables('storageAccount').location]"
  kind: StorageV2
  tags: "[variables('tags')]"
  sku:
    name: "[variables('storageAccount').sku.name]"
    tier: "[variables('storageAccount').sku.tier]"
  properties:
    encryption:
      keySource: Microsoft.Storage
      services:
        blob:
          enabled: true
        file:
          enabled: true
# Declare Azure Blob Service
- comments: '### RESOURCE - STORAGE ACCOUNT - BLOB SERVICE ###'
  apiVersion: "[variables('storageAccount').blob.apiVersion]"
  type: Microsoft.Storage/storageAccounts/blobServices
  name: "[variables('storageAccount').blob.name]"
  dependsOn:
  - "[variables('storageAccount').resourceId]"
  properties:
    cors:
      corsRules: []
    deleteRetentionPolicy:
      enabled: false
# Declare Azure Blob Container
- comments: '### RESOURCE - STORAGE ACCOUNT - BLOB SERVICE - BLOB CONTAINER ###'
  apiVersion: "[variables('storageAccount').blob.apiVersion]"
  type: Microsoft.Storage/storageAccounts/blobServices/containers
  copy:
    name: containers
    count: "[length(variables('storageAccount').blob.container.names)]"
  name: "[concat(variables('storageAccount').blob.name, '/', variables('storageAccount').blob.container.names[copyIndex()])]"
  dependsOn:
  - "[variables('storageAccount').blob.resourceId]"
...

As you can see above, the ARM template is written in YAML. If you want to know more about YAML-based ARM templates, have a look at my previous post.

After completing the ARM template, run it through Azure CLI to deploy the instance.

az group deployment create \
-g [RESOURCE_GROUP_NAME] \
-n [DEPLOYMENT_NAME] \
--template-file StorageAccount.json \
--parameters @StorageAccount.parameters.json

This is the result of the Schema Registry implementation.

Let's write a console app to register schemas.

Schema Registration

In a nutshell, registering a schema is just uploading it to Azure Blob Storage, so we could simply use the Azure REST API or one of the SDKs available in different languages. However, Azure Blob Storage is not necessarily the only schema registry; it could be anything else, say an AWS S3 bucket. To allow for this possibility, the library borrows the concept of the sink, and each sink works as a DSL. For our use case, therefore, we declare a BlobStorageSchemaSink and upload schemas through it.

The entire sample code for this schema registration console app is here.

Sink Declaration for Schema Registry

Within the console app, declare two sinks against the two containers: one for the main registry and the other for the backup.

// Declare the main schema registry
var mainConnectionString = "AZURE_MAIN_STORAGE_CONNECTION_STRING";
var mainBlobClient = CloudStorageAccount.Parse(mainConnectionString)
                                        .CreateCloudBlobClient();
var mainBlobBaseUri = "AZURE_MAIN_BLOB_STORAGE_BASE_URI";
var mainSchemaContainer = "schemas";
var mainSink = new BlobStorageSchemaSink(mainBlobClient)
                   .WithBaseLocation(mainBlobBaseUri)
                   .WithContainer(mainSchemaContainer);

// Declare the backup schema registry
var backupConnectionString = "AZURE_BACKUP_STORAGE_CONNECTION_STRING";
var backupBlobClient = CloudStorageAccount.Parse(backupConnectionString)
                                          .CreateCloudBlobClient();
var backupBlobBaseUri = "AZURE_BACKUP_BLOB_STORAGE_BASE_URI";
var backupSchemaContainer = "backups";
var backupSink = new BlobStorageSchemaSink(backupBlobClient)
                     .WithBaseLocation(backupBlobBaseUri)
                     .WithContainer(backupSchemaContainer);

As you can see from the code, the BlobStorageSchemaSink library offers a fluent API and actively uses method chaining through its WithXXX() methods, which significantly improves readability.

In case you are unsure about the difference between a "train wreck" and "method chaining", have a look at this post and this post for a high-level overview.
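To make the distinction concrete, here's a toy, self-contained sketch of method chaining (nothing to do with the actual library): each WithXXX() returns the builder itself, so configuration reads as one fluent sentence, whereas a "train wreck" reaches through a chain of different objects (a.GetB().GetC().DoSomething()) and couples the caller to the whole object graph.

```csharp
using System;
using System.Text;

public class MessageBuilder
{
    private readonly StringBuilder _sb = new StringBuilder();

    // Method chaining: each call returns the same builder instance.
    public MessageBuilder WithHeader(string header) { _sb.Append(header).Append('|'); return this; }
    public MessageBuilder WithBody(string body) { _sb.Append(body); return this; }
    public string Build() => _sb.ToString();
}

public static class Program
{
    public static void Main()
    {
        // One object, fluent configuration.
        var message = new MessageBuilder()
            .WithHeader("v1")
            .WithBody("hello")
            .Build();
        Console.WriteLine(message); // v1|hello
    }
}
```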

Schema Producer Declaration

Now that we've got the schema sinks representing the schema registry, we need to declare the SchemaProducer. As the SchemaRegistry library contains the producer, import the library and use it.

// Declare JSON schema generator settings
var settings = new JsonSchemaGeneratorSettings();

// Declare schema builder
var builder = new SchemaBuilder()
                  .WithSettings(settings);

// Declare schema producer
var producer = new SchemaProducer()
                   .WithBuilder(builder)
                   .WithSink(mainSink)
                   .WithSink(backupSink);

The code declares the producer and registers the two sinks, each connecting to its own schema registry.

Schema Upload

Let's upload schemas! The following code snippet shows how to upload a schema through the producer by passing the class type reference.

// Upload schema of `SampleClass`
var version = "v1";
var filename = "schema.json";
var filepath = $"{version}/{filename}";
var produced = await producer.ProduceAsync<SampleClass>(filepath)
                             .ConfigureAwait(false);
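The post doesn't show SampleClass itself; it's simply whatever message type you want a schema for, from which the generator derives a JSON schema. A hypothetical minimal POCO would do:

```csharp
using System;

// Hypothetical message type used as the schema source;
// the JSON schema is generated from its public properties.
public class SampleClass
{
    public string Id { get; set; }

    public string Name { get; set; }

    public DateTimeOffset Timestamp { get; set; }
}
```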

If a JSON schema is already prepared, upload it directly like below:

var schema = "{ JSON_SCHEMA }";

// Upload the schema directly
var version = "v1";
var filename = "schema.json";
var filepath = $"{version}/{filename}";
var produced = await producer.ProduceAsync(schema, filepath)
                             .ConfigureAwait(false);

Once the upload is done, the schema appears in Azure Blob Storage.
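If you'd rather verify from the command line than the portal, listing the container's blobs with Azure CLI works too (the account name is a placeholder); it should print blob paths such as v1/schema.json:

```shell
az storage blob list \
    --account-name [STORAGE_ACCOUNT_NAME] \
    --container-name schemas \
    --query "[].name" \
    -o tsv
```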

So far, we've created a schema registry using Azure Blob Storage and registered schemas using the NuGet libraries with a sample console app. In the next post, we'll deal with the next part of this implementation: how the publisher and subscriber make use of the schema registry for message validation.