<?xml version="1.0" encoding="utf-8"?><rss version="2.0"><channel><title>sfeldman.NET</title><link>https://weblogs.asp.net:443/sfeldman/</link><description>.NET, code, personal thoughts</description><item><title>Service Bus subscriptions - ARM client vs ServiceBusAdministrationClient</title><link>https://weblogs.asp.net:443/sfeldman/service-bus-subscripions-arm-client-vs-servicebusadministrationclient</link><description>&lt;p&gt;Provisioning Azure Service Bus subscriptions under a topic doesn't sound like a big deal. However, Azure offers two SDKs: the Service Bus Management and Messaging libraries. Both can create subscriptions. Which library is the right one?&lt;/p&gt;
&lt;h2&gt;Management Library&lt;/h2&gt;
&lt;p&gt;The management library for ASB is available via the &lt;a href="https://www.nuget.org/packages/Azure.ResourceManager.ServiceBus/"&gt;Azure.ResourceManager.ServiceBus&lt;/a&gt; package. It operates on the ARM (Azure Resource Manager) level. It's important to remember that this library operates on each entity as an individual resource. Creating a subscription is a &lt;strong&gt;separate&lt;/strong&gt; operation from creating a rule. When a subscription is created first, it will, in fact, have a default SQL rule, &lt;code&gt;1=1&lt;/code&gt;. While for most greenfield scenarios, this might not be an issue, for existing systems, this might pose a challenge. If there are already published messages, and we're only interested in a subset of those messages, then for the time between the subscription resource creation and the rule resource creation, there's a time window where the default rule will be in place, allowing accepting all messages to be received by the subscription queue. And that's a problem as receiving unplanned message types could cause the process to fail until the unwanted messages are drained.&lt;/p&gt;
&lt;h2&gt;Messaging Library&lt;/h2&gt;
&lt;p&gt;Messaging library, in general, is intended for message processing, but the creation of subscriptions and rules modifications is supported by the &lt;a href="https://learn.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.administration.servicebusadministrationclient?view=azure-dotnet"&gt;&lt;code&gt;ServiceBusAdministrationClient&lt;/code&gt;&lt;/a&gt; found in the package &lt;a href="https://www.nuget.org/packages/Azure.Messaging.ServiceBus/"&gt;Azure.Messaging.ServiceBus&lt;/a&gt;. When using the management client, combining the subscription provisioning with its default rule is possible.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;var admin = new ServiceBusAdministrationClient(&amp;quot;&amp;lt;fully-qualified-namespace&amp;gt;&amp;quot;, new DefaultAzureCredential());

await admin.CreateSubscriptionAsync(
	new CreateSubscriptionOptions(&amp;quot;topic&amp;quot;, &amp;quot;subscription&amp;quot;),
	new CreateRuleOptions(&amp;quot;rule&amp;quot;, new SqlRuleFilter(&amp;quot;\&amp;quot;message-type\&amp;quot;='PageViewed'&amp;quot;)) // SQL or Correlation filter
);
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;With this approach, the subscription and the rule are created simultaneously, eliminating any chance of receiving unaccounted-for message types.&lt;/p&gt;
&lt;p&gt;Note: To perform these operations, you must have Manage rights to the Service Bus namespace. When using Azure Identity, the &lt;code&gt;Service Bus Data Owner role&lt;/code&gt; must be granted to your identity.&lt;/p&gt;
</description><pubDate>Sun, 23 Feb 2025 02:39:00 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sfeldman/service-bus-subscripions-arm-client-vs-servicebusadministrationclient</guid><category>ASB</category></item><item><title>DateTime to String with Custom Formatting</title><link>https://weblogs.asp.net:443/sfeldman/datetime-to-string-with-custom-formatting</link><description>&lt;p&gt;When formatting &lt;code&gt;DateTime&lt;/code&gt; to a string, the format specifier provides access to the parts of the date and time we want to express as a string. E.g.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;DateTime.UtcNow.ToString(&amp;quot;yyyy-MM-dd HH:mm:ss.fff&amp;quot;)
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;will produce something like &lt;code&gt;2024-11-03 12:34:56.789&lt;/code&gt;. But, you must be extra careful with the time separator &lt;code&gt;:&lt;/code&gt;. It's not always the same for all cultures, and if an explicit culture is not specified, the default local culture might surprise you. Let's see an example.&lt;/p&gt;
&lt;p&gt;Let's say the code is running on a machine set up with Finish culture.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;DateTime.UtcNow.ToString(&amp;quot;yyyy-MM-dd HH:mm:ss.fff&amp;quot;, new CultureInfo(&amp;quot;fi-FI&amp;quot;)).Dump();
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;The same code snippet used earlier produces an entirely different result, &lt;code&gt;2024-11-03 12.34.56.789&lt;/code&gt;. But how is that possible? That's because the &lt;code&gt;:&lt;/code&gt; custom format specifier is culture-specific. The separator character must be specified within a literal string delimiter to change the time separator for a particular date and time string.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;DateTime.UtcNow.ToString(&amp;quot;yyyy-MM-dd HH':'mm':'ss.fff&amp;quot;)
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;Or escaped.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;DateTime.UtcNow.ToString(&amp;quot;yyyy-MM-dd HH\\:mm\\:ss.fff&amp;quot;)
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;Escaping would be required to avoid surprises if date formatting &lt;code&gt;yyyy/MM/dd&lt;/code&gt; is needed. Find more about date and time separator specifiers on &lt;a href="https://learn.microsoft.com/en-us/dotnet/standard/base-types/custom-date-and-time-format-strings#date-and-time-separator-specifiers"&gt;MSDN&lt;/a&gt;.&lt;/p&gt;
</description><pubDate>Sun, 03 Nov 2024 20:49:36 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sfeldman/datetime-to-string-with-custom-formatting</guid><category>C#</category></item><item><title>Creating Azure Storage SFTP with Password using Bicep</title><link>https://weblogs.asp.net:443/sfeldman/creating-azure-storage-sftp-with-password-using-bicep</link><description>&lt;p&gt;Azure Storage service has a neat little option for hosting an SFTP. Doing so lets you upload your files as blobs to your Storage account. This is extremely helpful, especially when working on the decades-old system migrated to Azure but still requiring SFTP for data transfer. The documentation and setup of SFTP with a Storage account are straightforward—until you try to create the resource using Bicep and set the password as part of Bicep deployment. This is where it's getting a bit cumbersome.&lt;/p&gt;
&lt;p&gt;TLDR: Setting the password when creating the Storage account and SFTP user using Bicep is impossible. The password has to be &lt;strong&gt;reset&lt;/strong&gt;.&lt;/p&gt;
&lt;p&gt;This means that OOTB Bicep can create an SFTP user but cannot set the password. The password needs to be reset, even if it hasn't been set yet, and the only way to do that is via the portal UI or scripting. The portal UI option is unacceptable if you're trying to automate your resource deployment. Which leaves the scripting option. Let's dive into the code.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;param location string = resourceGroup().location

var sftpRootContainterName = 'sftp'
var sftpUserName = 'sftpuser'
var unique = uniqueString(resourceGroup().id)

resource storageAccount 'Microsoft.Storage/storageAccounts@2022-09-01' = {
  name: toLower('mysftp${unique}')
  location: location
  sku: {
    name: 'Standard_LRS'
  }
  kind: 'StorageV2'
  properties: {
    allowBlobPublicAccess: false
    allowCrossTenantReplication: false
    allowSharedKeyAccess: true
    isHnsEnabled: true
    isLocalUserEnabled: true
    isSftpEnabled: true
    isNfsV3Enabled: false
    minimumTlsVersion: 'TLS1_2'
    supportsHttpsTrafficOnly: true
  }
  tags: {}
}

resource blobServicesResource 'Microsoft.Storage/storageAccounts/blobServices@2022-09-01' = {
  parent: storageAccount
  name: 'default'
  properties: {
  }

  resource sftpStorageContainer 'containers' = {
    name: sftpRootContainterName
    properties: {
      publicAccess: 'None'
    }
  }
}

resource sftpLocalUserResource 'Microsoft.Storage/storageAccounts/localUsers@2023-05-01' = {
  name: sftpUserName 
  parent: storageAccount
  properties: {
    permissionScopes: [
      {
        permissions: 'rcwdl'
        service: 'blob'
        resourceName: sftpRootContainterName
      }
    ]
    homeDirectory: '${sftpRootContainterName}/' // This user will have complete control over the &amp;quot;root&amp;quot; directory in sftpRootContainterName
    hasSharedKey: false
  }
}

// Managed identity necessary to execute the scirpt
resource storageAccountManagedIdentity 'Microsoft.ManagedIdentity/userAssignedIdentities@2023-01-31' existing = {
  name: 'mi-sandbox-sean-feldman'
  scope: resourceGroup()
}

// The script to reset the password
resource deploymentScript 'Microsoft.Resources/deploymentScripts@2023-08-01'= {
  name: 'mysftp-inlineCLI-${unique}'
  location: location
  kind: 'AzureCLI'
  identity: {
    type: 'UserAssigned'
    userAssignedIdentities: {
      '${storageAccountManagedIdentity.id}': {}
    }
  }
  properties: {
    azCliVersion: '2.63.0'
    arguments: '${storageAccount.name} ${resourceGroup().name} ${sftpUserName}'
    scriptContent: '''
      az storage account local-user regenerate-password --account-name $1 -g $2 -n $3
    '''
    timeout: 'PT5M'                 // Set timeout for the script execution (optional)
    cleanupPreference: 'OnSuccess'  // Automatically clean up after success
    retentionInterval: 'PT1H'       // Retain script resources for 1 hour after execution
  }
}

// DO NOT do this in production
output text string = deploymentScript.properties.outputs.sshPassword
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;The solution is to deploy and run the &lt;code&gt;deploymentScript&lt;/code&gt; AZ CLI script to reset the password. The output of the &lt;code&gt;az storage account local-user regenerate-password&lt;/code&gt; is the generated password, the output object of the script resource, as the &lt;code&gt;sshPassword&lt;/code&gt;. But this is not ideal for production. For production, keeping the password in Azure KeyVault or Azure Config Service is better. With a twist, testing if the value exists first and setting it only if it doesn't is better.&lt;/p&gt;
</description><pubDate>Mon, 21 Oct 2024 04:08:20 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sfeldman/creating-azure-storage-sftp-with-password-using-bicep</guid><category>Azure</category><category>Bicep</category></item><item><title>Auditing with Azure Functions</title><link>https://weblogs.asp.net:443/sfeldman/auditing-with-azure-functions</link><description>&lt;p&gt;In the previous two posts about &lt;a href="https://weblogs.asp.net/sfeldman/recoverability-with-azure-functions-delayed-retries"&gt;recoverability&lt;/a&gt;, I focused on the rainy day scenarios where intermittent failures require retries and backoffs. This post will focus on the happy day scenario, where everything works as expected. So what's the issue then?&lt;/p&gt;
&lt;p&gt;A successful message processing is not the only outcome that's required. More often than not there's also an audit trail that's requried. Imagine processing purchase orders. Not only you want to know nothing has failed. You might also want to have the confidence in a form of an audit trail that consists of those processed messages.&lt;/p&gt;
&lt;p&gt;With Azure Functions Isolated Worker SDK, this becomes an extremely easy feature to implement. You could implement it as a standalone middleware but I chose to combine it with the revoverability middleware to keep the picture complete.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;public async Task Invoke(FunctionContext context, FunctionExecutionDelegate next)
{
	try
	{
	    await next(context);

        await Audit(message, context);
	}
	catch (AggregateException exception)
	{
	    // Recoverability, omitted for clarity
	}
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;The implementation for auditing is just sending a message to the queue chosen to be the audit queue. Similar to the centralized error queue.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;private async Task Audit(ServiceBusReceivedMessage message, FunctionContext context)
{
    var auditMessage = new ServiceBusMessage(message);

    auditMessage.ApplicationProperties[&amp;quot;Endpoint&amp;quot;] = context.FunctionDefinition.Name;

    await using var serviceBusSender = serviceBusClient.CreateSender(&amp;quot;audit&amp;quot;);

    await serviceBusSender.SendMessageAsync(auditMessage);
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;Notice the custom header &lt;code&gt;&amp;quot;Endpoint&amp;quot;&lt;/code&gt;. This is intentional to keep track of the endpoint/function that has successfully processed the message that got audited. While there is additional information that could be propagated with the audited message, this is enough for a basic audit trail.&lt;/p&gt;
</description><pubDate>Fri, 03 Nov 2023 17:29:00 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sfeldman/auditing-with-azure-functions</guid></item><item><title>Recoverability with Azure Functions - Delayed Retries</title><link>https://weblogs.asp.net:443/sfeldman/recoverability-with-azure-functions-delayed-retries</link><description>&lt;p&gt;In the &lt;a href="https://weblogs.asp.net/sfeldman/recoverability-with-azure-functions"&gt;previous post&lt;/a&gt;, I showed how to implement basic recoverability with Azure Functions and Service Bus. In this post, I'm going to expand on the idea and demonstrate how to implement a back-off strategy.&lt;/p&gt;
&lt;h2&gt;Back-off strategy&lt;/h2&gt;
&lt;p&gt;A backoff strategy is intended to help with intermittent failures when immediate subsequent retries will suffice due to the required resources not being available within a short period but having a high probability of being back online after a short timeout. This is also known as delayed retries, when retries are attempted after a certain time (delay) to increase the chances of succeeding rather than bombarding with immediate retries and risking failing all the attempts within a short period.&lt;/p&gt;
&lt;h2&gt;Implementation&lt;/h2&gt;
&lt;p&gt;For delayed retries, we'll set an arbitrary number. Let's call it &lt;code&gt;NumberOfDelayedRetries&lt;/code&gt;. The number could be hardcoded or taken from the configuration. The idea is to represent with this number how many delayed retry attempts there will be. Setting it to 0 would disable delayed retries altogether.&lt;/p&gt;
&lt;p&gt;Delayed retries should kick in when the immediate retries are all exhausted. With Azure Service Bus, immediate retries are fairly simple to implement - Service Bus does that for us with the &lt;code&gt;DeliveryCount&lt;/code&gt; on the given message. Unfortunately, today, there's no way to achieve the same with the native message. This will change in the future when there will be the ability to &lt;a href="https://github.com/Azure/azure-service-bus/issues/454"&gt;abandon a message with a custom timespan&lt;/a&gt;. Until then, some custom code will be required to mimic this behaviour.&lt;/p&gt;
&lt;h3&gt;Delayed retry logic&lt;/h3&gt;
&lt;p&gt;Whenever all immediate retries are exhausted, a message should go back to the queue and be delayed (scheduled) for a later time to be received. The problem with this approach is that we could exceed the &lt;code&gt;MaxDeliveryCount&lt;/code&gt; that's there to protect from infinite processing. Sending back the same message also won't work due to the reason explained above (service limitation). So we'll cheat.&lt;/p&gt;
&lt;p&gt;The incoming failing message will be cloned. And when cloned, we'll add a header, let's say &lt;code&gt;&amp;quot;Error.DelayedRetries&amp;quot;&lt;/code&gt;. And each time we want to increase the number of attempted delayed retries, we'll read the original incoming message's header and increase it by one for the cloned message. The first time, there will be no such header, so we need to account for that. As long as we need to proceed with the delayed retries, we'll be completing the original incoming message. That's why logging at this point is important.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;public async Task Invoke(FunctionContext context, FunctionExecutionDelegate next)
{
	try
	{
		await next(context);
	}
	catch (AggregateException exception)
	{
		BindingMetadata meta = context.FunctionDefinition.InputBindings.FirstOrDefault(b =&amp;gt; b.Value.Type == &amp;quot;serviceBusTrigger&amp;quot;).Value;
		var input = await context.BindInputAsync&amp;lt;ServiceBusReceivedMessage&amp;gt;(meta);
		var message = input.Value ?? throw new Exception($&amp;quot;Failed to send message to error queue, message was null. Original exception: {exception.Message}&amp;quot;, exception);

		if (message.DeliveryCount &amp;lt;= 5)
		{
			logger.LogDebug(&amp;quot;Failed processing message {MessageId} after {Attempt} time, will retry&amp;quot;, message.MessageId, message.DeliveryCount);

			throw;
		}
		
		#region Delayed Retries
		
		var retries = message.GetNumberOfAttemptedDelayedRetries();

		if (retries &amp;lt; NumberOfDelayedRetries)
		{
			var retriedMessage = message.CloneForDelayedRetry(retries + 1);

			await using var senderRetries = serviceBusClient.CreateSenderFor(Enum.Parse&amp;lt;Endpoint&amp;gt;(context.FunctionDefinition.Name));
			await senderRetries.ScheduleMessageAsync(retriedMessage, DateTimeOffset.UtcNow.Add(DelayedRetryBackoff));

			logger.LogWarning(&amp;quot;Message ID {MessageId} failed all immediate retries. Will perform a delayed retry #{Attempt} in {Time}&amp;quot;, message.MessageId, retries + 1, DelayedRetryBackoff);
			return;
		}
		
		#endregion

		// TODO: remove when fixed https://github.com/Azure/azure-functions-dotnet-worker/issues/993
		var specificException = GetSpecificException(exception);
		var failedMessage = message.CloneForError(context.FunctionDefinition.Name, specificException);
		var sender = serviceBusClient.CreateSenderFor(Endpoint.Error);
		await sender.SendMessageAsync(failedMessage);

		logger.LogError(&amp;quot;Message ID {MessageId} failed processing and was moved to the error queue&amp;quot;, message.MessageId);
	}
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;And that's all there is. The extension methods &lt;code&gt;GetNumberOfAttemptedDelayedRetries()&lt;/code&gt; and &lt;code&gt;CloneForDelayedRetry()&lt;/code&gt; are provided below for reference.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;public static int GetNumberOfAttemptedDelayedRetries(this ServiceBusReceivedMessage message)
{
	message.ApplicationProperties.TryGetValue(&amp;quot;Error.DelayedRetries&amp;quot;, out object? delayedRetries);

	return delayedRetries is null ? 0 : (int)delayedRetries;
}

public static ServiceBusMessage CloneForDelayedRetry(this ServiceBusReceivedMessage message, int attemptedDelayedRetries)
{
	message.ApplicationProperties.TryGetValue(&amp;quot;Error.OriginalMessageId&amp;quot;, out var value);
	var originalMessageId = value is null ? message.MessageId : value.ToString();

	var error = new ServiceBusMessage(message)
	{
		ApplicationProperties =
		{
			[&amp;quot;Error.DelayedRetries&amp;quot;]    = attemptedDelayedRetries,
			[&amp;quot;Error.OriginalMessageId&amp;quot;] = originalMessageId
		},
		// TODO: remove when https://github.com/Azure/azure-sdk-for-net/issues/38875 is addressed
		TimeToLive = TimeSpan.MaxValue
	};

	return error;
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;Notice the &lt;code&gt;&amp;quot;Error.OriginalMessageId&amp;quot;&lt;/code&gt; header. It is helpful to correlate the original Service Bus message to the delayed retried messages as those are physically different messages.&lt;/p&gt;
&lt;p&gt;&lt;img src="https://aspblogs.blob.core.windows.net:443/media/sfeldman/2023/azure-functions-recoverability-delayed-retries/image.png" alt="message" /&gt;&lt;/p&gt;
&lt;p&gt;Et voilà! We've got ourselves a nice recoverability with immediate and delayed retries to help deal with intermittent errors and temporary failures.&lt;/p&gt;
&lt;p&gt;&lt;img src="https://aspblogs.blob.core.windows.net:443/media/sfeldman/2023/azure-functions-recoverability-delayed-retries/image-1.png" alt="screenshot" /&gt;&lt;/p&gt;
&lt;h2&gt;Auditing&lt;/h2&gt;
&lt;p&gt;In the next post, I'll demonstrate how we can implement the audit trail of the successfully processed messages to complete the entire picture of all messages processed with Azure Functions.&lt;/p&gt;
</description><pubDate>Fri, 03 Nov 2023 05:14:00 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sfeldman/recoverability-with-azure-functions-delayed-retries</guid><category>Functions</category><category>AzureServiceBus</category></item><item><title>Recoverability with Azure Functions</title><link>https://weblogs.asp.net:443/sfeldman/recoverability-with-azure-functions</link><description>&lt;p&gt;When working with Azure Service Bus triggers and Functions, the recoverability story is not the best with the out-of-box implementation. To understand the challenges with the built-in recoverability and how to overcome those, this post will dive into the built-in recoverability with Azure Functions for Service Bus queues and subscriptions, offering an alternative. But first, what is recoverability?&lt;/p&gt;
&lt;blockquote&gt;
&lt;p&gt;Recoverability in messaging refers to a messaging system's ability to ensure that messages are reliably delivered even in the presence of failures or disruptions. It involves message persistence, acknowledgments, message queues, redundancy, failover mechanisms, and retry strategies to guarantee message delivery and prevent data loss. This is vital for applications where message loss can have serious consequences.&lt;/p&gt;
&lt;/blockquote&gt;
&lt;p&gt;With Azure Service Bus, recoverability is provided with &lt;code&gt;MaxDeliveryCount&lt;/code&gt; and a dead-letter queue. To be more specific, a message is delivered at least &lt;code&gt;MaxDeliveryCount&lt;/code&gt; time and, upon further failure, when re-delivered, will be moved to a special dead-letter sub-queue. Azure Functions leverage that feature to retry messages. However, there are a few issues with this approach.&lt;/p&gt;
&lt;ol&gt;
&lt;li&gt;Retries are immediate&lt;/li&gt;
&lt;li&gt;Upon final failure, the dead-lettered message has no information to assist in troubleshooting.&lt;/li&gt;
&lt;/ol&gt;
&lt;p&gt;Let's dive into those issues to see what can be done.&lt;/p&gt;
&lt;p&gt;As part of processing a message, we must contact a 3rd part API. But, for some reason, despite the promised up-time of 99.9%, we hit an error. As a result of that error, the message processing will throw an exception, and the message will be re-delivered. It will be attempted as many times as the value of &lt;code&gt;MaxDeliveryCount&lt;/code&gt; defined on the entity used to trigger the function. If it's set to 10, that would be 10 retries one after another. Or 10 &lt;em&gt;immediate&lt;/em&gt; retries. That's not a small number of attempts. But if the problem persists, the message will be dead-lettered to allow the Function processing of other messages. Which is good. But when we need to understand what happened with the message at the time of the failure, we'll have a hard time. When a message is dead-lettered, the reason for dead-lettering will only contain the benign reason: the maximum delivery count has been exceeded. Not very helpful. Gladly, there are Application Insights and logged errors that could be correlated to the errors that have occurred and hopefully link between the dead-lettered message(s) and the logged exception(s). But wouldn't it be simpler to look at the message and know &lt;em&gt;exactly&lt;/em&gt; the reason why it failed?&lt;/p&gt;
&lt;p&gt;Thanks to the Isolated Worker SDK, we can do that. Similar to frameworks such as NServiceBus and MassTransit, we can enable recoverability with Azure Functions and make our prod-ops life easier. So, let's build that recoverability!&lt;/p&gt;
&lt;h2&gt;Centralized error queue&lt;/h2&gt;
&lt;p&gt;Unlike &lt;a href="https://weblogs.asp.net/sfeldman/centralized-dead-letter-queue-with-azure-service-bus"&gt;centralized dead-letter queue&lt;/a&gt;, a centralized error queue is an arbitrary queue that we'll add to the topology to store any messages that would typically go to the dead-letter sub-queue per entity. I.e. we won't allow &lt;code&gt;MaxDeliveryCount&lt;/code&gt; executions for the message to be dead-lettered. Instead, we'll ensure we attempt a message no more than N times, &lt;strong&gt;moving&lt;/strong&gt; it to the error queue afterwards. For the sake of the exercise, I'll use a queue called &lt;code&gt;error&lt;/code&gt;.&lt;/p&gt;
&lt;h2&gt;Middleware&lt;/h2&gt;
&lt;p&gt;To implement recoverability, a Funcitons Isolated Worker SDK is required as it supports the concept of middleware (think pipeline). Below is a high-level implementation to elaborate on the approach. You'll need some package references, but the idea is what's important. We're getting closer!&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;public class Program
{
    public static void Main()
    {
        var host = new HostBuilder()
            .ConfigureFunctionsWorkerDefaults(builder =&amp;gt;
            {
                builder.UseWhen&amp;lt;ServiceBusMiddleware&amp;gt;(Is.ServiceBusTrigger); // Up-vote https://github.com/Azure/azure-functions-dotnet-worker/issues/1999 😉
            })
            .ConfigureServices((builder, services) =&amp;gt;
            {
                var serviceBusConnectionString = Environment.GetEnvironmentVariable(&amp;quot;AzureServiceBus&amp;quot;);
                if (string.IsNullOrEmpty(serviceBusConnectionString))
                {
                    throw new InvalidOperationException(&amp;quot;Specify a valid AzureServiceBus connection string in the Azure Functions Settings or your local.settings.json file.&amp;quot;);
                }

                // This can also be done with the AddAzureClients() API
                services.AddSingleton(new ServiceBusClient(serviceBusConnectionString));
            })
            .Build();

        host.Run();
    } 
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;The main focus is the &lt;code&gt;ServiceBusMiddleware&lt;/code&gt; class, where the recoverability logic will be found. In a few words, we'll try to execute the functions, &lt;code&gt;await next(context)&lt;/code&gt; call. If it throws, function invocation has failed and will be retried. Except we'll intercept that, and based on how many retries we allow, we'll decide wherever to rethrow or move the message to the centralized error queue. Note that we don't actually move the message. Instead, we clone it, complete the original message by swallowing the exception and sending the clone to the error queue. On top of that, we'll add the exception details to the cloned message to allow easier troubleshooting by inspecting the message headers. This will help the prod-ops to understand better why a message has failed by looking at the exception stack trace and exception details. Message payload, along with the error, can also be very helpful in solving the issue.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;internal class ServiceBusMiddleware : IFunctionsWorkerMiddleware
{
    private readonly ILogger&amp;lt;ServiceBusMessage&amp;gt; logger;
    private readonly ServiceBusClient serviceBusClient;

    public ServiceBusMiddleware(ServiceBusClient serviceBusClient, ILogger&amp;lt;ServiceBusMessage&amp;gt; logger)
    {
        this.serviceBusClient = serviceBusClient;
        this.logger           = logger;
    }

    public async Task Invoke(FunctionContext context, FunctionExecutionDelegate next)
    {
        try
        {
            await next(context);
        }
        catch (AggregateException exception)
        {
            BindingMetadata meta = context.FunctionDefinition.InputBindings.FirstOrDefault(b =&amp;gt; b.Value.Type == &amp;quot;serviceBusTrigger&amp;quot;).Value;
            var input = await context.BindInputAsync&amp;lt;ServiceBusReceivedMessage&amp;gt;(meta);
            var message = input.Value ?? throw new Exception($&amp;quot;Failed to send message to error queue, message was null. Original exception: {exception.Message}&amp;quot;, exception);

            if (message.DeliveryCount &amp;lt;= 5)
            {
                logger.LogDebug(&amp;quot;Failed processing message {MessageId} after {Attempt} time, will retry&amp;quot;, message.MessageId, message.DeliveryCount);

                throw;
            }

            // TODO: remove when fixed https://github.com/Azure/azure-functions-dotnet-worker/issues/993
            var specificException = GetSpecificException(exception);
            var failedMessage = message.CloneForError(context.FunctionDefinition.Name, specificException);
            var sender = serviceBusClient.CreateSenderFor(Endpoint.Error);
            await sender.SendMessageAsync(failedMessage);

            logger.LogError(&amp;quot;Message ID {MessageId} failed processing and was moved to the error queue&amp;quot;, message.MessageId);
        }
    }

    static Exception GetSpecificException(AggregateException exception) =&amp;gt; exception.Flatten().InnerExceptions.FirstOrDefault()?.InnerException ?? exception;
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;What about Functions? That's the great part. Every Function triggered by Azure Service Bus messages will be covered. No more need to catch exceptions and handle those.&lt;/p&gt;
&lt;h2&gt;Result&lt;/h2&gt;
&lt;p&gt;What does it look like in action? Sending a message that will continuously fail all 5 retries will cause the message to be &amp;quot;moved&amp;quot; into the error queue.&lt;/p&gt;
&lt;p&gt;&lt;img src="https://aspblogs.blob.core.windows.net:443/media/sfeldman/2023/azure-functions-recoverability/error-message.jpg" alt="failed message" /&gt;&lt;/p&gt;
&lt;p&gt;I've decided to provide the failed function name as &lt;code&gt;Error.FailedQ&lt;/code&gt; to identify what queue/Function has failed. Stack trace and error message to have the details. Straightforward and very helpful when handling failed messages.&lt;/p&gt;
&lt;h2&gt;Back-off retries (delayed retries)&lt;/h2&gt;
&lt;p&gt;In the next post, we'll cover delayed retries to make recoverability even more robust.&lt;/p&gt;
</description><pubDate>Tue, 31 Oct 2023 02:33:14 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sfeldman/recoverability-with-azure-functions</guid><category>Azure</category><category>Functions</category><category>AzureServiceBus</category></item><item><title>Azure Blob Storage Cold Tier</title><link>https://weblogs.asp.net:443/sfeldman/azure-blob-storage-cold-tier</link><description>&lt;p&gt;Azure Storage service is the foundational building block in cloud architecture. Cheap, reliable, resilient, and powerful. From small solutions to monster systems, Blob service, in particular, is convenient. Any system that involves any type of document slowly but steadily has the number of blobs/files growing over time. Be it specific business requirements or legal aspects, blobs must be kept around for some time. But not all blobs are equal.&lt;/p&gt;
&lt;p&gt;Blobs has had the concept of tiers for quite a while. Two tiers that are opposite extremes are Hot and Archive. The Hot tier is fast and inexpensive to access but more expensive to store. The Archive tier is inexpensive to store, but when it comes to reading and writing, let's say it's not a good idea. For a while, there was also the Cool tier. A middle ground if you wish. Blobs that might be accessed but very infrequently.&lt;/p&gt;
&lt;p&gt;Recently, there's even more granularity when it comes to tiers. The Cold tier. The Cold tier is positioned between the Cool and Archive, adding more cost-effectiveness to storing blobs.&lt;/p&gt;
&lt;p&gt;So how do you choose which tier is the right tear for the problem?&lt;/p&gt;
&lt;p&gt;Understand the business needs. How blobs will be used. Plan accordingly. In many cases, blobs must be frequently accessed initially and then progress into the next, cooler tier, depending on the business rules. Microsoft recommended strategy is the &lt;a href="https://learn.microsoft.com/en-us/azure/storage/blobs/access-tiers-overview#summary-of-access-tier-options"&gt;following&lt;/a&gt;.&lt;/p&gt;
&lt;ul&gt;
&lt;li&gt;&lt;strong&gt;Cool&lt;/strong&gt; tier: minimum retention of 30 days&lt;/li&gt;
&lt;li&gt;&lt;strong&gt;Cold&lt;/strong&gt; tier: minimum retention of 90 days&lt;/li&gt;
&lt;li&gt;&lt;strong&gt;Archive&lt;/strong&gt; tier: minimum retention of 180 days&lt;/li&gt;
&lt;/ul&gt;
&lt;p&gt;This doesn't mean you absolutely must follow this recommendation. What if your blobs are stored and never accessed? Or stored and might be accessed at any time?&lt;/p&gt;
&lt;p&gt;This is where &lt;a href="https://learn.microsoft.com/en-us/azure/storage/blobs/lifecycle-management-policy-configure"&gt;Blob Lifecycle Management Policies&lt;/a&gt; are so handy. For example, let's say I'd like to reduce the cost of keeping blobs from day one but have the option to access those. I.e. not fully archived. The following policy would help with that by moving all blobs (including the existing ones) to the new Cold tier right away (some delay is expected as Storage service runs this not in real-time).&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;{
  &amp;quot;rules&amp;quot;: [
    {
      &amp;quot;enabled&amp;quot;: true,
      &amp;quot;name&amp;quot;: &amp;quot;To-Cold&amp;quot;,
      &amp;quot;type&amp;quot;: &amp;quot;Lifecycle&amp;quot;,
      &amp;quot;definition&amp;quot;: {
        &amp;quot;actions&amp;quot;: {
          &amp;quot;baseBlob&amp;quot;: {
            &amp;quot;tierToCold&amp;quot;: {
              &amp;quot;daysAfterModificationGreaterThan&amp;quot;: 0
            }
          }
        },
        &amp;quot;filters&amp;quot;: {
          &amp;quot;blobTypes&amp;quot;: [
            &amp;quot;blockBlob&amp;quot;
          ],
          &amp;quot;prefixMatch&amp;quot;: [
            &amp;quot;masters/&amp;quot;
          ]
        }
      }
    }
  ]
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;This will allow much lower storage costs. Remember, there will be higher access and transaction costs when blobs are accessed. The difference is that these blobs will be available &lt;strong&gt;immediately&lt;/strong&gt; and not &lt;strong&gt;eventually&lt;/strong&gt;, as they would be with the Archived tier.&lt;/p&gt;
</description><pubDate>Wed, 16 Aug 2023 04:31:46 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sfeldman/azure-blob-storage-cold-tier</guid></item><item><title>Azure Function: One Line of Insanity</title><link>https://weblogs.asp.net:443/sfeldman/azure-function-a-single-line-to-drive-you-nuts</link><description>&lt;!-- ![image][2] --&gt;
&lt;p&gt;Azure Functions Isolated Worker SDK is an easy-to-set-up and get-running framework.
The minimal &lt;code&gt;Progarm.cs&lt;/code&gt; is hard to mess up.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;var host = new HostBuilder()
    .ConfigureFunctionsWorkerDefaults()
    .Build();

await host.RunAsync();
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;Right? Except when it's not. The extension method, &lt;code&gt;ConfigureFunctionsWorkerDefaults&lt;/code&gt; is a critical piece of code that &lt;strong&gt;has&lt;/strong&gt; to be invoked, or the generic host will start, but nothing will be wired up. When it's just a few lines, it's not hard to miss if the call is accidentally omitted. But it's less noticeable if that's an average Functions Application with several things configured, such as dependency services and configurations.&lt;/p&gt;
&lt;p&gt;And that's the situation I found myself in. While performing code refactoring, I unintentionally deleted the invocation of ConfigureFunctionsWorkerDefaults. Surprisingly, there were no compilation errors or startup issues. However, an unexpected problem arose: binding a configuration file to one of my custom configuration classes failed. This raised eyebrows. When I examined the configuration providers, I immediately noticed that the environment variables provider was absent, which should have been included by default. At this point, I realized that I had accidentally eliminated the entire startup process of the Isolated Worker by inadvertently omitting that crucial extension method call.&lt;/p&gt;
&lt;p&gt;Even more ironic is that I commented on a &lt;a href="https://github.com/Azure/azure-functions-dotnet-worker/issues/1347"&gt;similar issue&lt;/a&gt; about six months ago. Same setup, same problem. And the suggestion I made back then would help me today - an analyzer that ensures &lt;code&gt;ConfigureFunctionsWorkerDefaults&lt;/code&gt; is not removed accidentally.&lt;/p&gt;
&lt;p&gt;Why do I still think that analyzer could be helpful? No one wants to remember special methods to be called. The point in the case is the class below.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;class Demo
{
  public void Initialize() { // important initialization here }

  public void DoSomething() {}
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;To use &lt;code&gt;d&lt;/code&gt;, it needs to be initialized.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;var d = new Demo();
d.Initialize();
d.DoSomehting();
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;From the class itself, it is not apparent that &lt;code&gt;Initialize()&lt;/code&gt; has to take place, and it is easy to omit the call. That's why &lt;code&gt;DoSomething()&lt;/code&gt; is likely to validate if the initialization took place.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;class Demo
{
  private bool initialized;
  
  public void Initialize()
  {
    // important initialization here 
    initialized = true;
  }
  
  private CheckWasInitialized()
  {
    if (initialized == false)
    {
      throw new Exception(&amp;quot;Initialization did not occur. Call Initialize() first&amp;quot;);
    }
  }

  public void DoSomething()
  {
    CheckWasInitialized();
    // logic
  }
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;Not the most elegant approach, but you get the idea. Trying to use an instance of &lt;code&gt;Demo&lt;/code&gt; without going through initialization will cause an exception to be thrown.&lt;/p&gt;
&lt;p&gt;However, achieving this goal using ConfigureFunctionsWorkerDefaults is currently not feasible. If the complete initialization of Azure Functions relies on this method, it would be desirable to implement a protective measure that guarantees its presence. One potential solution could involve utilizing a Roslyn analyzer to verify the method's existence. This might appear excessive at first glance, but this precaution could be worthwhile considering the potential consequences of removing a single line of code, which could bring down the entire function app without a clear indication of the issue. Currently, prioritizing stability and error prevention is paramount when compensation is no longer tied to the number of code lines produced.&lt;/p&gt;
</description><pubDate>Sat, 12 Aug 2023 22:48:00 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sfeldman/azure-function-a-single-line-to-drive-you-nuts</guid><category>Functions</category></item><item><title>Why Event Sourcing?</title><link>https://weblogs.asp.net:443/sfeldman/why-event-sourcing</link><description>&lt;p&gt;&lt;img src="https://aspblogs.blob.core.windows.net:443/media/sfeldman/2023/why-event-sourcing/refill.jpg" alt="refill" /&gt;&lt;/p&gt;
&lt;h1&gt;Some context&lt;/h1&gt;
&lt;p&gt;I've seen software systems built since 2001. My first exposure was to classic ASP and VB6 applications with traditional state-based architecture. As someone new to software development, I was both fascinated by the use of data stores such as SQL Server to persist the vast amounts of data and horrified by the ease of irreversible mistakes that could take place. I should be honest; that &lt;strong&gt;took&lt;/strong&gt; place when I accidentally ran some SQL update statements against the wrong database. Glorious days of a newbie developer at a startup company. I learned quickly that a safe strategy includes backing up data frequently.&lt;/p&gt;
&lt;p&gt;Twenty years later, I realized it could have been a &lt;em&gt;safe&lt;/em&gt; strategy. Still, it wasn't a good solution, to begin with, for a domain that involves business applications. So let's dig into the details.&lt;/p&gt;
&lt;h1&gt;State-based Application lie&lt;/h1&gt;
&lt;p&gt;I've been with a particular pharmacy for a long time. We moved a lot, and with each address change, the local pharmacy has always registered my new address and used that to confirm my identity each time I picked up a prescription. I had lived for over 5 years at the current address, so you can imagine my surprise when I failed the identity verification during a routine prescription pick-up. I'm older these days, but still not at the stage where I forget my address. So that was quite confusing.&lt;/p&gt;
&lt;p&gt;When asked if I could provide a different address, I returned to the memory lane to the previous address. But, to my even bigger surprise, that wasn't the address on the file either. So I asked the pharmacist if they happened to store more than one address, trying the theory that the pharmacy system is not showing the default address. But no, the system only has a single address. So now I was really puzzled. But just for kicks, I gave it the address I had over 15 years ago. And bingo! My identity was confirmed. But let's unpack what has happened here.&lt;/p&gt;
&lt;p&gt;The address in the system has changed. Obviously, somewhere in the pharmacy's system &lt;em&gt;something&lt;/em&gt; changed my address from the current to the one I had over 15 years ago. But what was that event? Would it be possible to look at a log to determine what happened? If I file a complaint, would it be possible to find out what happened? And how did the dormant address for 15 years miraculously get resurrected, replacing my active address? A mystery surrounded by more guesses than answers.&lt;/p&gt;
&lt;h1&gt;Operating within the constraints&lt;/h1&gt;
&lt;p&gt;So how would a &amp;quot;mystery&amp;quot; of this sort get approached in a conventional system? Logs. Let's look into logs to see what has happened. That's assuming logs account for such a scenario and log the details. But it's virtually impossible to log absolutely every permutation in the system.&lt;/p&gt;
&lt;p&gt;Track database changes! See if there's anything in the data. Well, that's not a trivial exercise, either. Assuming data changes are captured. And let's assume those data changes are captured; what's the context? What was the &lt;strong&gt;event&lt;/strong&gt; that took place that caused the system to start using that 15yo address? Cricket.&lt;/p&gt;
&lt;h1&gt;What's a better approach?&lt;/h1&gt;
&lt;p&gt;This is the question I've been toying around for quite a while. My career has taken me from business applications to libraries and back. And it was the second round when I started questioning the state-based approach and the conventional architecture. This is where I got to return to the idea of Event Sourcing and re-evaluate the approach. If my &lt;strong&gt;data&lt;/strong&gt; are the events that take place in the system, those events are the authoritative source of truth, not only allowing us to reconstruct the current state but also help understand how that state was derived. And that's a game changer.&lt;/p&gt;
&lt;p&gt;I will save you from the details as plenty of more competent people wrote better posts on the topic. I'll just wrap up with my own experience highlight. Being able to trust the system and understand how it got to the place where it is is invaluable for business applications.&lt;/p&gt;
&lt;p&gt;&lt;img src="https://aspblogs.blob.core.windows.net:443/media/sfeldman/2023/why-event-sourcing/log.jpg" alt="events" /&gt;&lt;/p&gt;
&lt;h1&gt;Great. How do I do it?&lt;/h1&gt;
&lt;p&gt;YMMV. I've started simple. No frameworks and no products. Just Azure Storage Tables to store my events, Azure Service Bus to communicate events for projections (async), and Azure MySQL Flex Server to keep projections for searches and queries. Knowing what I know today, I would probably do that again but choose slightly different services. Nothing works better than building your own to understand the concepts. If you need someone else to take over, consider a framework of some sort.&lt;/p&gt;
</description><pubDate>Wed, 31 May 2023 05:10:00 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sfeldman/why-event-sourcing</guid><category>EventSourcing</category></item><item><title>Manually Completing Service Bus Messages with Functions</title><link>https://weblogs.asp.net:443/sfeldman/manually-completing-service-bus-messages-with-functions</link><description>&lt;p&gt;Message settlement with Azure Service Bus has undergone some changes over the past few iterations of the Service Bus SDK for .NET. In the latest SDK (Azure.Messaging.ServiceBus), the settlement is performed via the &lt;code&gt;ServiceBusReceivedMessage&lt;/code&gt;. In the previous SDK, this was accomplished with the help of the &lt;code&gt;MessageReceiver&lt;/code&gt; object.&lt;/p&gt;
&lt;p&gt;Azure Functions In-Process SDK can disable message auto-completion by specifying &lt;code&gt;AutoComplete = false&lt;/code&gt; on the &lt;code&gt;ServiceBusTrigger&lt;/code&gt;. When auto-completion is disabled, the responsibility to complete (settle) the incoming message is on the function code. Except with the latest SDK, &lt;code&gt;MessageReceiver&lt;/code&gt; is no longer an option. And while the equivalent, &lt;code&gt;ServiceBusReceiver&lt;/code&gt;, seems to be the logical replacement, it is not. Instead, a particular type, &lt;code&gt;ServiceBusMessageActions&lt;/code&gt;*, must be injected and used to settle messages.&lt;/p&gt;
&lt;p&gt;And what about Isolated Worker SDK? Well, not there yet. Hopefully, it &lt;a href="https://github.com/Azure/azure-functions-dotnet-worker/issues/1008"&gt;will be&lt;/a&gt; soon.&lt;/p&gt;
&lt;p&gt;* will require &lt;code&gt;Microsoft.Azure.WebJobs.Extensions.ServiceBus&lt;/code&gt; NuGet package to be added&lt;/p&gt;
</description><pubDate>Tue, 06 Sep 2022 06:27:00 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sfeldman/manually-completing-service-bus-messages-with-functions</guid><category>AzureFunctions</category><category>Azure</category></item><item><title>Updating Azure Functions Tools</title><link>https://weblogs.asp.net:443/sfeldman/updating-azure-functions-tools</link><description>&lt;p&gt;Azure Functions Tools is at the heart of providing local development for Azure Function. Whenever you use Visual Studio, Rider, VS Code, or anything else, you need it to be able to run your bits. For command line folks, the installation process is &lt;a href="https://github.com/Azure/azure-functions-core-tools#installing"&gt;outlined&lt;/a&gt; in the tools repository. For Visual Studio (2022) and Rider, it is less evident as it depends on the tool. So, where am I heading with this? Right, the need to update the Azure Functions Tools.&lt;/p&gt;
&lt;p&gt;Normally, VS and Rider do it automatically. Azure Functions Tools feed (https://functionscdn.azureedge.net/public/cli-feed-v4.json) stored at &lt;code&gt;%LocalAppData%\AzureFunctionsTools&lt;/code&gt; has a JSON feed file, &lt;code&gt;feed-v&amp;lt;sequence-number&amp;gt;.json&lt;/code&gt;, that is periodically updated. This file points to all the necessary information, including the latest version for the version of the function (v4 in my case).&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;&amp;quot;v4&amp;quot;: {
  &amp;quot;release&amp;quot;: &amp;quot;4.20.0&amp;quot;,
  &amp;quot;releaseQuality&amp;quot;: &amp;quot;GA&amp;quot;,
  &amp;quot;hidden&amp;quot;: false
},
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;Release points at the Core Tools version&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;&amp;quot;coreTools&amp;quot;: [
    {
      &amp;quot;OS&amp;quot;: &amp;quot;Linux&amp;quot;,
      &amp;quot;Architecture&amp;quot;: &amp;quot;x64&amp;quot;,
      &amp;quot;downloadLink&amp;quot;: &amp;quot;https://functionscdn.azureedge.net/public/4.0.4704/Azure.Functions.Cli.linux-x64.4.0.4704.zip&amp;quot;,
      //...
    },
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;When running your Functions project and noticing that the version is falling behind, there are a few things to check:&lt;/p&gt;
&lt;ol&gt;
&lt;li&gt;The feed file. It could be that the feed is stale.&lt;/li&gt;
&lt;li&gt;The tooling in the IDE is not updating.&lt;/li&gt;
&lt;/ol&gt;
&lt;p&gt;For #2, there's a difference between VS and Rider.&lt;/p&gt;
&lt;ul&gt;
&lt;li&gt;Rider will check for a newer version of Azure Functions Tools each time a project is loaded*&lt;/li&gt;
&lt;li&gt;VS will check for a newer version when a new Functions project is &lt;strong&gt;created&lt;/strong&gt;&lt;/li&gt;
&lt;/ul&gt;
&lt;p&gt;*Rider also allows inspecting the version and manually replacing it with another version by going through Settings --&amp;gt; Tools --&amp;gt; Azure --&amp;gt; Functions and configuring Azure Functions Tools location.&lt;/p&gt;
&lt;p&gt;&lt;img src="https://aspblogs.blob.core.windows.net:443/media/sfeldman/2022/updating-azure-functions-tools/image.png" alt="Rider settings screenshot" /&gt;&lt;/p&gt;
&lt;p&gt;With VS, it's not really intuitive. If I work on the same project and do not add new triggers or Funcitons projects to the solution, it can be very confusing. Rider does a better job, no doubt.&lt;/p&gt;
&lt;p&gt;&lt;em&gt;Running the same project before and after adding an &lt;em&gt;additional&lt;/em&gt; Funcitons project just to update the tools.&lt;/em&gt;&lt;/p&gt;
&lt;p&gt;Before:&lt;/p&gt;
&lt;p&gt;&lt;img src="https://aspblogs.blob.core.windows.net:443/media/sfeldman/2022/updating-azure-functions-tools/image-1.png" alt="before" /&gt;&lt;/p&gt;
&lt;p&gt;After:&lt;/p&gt;
&lt;p&gt;&lt;img src="https://aspblogs.blob.core.windows.net:443/media/sfeldman/2022/updating-azure-functions-tools/image-2.png" alt="after" /&gt;&lt;/p&gt;
&lt;p&gt;With this, the version of Tools can always be up-to-date.&lt;/p&gt;
</description><pubDate>Wed, 10 Aug 2022 17:06:00 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sfeldman/updating-azure-functions-tools</guid><category>AzureFunctions</category><category>VS.NET</category><category>Rider</category></item><item><title>Fixing NServiceBus default databus serializer in .NET 6</title><link>https://weblogs.asp.net:443/sfeldman/fixing-nservicebus-default-databus-serializer-in-net-6</link><description>&lt;p&gt;Upgrading to .NET 6, updating all the packages, boosters turned on, launching testing.&lt;/p&gt;
&lt;p&gt;Houston, we've got a problem.&lt;/p&gt;
&lt;blockquote&gt;
&lt;p&gt;System.NotSupportedException:  BinaryFormatter serialization and
deserialization are disabled within this application. See
https://aka.ms/binaryformatter for more information.&lt;/p&gt;
&lt;/blockquote&gt;
&lt;p&gt;Ouch! What just happened? There were no warnings, no obsolete messages, nothing on to the autopsy.&lt;/p&gt;
&lt;p&gt;NServiceBus has a &lt;a href="https://docs.particular.net/nservicebus/messaging/databus/"&gt;data bus&lt;/a&gt; (or a 'databus') feature. The feature implements the &lt;a href="https://docs.microsoft.com/en-us/azure/architecture/patterns/claim-check"&gt;Claim Check pattern&lt;/a&gt; to allow messages to surpass the imposed maximum message size by the underlying messaging technology. The feature serializes the data internally, and the default &lt;code&gt;DefaultDataBusSerializer&lt;/code&gt; uses &lt;code&gt;BinaryFormatter&lt;/code&gt;. Nothing new; it has been &lt;a href="https://github.com/Particular/NServiceBus/blame/a510c214806540d920de10ed81b50f191129fbed/src/databus/NServiceBus.Databus/DefaultDatabusSerializer.cs#L6"&gt;used for years&lt;/a&gt;. Unfortunately, with .NET 5, &lt;code&gt;BinaryFormatter&lt;/code&gt; was deprecated due to a &lt;a href="https://docs.microsoft.com/en-ca/dotnet/standard/serialization/binaryformatter-security-guide"&gt;security risk&lt;/a&gt; it poses. And while you could skip .NET 5 and live with .NET Core 3.1, .NET 6 is breathing down the neck, and an upgrade is imminent.&lt;/p&gt;
&lt;p&gt;There is only one option:&lt;/p&gt;
&lt;ol&gt;
&lt;li&gt;&lt;a href="https://docs.microsoft.com/en-us/dotnet/core/compatibility/core-libraries/5.0/binaryformatter-serialization-obsolete"&gt;Re-enable the binary formatter&lt;/a&gt; 🦨&lt;/li&gt;
&lt;li&gt;Work around the problem until Particular has an official solution&lt;/li&gt;
&lt;/ol&gt;
&lt;p&gt;You read it right. Until an official fix, #2 is the only option that will be compliant with most environments.&lt;/p&gt;
&lt;p&gt;The workaround can be summarized as the following:&lt;/p&gt;
&lt;ul&gt;
&lt;li&gt;Pick serialization&lt;/li&gt;
&lt;li&gt;Replace the default data bus serializer with the custom version&lt;/li&gt;
&lt;li&gt;Deploy&lt;/li&gt;
&lt;/ul&gt;
&lt;h2&gt;Picking serialization&lt;/h2&gt;
&lt;p&gt;I've chosen to go with BSON. The naive implementation is the following:&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;public class BsonDataBusSerializer : IDataBusSerializer
{
    public void Serialize(object databusProperty, Stream stream)
    {
        using var writer = CreateNonClosingStreamWriter(stream);
        using var bsonBinaryWriter = new BsonBinaryWriter(stream);
        BsonSerializer.Serialize(bsonBinaryWriter, databusProperty);
    }

    StreamWriter CreateNonClosingStreamWriter(Stream stream)
        =&amp;gt; new(stream, Encoding.UTF8, bufferSize: 1024, leaveOpen: true);

    public object Deserialize(Stream stream)
    {
        using var bsonBinaryReader = new BsonBinaryReader(stream);
        return BsonSerializer.Deserialize&amp;lt;object&amp;gt;(bsonBinaryReader);
    }
}
&lt;/code&gt;&lt;/pre&gt;
&lt;h2&gt;Replacing the default data bus serializer&lt;/h2&gt;
&lt;p&gt;&lt;strong&gt;Update&lt;/strong&gt;: there's a cleaner option. Skip to the I Need a Better Option section below.&lt;/p&gt;
&lt;p&gt;One of the things I wanted to avoid is sprinkling the code-base with the replacement code in various projects that use NServiceBus. So rather than going to the multiple places and having to register the workaround in the following way:&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;// TODO: required workaround for issue (link). Remove when fixed.
endpoint.AdvancedConfiguration.RegisterComponents(c =&amp;gt; 
    c.RegisterSingleton&amp;lt;IDataBusSerializer&amp;gt;(new BsonDataBusSerializer()));
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;A perfect candidate would be using an auto-registered &lt;a href="https://docs.particular.net/nservicebus/pipeline/features"&gt;features&lt;/a&gt; feature. A feature could be a part of the Shared solution that all endpoints are using and would automatically replace the data bus serializer w/o any endpoints having to do anything in the configuration code.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;internal class BsonDataBusSerializerFeature : Feature
{
    public BsonDataBusSerializerFeature()
    {
        DependsOn&amp;lt;NServiceBus.Features.DataBus&amp;gt;();

        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        if (context.Container.HasComponent&amp;lt;IDataBusSerializer&amp;gt;())
        {
           // ???. Remove(defaultDataBusSerializer);
        }
        context.Container.ConfigureComponent&amp;lt;IDataBusSerializer&amp;gt;(_ =&amp;gt; 
              new BsonDataBusSerializer(), DependencyLifecycle.SingleInstance);
    }
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;Except there's no way to achieve that with NServiceBus &lt;em&gt;today&lt;/em&gt;. The &lt;code&gt;IServiceCollection&lt;/code&gt; is adapted into NServiceBus &lt;code&gt;ServiceCollectionAdapter&lt;/code&gt;, which doesn't provide a way to remove any previously registered services as one can do with a plain &lt;code&gt;IServiceCollection&lt;/code&gt;. More details &lt;a href="https://github.com/Particular/NServiceBus/issues/6374#issuecomment-1119799315"&gt;here&lt;/a&gt;.&lt;/p&gt;
&lt;h2&gt;Workaround for the workaround&lt;/h2&gt;
&lt;p&gt;This part might be a bit smelly, but it's the necessary evil. NServiceBus adapts &lt;code&gt;IServiceCollection&lt;/code&gt; and keeps a reference as a private member field. With some reflection, we can get hold of the service collection and purge the default &lt;code&gt;IDataBusSerializer&lt;/code&gt; implementation to ensure it's not registered and &lt;a href="https://github.com/Particular/NServiceBus/issues/6374#issuecomment-1114447110"&gt;resolved first&lt;/a&gt;.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;protected override void Setup(FeatureConfigurationContext context)
{
	if (context.Container.HasComponent&amp;lt;IDataBusSerializer&amp;gt;())
	{
		var serviceCollection = context.Container.GetFieldValue&amp;lt;IServiceCollection&amp;gt;(&amp;quot;serviceCollection&amp;quot;);

		if (serviceCollection is not null)
		{
			var defaultDataBusSerializer = serviceCollection.FirstOrDefault(descriptor =&amp;gt;
                       descriptor.ServiceType == typeof(IDataBusSerializer));

			if (defaultDataBusSerializer is not null)
			{
				serviceCollection.Remove(defaultDataBusSerializer);
			}
		}
	}

	context.Container.ConfigureComponent&amp;lt;IDataBusSerializer&amp;gt;(_ =&amp;gt; 
             new BsonDataBusSerializer(), DependencyLifecycle.SingleInstance);
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;With a slight modification to the &lt;code&gt;Setup&lt;/code&gt; method, the feature is now ready to be used!&lt;/p&gt;
&lt;h2&gt;I Need a Better Option&lt;/h2&gt;
&lt;p&gt;And as was &lt;a href="https://github.com/Particular/NServiceBus/issues/6374#issuecomment-1129634358"&gt;pointed out&lt;/a&gt; by Particular, there's an option to register a custom data bus serializer earlier than Core does it, removing the need in reflection. The feature could be replaced by an &lt;a href="https://docs.particular.net/nservicebus/lifecycle/ineedinitialization"&gt;&lt;code&gt;INeedInitialization&lt;/code&gt; component&lt;/a&gt;, which is invoked &lt;em&gt;before&lt;/em&gt; endpoint creation and initialization.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;public class ReplaceDefaultDataBusSerializer : INeedInitialization
{
  public void Customize(EndpointConfiguration endpointConfiguration)
  {
    endpointConfiguration.RegisterComponents(components =&amp;gt;
      components.RegisterSingleton&amp;lt;IDataBusSerializer&amp;gt;(new BsonDataBusSerializer()));
  }
}
&lt;/code&gt;&lt;/pre&gt;
&lt;h2&gt;Deploying&lt;/h2&gt;
&lt;p&gt;A word of caution for the solutions using one of these features in combination with data bus:&lt;/p&gt;
&lt;ul&gt;
&lt;li&gt;Events&lt;/li&gt;
&lt;li&gt;Delayed messages&lt;/li&gt;
&lt;/ul&gt;
&lt;p&gt;You will need to tread carefully. The migration is not a simple data bus serializer replacement in these scenarios. It has to cater to the fact that messages serialized with &lt;code&gt;BinaryFormatter&lt;/code&gt; could be processed by the endpoints converted to use the new serialization. Subscribing to the &lt;a href="https://github.com/Particular/NServiceBus/issues/6058"&gt;issue&lt;/a&gt; on this topic is probably a safe bet. Or at least toss a few ideas before you start. And no matter what, good luck!&lt;/p&gt;
</description><pubDate>Fri, 06 May 2022 17:31:00 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sfeldman/fixing-nservicebus-default-databus-serializer-in-net-6</guid><category>C#</category><category>NServiceBus</category></item><item><title>Sagas with Azure Service Bus</title><link>https://weblogs.asp.net:443/sfeldman/sagas-with-azure-service-bus</link><description>&lt;h2&gt;Introduction&lt;/h2&gt;
&lt;p&gt;Handling messages out of order is always tricky. The asynchronous nature of messaging makes it challenging. On top of that, systems in the real world are messy and unpredictable. That's why handling workflows always brings more complexity than just handling messages. To illustrate the challenge, I'll use a scenario where my workflow depends on two different services.&lt;/p&gt;
&lt;ol&gt;
&lt;li&gt;Payments&lt;/li&gt;
&lt;li&gt;Shipping&lt;/li&gt;
&lt;/ol&gt;
&lt;p&gt;To successfully complete an order, I'll need to handle a message from each service. &lt;code&gt;PaymentAccepted&lt;/code&gt; from the Payments service and &lt;code&gt;ItemShipped&lt;/code&gt; from the Shipping service. The order can be considered successfully completed only when the two messages are received. The absence of one of the messages would indicate a failed state and require a compensating action. The compensating action will depend on which one of the two messages has already been handled. I'll leave the details of the compensating action out of this post to keep it somewhat light.&lt;/p&gt;
&lt;h2&gt;Setting the expectations&lt;/h2&gt;
&lt;p&gt;One of the assumptions I'll make is how we handle a given order. Both the payment and the shipping services would need to use a correlation ID to connect the things together. This could be an order ID that should be unique. Another assumption is how to handle messages out of order over time. This is where the &lt;a href="https://docs.microsoft.com/en-us/azure/architecture/reference-architectures/saga/saga"&gt;saga pattern&lt;/a&gt;. An important aspect to note is that it will require persisting the state because we'll deal with time. And while we could leverage an external storage/database service with Azure Service Bus, this is unnecessary, thanks to a feature called &lt;a href="https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions"&gt;Message Sessions&lt;/a&gt;. While Message Sessions is more commonly used for FIFO scenarios where the message processing order has to be the same as the message sending order, my choice of Message Sessions was not driven by that. An additional property of the Message Sessions feature that is frequently overlooked is the ability to have a state associated with a given session. The state is an arbitrary object kept on the broker and associated with the session ID. The state can event exist w/o any messages for the session being around. This session state can be accessed by the session ID and can hold up to a single message size of data.&lt;/p&gt;
&lt;h2&gt;Implementation&lt;/h2&gt;
&lt;p&gt;With all this in mind, let's get to the implementation. Each of the two services, as mentioned above, will post a message. The messages will always indicate the order ID as a correlation ID and set the message's &lt;code&gt;SessionId&lt;/code&gt; to this value. I'll use a specific GUID as the order ID and store it in a shared project under &lt;code&gt;Correlation.Id&lt;/code&gt; to make the demo simple.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;public static class Correlation
{
    public const string Id = &amp;quot;77777777-0000-0000-0000-000000000000&amp;quot;;
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;To mimic the real world where messages can come out of order and at different times, the Shipping service will post a message with a delay.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;await publisher.ScheduleMessageAsync(new ServiceBusMessage(&amp;quot;Shipping OK&amp;quot;)
{
    SessionId = Correlation.Id,
    ApplicationProperties = { { &amp;quot;MessageType&amp;quot;, &amp;quot;ItemShipped&amp;quot; } },
}, DateTimeOffset.Now.Add(TimeSpan.FromSeconds(7)));
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;Notice the &lt;code&gt;MessageType&lt;/code&gt; header. I'll use topics and subscriptions, filtering out messages based on the &lt;code&gt;MessageType&lt;/code&gt; header. Similar code but without delay will be published an even from the Payments service.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;await publisher.SendMessageAsync(new ServiceBusMessage(&amp;quot;Payment OK&amp;quot;)
{
    SessionId = Correlation.Id,
    ApplicationProperties = { { &amp;quot;MessageType&amp;quot;, &amp;quot;PaymentAccepted&amp;quot; }  }
});
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;When these two are executed, &lt;code&gt;PaymentAccepted&lt;/code&gt; will be delivered right away and &lt;code&gt;ItemShipped&lt;/code&gt; after 7 seconds. And now to the saga implementation that will handle these messages coming out of order at different times and the option that not always both messages will make it.&lt;/p&gt;
&lt;h2&gt;Saga implementation&lt;/h2&gt;
&lt;p&gt;As mentioned earlier, the saga will be implemented using Message Sessions. To process session messages, the SDK provides a &lt;code&gt;SessionProcessor&lt;/code&gt;. To see messages flow in a way that is easier to digest, I'll set the number of sessions to handle to 1. Of course, we'd not want to handle a single session but instead multiple sessions in the real world.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;var options = new ServiceBusSessionProcessorOptions
{
    MaxConcurrentSessions = 1,
    MaxConcurrentCallsPerSession = 1,
    SessionIdleTimeout = TimeSpan.FromSeconds(15)
};
var processor = client.CreateSessionProcessor(topicName: &amp;quot;orchestration&amp;quot;, subscriptionName: &amp;quot;orchestrator&amp;quot;, options);
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;Not that I'm using a topic and a subscription. You could also use a queue or some other topology. Here's how I've arranged my topology for this demo:&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;orchestration (topic)
│
└────orchestrator (subscription)
     │
     ├────ItemShipped (rule)
     │
     ├────PaymentAccepted (rule)
     │
     └────Timeout (rule)
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;Notice the &lt;code&gt;Timeout&lt;/code&gt; rule. This will be needed whenever we are waiting for the arrival of the missing messages. Timeouts will be our postponing of saga execution until either all messages will be handled or we'll reach the condition where no more waiting can occur. Then, a compensating action has to be executed as we've given up.&lt;/p&gt;
&lt;p&gt;A session has several lifecycle events that can take place. Those are:&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;processor.SessionInitializingAsync += args =&amp;gt;
{
    WriteLine($&amp;quot;Handling session with ID: {args.SessionId}&amp;quot;);
    return Task.CompletedTask;
};

processor.SessionClosingAsync += args =&amp;gt;
{
    WriteLine($&amp;quot;Closing session with ID: {args.SessionId}&amp;quot;);
    return Task.CompletedTask;
};

processor.ProcessErrorAsync += args =&amp;gt;
{
    WriteLine($&amp;quot;Error: {args.Exception}&amp;quot;, warning: true);        
    return Task.CompletedTask;
};
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;And the important one, &lt;code&gt;ProcessMessageAsync&lt;/code&gt;. Again, it's a bit overwhelming, so give it a quick look and head over to the explanation below.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;processor.ProcessMessageAsync += async args =&amp;gt;
{
    // (1)
    var message = args.Message;
    var messageType = message.ApplicationProperties[&amp;quot;MessageType&amp;quot;];
    WriteLine($&amp;quot;Got a message of type: {messageType} for session with ID {args.SessionId}&amp;quot;);

    // (2)
    var sessionState = await args.GetSessionStateAsync();
    var state = sessionState is null
        ? new State()
        : sessionState.ToObject&amp;lt;State&amp;gt;(new JsonObjectSerializer())!;

    // (3)
    if (state.Completed)
    {
        WriteLine($&amp;quot;Completing the process for Order with correlation ID {message.SessionId}&amp;quot;);

        var publisher = client.CreateSender(&amp;quot;orchestration&amp;quot;);
        await publisher.SendMessageAsync(new ServiceBusMessage($&amp;quot;Orchestration for Order with session ID {message.SessionId} is completed&amp;quot;));
    }

    Func&amp;lt;State, Task&amp;gt; ExecuteAction = messageType switch
    {
        // (4)
        &amp;quot;PaymentAccepted&amp;quot; =&amp;gt; async delegate
        {
            state.PaymentReceived = true;
            await SetTimeoutIfNecessary(client, args, state, TimeSpan.FromSeconds(5));
        },
        &amp;quot;ItemShipped&amp;quot; =&amp;gt; async delegate
        {
            state.ItemShipped = true;
            await SetTimeoutIfNecessary(client, args, state, TimeSpan.FromSeconds(5));
        },
        // (5)
        &amp;quot;Timeout&amp;quot; =&amp;gt; async delegate
        {
            if (state.Completed || sessionState is null)
            {
                WriteLine($&amp;quot;Orchestration ID {args.SessionId} has completed. Discarding timeout.&amp;quot;);
                return;
            }
            
            if (state.RetriesCount &amp;lt; 3)
            {
                await SetTimeoutIfNecessary(client, args, state, TimeSpan.FromSeconds(5));
            }
            else
            {
                WriteLine($&amp;quot;Exhausted all retries ({state.RetriesCount}). Executing compensating action and completing session with ID {args.SessionId}&amp;quot;, warning: true);
                // Compensating action here
                await args.SetSessionStateAsync(null);
            }
        },
        _ =&amp;gt; throw new Exception($&amp;quot;Received unexpected message type {messageType} (message ID: {message.MessageId})&amp;quot;)
    };

    await ExecuteAction(state);

    static async Task SetTimeoutIfNecessary(ServiceBusClient client, ProcessSessionMessageEventArgs args, State state, TimeSpan timeout)
    {
        if (state.Completed)
        {
            WriteLine($&amp;quot;Orchestration with session ID {args.SessionId} has successfully completed. Sending notification (TBD).&amp;quot;);
            await args.SetSessionStateAsync(null);
            return;
        }

        WriteLine($&amp;quot;Scheduling a timeout to check in {timeout}&amp;quot;);

        var publisher = client.CreateSender(&amp;quot;orchestration&amp;quot;);
        await publisher.ScheduleMessageAsync(new ServiceBusMessage
        {
            SessionId = args.Message.SessionId,
            ApplicationProperties = { { &amp;quot;MessageType&amp;quot;, &amp;quot;Timeout&amp;quot; } }
        }, DateTimeOffset.Now.Add(timeout));

        state.RetriesCount++;
        await args.SetSessionStateAsync(BinaryData.FromObjectAsJson(state));
    }
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;What this code is doing is the following:&lt;/p&gt;
&lt;ol&gt;
&lt;li&gt;Upon a received message, it looks at the message type and the session state.&lt;/li&gt;
&lt;li&gt;Session state is the saga state. If one doesn't exist, a new state is initiated. Otherwise, it's deserialized into the POCO to be used for the logic. The state keeps the vital information for the decision-making that needs to survive over the time between the messages.&lt;/li&gt;
&lt;li&gt;If the state indicates completion (both messages received), notify about the successful completion of the saga. The underlying session will be completed eventually.&lt;/li&gt;
&lt;li&gt;If the message is &lt;code&gt;PaymentAccepted&lt;/code&gt;, the state is updated to indicate this message has been handled. And right away, a timeout is set, if necessary.&lt;/li&gt;
&lt;li&gt;If the message is &lt;code&gt;Timeout&lt;/code&gt;, the state is checked for completion (meaning &lt;code&gt;PaymentAccepted&lt;/code&gt; &lt;em&gt;and&lt;/em&gt; &lt;code&gt;ItemShipped&lt;/code&gt; where received), or if the session state is null, telling the saga is over. If that's the case, the timeout message will be discarded as it has arrived after the saga has been completed. Otherwise, a simple number of retries will be checked to determine wherever the saga should continue waiting or not. This part is very custom, and I've decided to let the saga issue a timeout of 5 seconds for 3 times. You could do it exponentially or introduce different types of timeouts. &lt;strong&gt;But&lt;/strong&gt; if the number of retries has been exceeded, we've never got one of the missing messages, and the saga has not been completed successfully. This is where a compensating action would occur, and the session state would be cleared. It's crucial to remove the session state to ensure it doesn't stay on the broker forever.&lt;/li&gt;
&lt;/ol&gt;
&lt;p&gt;Here's a happy day scenario, when both messages make it to the topic:&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;[23:35:42] Handling session with ID: 77777777-0000-0000-0000-000000000000
[23:35:42] Got a message of type: PaymentAccepted for session with ID 77777777-0000-0000-0000-000000000000
[23:35:43] Scheduling a timeout to check in 00:00:05
[23:35:48] Got a message of type: Timeout for session with ID 77777777-0000-0000-0000-000000000000
[23:35:48] Scheduling a timeout to check in 00:00:05
[23:35:52] Got a message of type: Timeout for session with ID 77777777-0000-0000-0000-000000000000
[23:35:53] Scheduling a timeout to check in 00:00:05
[23:35:55] Got a message of type: ItemShipped for session with ID 77777777-0000-0000-0000-000000000000
[23:35:55] Orchestration with session ID 77777777-0000-0000-0000-000000000000 has successfully completed. Sending notification (TBD).
[23:35:57] Got a message of type: Timeout for session with ID 77777777-0000-0000-0000-000000000000
[23:35:57] Orchestration ID 77777777-0000-0000-0000-000000000000 has completed. Discarding timeout.
[23:36:13] Closing session with ID: 77777777-0000-0000-0000-000000000000
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;And this is what the execution looks like when one of the messages never arrives:&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;[01:15:16] Handling session with ID: 77777777-0000-0000-0000-000000000000
[01:15:16] Got a message of type: PaymentAccepted for session with ID 77777777-0000-0000-0000-000000000000
[01:15:16] Scheduling a timeout to check in 00:00:05
[01:15:21] Got a message of type: Timeout for session with ID 77777777-0000-0000-0000-000000000000
[01:15:21] Scheduling a timeout to check in 00:00:05
[01:15:26] Got a message of type: Timeout for session with ID 77777777-0000-0000-0000-000000000000
[01:15:26] Scheduling a timeout to check in 00:00:05
[01:15:31] Got a message of type: Timeout for session with ID 77777777-0000-0000-0000-000000000000
[01:15:31] Exhausted all retries (3). Executing compensating action and completing session with ID 77777777-0000-0000-0000-000000000000
[01:15:47] Closing session with ID: 77777777-0000-0000-0000-000000000000
&lt;/code&gt;&lt;/pre&gt;
&lt;h2&gt;Recap&lt;/h2&gt;
&lt;p&gt;Modelling a process that is executing over time requires persistence. With Azure Service Bus we can leverage Message Sessions to keep the state along with the session's messages, adding timeout messages to provide some future checkpoints to determine wherever the compensating logic needs to be executed or not. with the session state, we can also inspect the state of the saga by querying for it with the session ID and the correlation ID to be used.&lt;/p&gt;
&lt;p&gt;Full solution is available on &lt;a href="https://github.com/SeanFeldman/Orchestration"&gt;GitHub&lt;/a&gt;.&lt;/p&gt;
</description><pubDate>Mon, 25 Apr 2022 07:22:00 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sfeldman/sagas-with-azure-service-bus</guid><category>AzureServiceBus</category></item><item><title>Impersonating Events</title><link>https://weblogs.asp.net:443/sfeldman/impersonating-events</link><description>&lt;p&gt;&lt;img src="https://aspblogs.blob.core.windows.net:443/media/sfeldman/2022/impersonating-events/mask.jpg" alt="enter image description here" /&gt;&lt;/p&gt;
&lt;p&gt;Azure Service Bus queues and subscriptions are an excellent way to process messages using competing consumers. But it can also get really tricky. Let's look at a scenario where a single event needs to be processed by two services. For this example, I'll use a process of an agent being assigned to a case. The requirement is pretty straightforward. When an agent is assigned to a case, we should send an email notifying the agent. In my system, I've designed it the way that when the event of assignment (&lt;code&gt;AgentAssigned&lt;/code&gt;) is taking place, there are two event handlers that would react to it:&lt;/p&gt;
&lt;ol&gt;
&lt;li&gt;Update the querying data store with the information about the assignment to be able to look up agent assignments, and&lt;/li&gt;
&lt;li&gt;Notify the agent about the assignment with some case details.&lt;/li&gt;
&lt;/ol&gt;
&lt;p&gt;&lt;img src="https://aspblogs.blob.core.windows.net:443/media/sfeldman/2022/impersonating-events/no-order.jpg" alt="enter image description here" /&gt;&lt;/p&gt;
&lt;p&gt;It's all great except for one problem. When the second handler runs first, there's still no association between the agent and the case. No email can go out as there's nothing to notify about. Or worse, when another event, &lt;code&gt;AgentReassigned&lt;/code&gt;, took place but hasn't been processed by the first handler. In this case, we'd be sending an email notification to the original agent who's no longer on the case. The problem is quite apparent - we can't have competing consumers for the same event. And the order of execution is clearly essential.&lt;/p&gt;
&lt;p&gt;One of the solutions is to introduce an additional event, &lt;code&gt;AgentAssignedCompleted&lt;/code&gt;, which would be triggered by the first handler when the querying data store is updated with the information about the case and the agent. And have the second handler subscribe to this new event rather than the original one.&lt;/p&gt;
&lt;p&gt;But what if I have more than one event to notify about where I shouldn't have competing consumers? And the original event would need to be duplicated as-is as the same information would be required. I really don't want to do that. The good news is there's no need. Azure Service Bus is robust enough to allow message impersonation. How does it work?&lt;/p&gt;
&lt;p&gt;The first handler, upon its completion, will dispatch a new event. We'll use a convention of &lt;code&gt;{OriginalMessageType}Completed&lt;/code&gt;. In the case of &lt;code&gt;AgentReassigned&lt;/code&gt;, the newly dispatched event will be &lt;code&gt;AgentAssignedCompleted&lt;/code&gt;. But what we'll do is stamp the new message headers with the &lt;em&gt;original&lt;/em&gt; message type and set the payload to the original message payload.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;var outgoingMessage = new ServiceBusMessage(BinaryData.FromObjectAsJson(message))
{
	ApplicationProperties =
	{
		{ &amp;quot;EventType&amp;quot;, $&amp;quot;{nameof(ConsultantReassigned)}Completed&amp;quot; },
		{ &amp;quot;OriginalEventType&amp;quot;, typeof(ConsultantReassigned).FullName }
	}
};
await sender.SendMessageAsync(outgoingMessage);
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;The subscription we'll create for the 2nd handler will subscribe to the &lt;code&gt;AgentAssignedCompleted&lt;/code&gt; event type, using SQL filter &lt;code&gt;EventType='ConsultantReassignedCompleted'&lt;/code&gt;. This will ensure that copies of the messages of &lt;code&gt;ConsultantReassignedCompleted&lt;/code&gt; will be stored under the subscription.&lt;/p&gt;
&lt;p&gt;And here's the trick, we'll use SQL filter &lt;strong&gt;action&lt;/strong&gt;to replace &lt;code&gt;EventType&lt;/code&gt; of the message that will be given to the subscription if it matches the condition, back to the original message type using the following instruction: &lt;code&gt;SET EventType=OriginalEventType; REMOVE OriginalEventType;.&lt;/code&gt; With this action, any message that has satisfied the SQL filter will have its header &lt;code&gt;EventType&lt;/code&gt; modified to the header's value &lt;code&gt;OriginalEventType&lt;/code&gt;, removing the temporary &lt;code&gt;OriginalEventType&lt;/code&gt; after that.&lt;/p&gt;
&lt;p&gt;When the 2nd handler receives messages from this subscription, the type of the message indicated by &lt;code&gt;EventType&lt;/code&gt; will be the original &lt;code&gt;ConsultantReassigned&lt;/code&gt; event rather than the modified &lt;code&gt;ConsultantReassignedCompleted&lt;/code&gt; type. And the payload will be the original &lt;code&gt;ConsultantReassigned&lt;/code&gt; payload.&lt;/p&gt;
&lt;p&gt;&lt;img src="https://aspblogs.blob.core.windows.net:443/media/sfeldman/2022/impersonating-events/order.jpg" alt="enter image description here" /&gt;&lt;/p&gt;
&lt;h3&gt;Provisioning&lt;/h3&gt;
&lt;p&gt;There are several ways. Manually, using a tool such as ServiceBus Explorer, or scripted using Az CLI or Bicep. Bicep seems to have a &lt;a href="https://github.com/Azure/bicep/issues/6557"&gt;bug&lt;/a&gt;, but &lt;a href="https://docs.microsoft.com/en-us/cli/azure/servicebus/topic/subscription/rule?view=azure-cli-latest#az-servicebus-topic-subscription-rule-create"&gt;Az CLI&lt;/a&gt; works great. This is what it would look like:&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;az servicebus topic subscription rule create --resource-group 'MyGroup' --namespace-name 'MyNamespace'
    --topic-name 'tva.events' --subscription-name 'Notifications' --name ConsultantReassignedCompleted
    --filter-sql-expression=&amp;quot;EventType='ConsultantReassignedCompleted'&amp;quot; 
    --action-sql-expression='SET EventType=OriginalEventType; REMOVE OriginalEventType;'
&lt;/code&gt;&lt;/pre&gt;
&lt;h2&gt;Is this necessary?&lt;/h2&gt;
&lt;p&gt;It really depends. You could create Additional &lt;code&gt;xxxxCompleted&lt;/code&gt; types and duplicate all the properties from the original message types if you'd like. We can skip that and keep only the original events that matter, enabling ordered processing by tweaking the provisioned topology with event impersonation.&lt;/p&gt;
</description><pubDate>Fri, 15 Apr 2022 07:47:00 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sfeldman/impersonating-events</guid><category>Azure</category><category>AzureServiceBus</category></item><item><title>Executing Azure Timer function manually</title><link>https://weblogs.asp.net:443/sfeldman/executing-azure-timer-function-manually</link><description>&lt;p&gt;&lt;img src="https://aspblogs.blob.core.windows.net:443/media/sfeldman/2022/executing-azure-timer-function-manually/screwdrivers.jpg" alt="enter image description here" /&gt;&lt;/p&gt;
&lt;p&gt;Azure timer triggered. Functions are convenient for automated execution. With the specified time interval, a function gets to execute when specified and then sleeps until the subsequent execution.&lt;/p&gt;
&lt;p&gt;But what happens when a function needs to be executed on demand? For example, during development, when debugging the logic and want to kick off a function right away rather than waiting?&lt;/p&gt;
&lt;p&gt;That's possible with the &lt;code&gt;TimerTrigger&lt;/code&gt; that accepts an additional parameter, &lt;code&gt;RunOnStartup&lt;/code&gt;. Assign it a value of &lt;code&gt;true&lt;/code&gt;, and the function will be executed when the Function App starts. You might want to wrap it with &lt;code&gt;#if DEBUG&lt;/code&gt; to ensure it gets executed upon each deployment or restarting a Function/Function App in production.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;[FunctionName(nameof(MyTimerTrigger))]
public async Task RunAsync([TimerTrigger(&amp;quot;0 0 */12 * * *&amp;quot;
#if DEBUG
  , RunOnStartup = true
#endif
)] TimerInfo myTimer, ExecutionContext executionContext)
{
 // function code
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;That's great, but what if I need to force the function to execute not right away? For example, my function executes every 12 hours (&lt;code&gt;&amp;quot;0 0 */12 * * *&amp;quot;&lt;/code&gt;), and I need to force it to run earlier than that?&lt;/p&gt;
&lt;p&gt;One way is to use the CRON expression from a configuration, update the configuration and restart the Function. But that's clunky and inconvenient. A better way is to force the function to execute by making a request through the administrative API.&lt;/p&gt;
&lt;p&gt;An HTTP request to the administrative API with a master key will trigger the function execution. The URL is always of the following format:&lt;/p&gt;
&lt;p&gt;&lt;code&gt;https://&amp;lt;function-app&amp;gt;.azurewebsites.net/admin/functions/&amp;lt;function-name&amp;gt;&lt;/code&gt;&lt;/p&gt;
&lt;p&gt;For example, https://my-test-funcapp.azurewebsites.net/admin/functions/MyTimerTrigger&lt;/p&gt;
&lt;p&gt;The content to POST-ed for a timer-triggered function can be an empty JSON, &lt;code&gt;&amp;quot;{}&amp;quot;&lt;/code&gt;.
The master key can be found under the Function App Keys section. Careful with the value, do not share or commit it. The value should be passed with the header &lt;code&gt;x-functions-key&lt;/code&gt;.&lt;/p&gt;
&lt;p&gt;Note: locally, the &lt;code&gt;x-functions-key&lt;/code&gt; header is not required.&lt;/p&gt;
&lt;p&gt;Upon successful execution, HTTP response code 202 Accepted will be returned.&lt;/p&gt;
&lt;p&gt;Conveniently enough, this works on &lt;em&gt;any&lt;/em&gt; non-HTTP triggered function and on v3 and v4 In-Process SDK and Isolated Worker SDK.&lt;/p&gt;
&lt;p&gt;While this little gem is &lt;a href="https://docs.microsoft.com/en-us/azure/azure-functions/functions-manually-run-non-http"&gt;documented&lt;/a&gt;, it deserves more publicity it brings some excellent options to the table when it comes to invoking non-HTTP functions on demand.&lt;/p&gt;
</description><pubDate>Mon, 21 Mar 2022 06:34:00 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sfeldman/executing-azure-timer-function-manually</guid><category>Functions</category><category>Azure</category></item><item><title>Service Bus Message to Blob</title><link>https://weblogs.asp.net:443/sfeldman/asb-message-to-blob</link><description>&lt;p&gt;About 5+ years ago I blogged about &lt;a href="https://weblogs.asp.net/sfeldman/azure-functions-to-make-audit-queue-and-auditors-happy"&gt;turning messages into audit blobs&lt;/a&gt;. Back then, it was for Storage Queue messages and the early Azure Functions implementation that required portal configuration. Since then, Storage Queues has been replaced by Azure Service Bus and Azure Functions has gained the ability to declare everything through the code. And not only that but also in two different ways, using&lt;/p&gt;
&lt;ol&gt;
&lt;li&gt;In-Process SDK&lt;/li&gt;
&lt;li&gt;Isolated Worker SDK (out-of-process)&lt;/li&gt;
&lt;/ol&gt;
&lt;p&gt;The concept hasn't changed much but the the code did become somewhat simpler.&lt;/p&gt;
&lt;p&gt;&lt;strong&gt;In-Process SDK&lt;/strong&gt;&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;public static class MessageTriggeredFunction
{
    [FunctionName(nameof(MessageTriggeredFunction))]
    public static async Task Run(
        [ServiceBusTrigger(&amp;quot;myqueue&amp;quot;, Connection = &amp;quot;ServiceBusConnectionString&amp;quot;)]string payload,
        string messageId,
        [Blob(&amp;quot;messages/{messageId}.txt&amp;quot;, FileAccess.Write, Connection = &amp;quot;StorageAccountConnectionString&amp;quot;)] Stream output)
    {
        await output.WriteAsync(Encoding.UTF8.GetBytes(payload));
    }
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;&lt;strong&gt;Isolated Worker SDK&lt;/strong&gt;&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;public class MessageTriggeredFunctionIsolated
{
   [Function(nameof(MessageTriggeredFunctionIsolated))]
   [BlobOutput(&amp;quot;messages/{messageId}.txt&amp;quot;, Connection = &amp;quot;StorageAccountConnectionString&amp;quot;)]
   public string Run(
       [ServiceBusTrigger(&amp;quot;myqueue&amp;quot;, Connection = &amp;quot;ServiceBusConnectionString&amp;quot;)] string payload,
       string messageId)
  {
            return payload;
  }
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;The two snippets will result in the same outcome - a message will trigger the function and cause a blob to be generated and named as &lt;code&gt;message-id.txt&lt;/code&gt; where &lt;code&gt;message-id&lt;/code&gt; will be the physical message id.&lt;/p&gt;
</description><pubDate>Mon, 28 Feb 2022 06:26:31 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sfeldman/asb-message-to-blob</guid><category>AzureServiceBus</category><category>Functions</category></item><item><title>Azure Functions Isolated Worker - Sending multiple messages</title><link>https://weblogs.asp.net:443/sfeldman/functions-isolated-worker-sending-multiple-messages</link><description>&lt;p&gt;&lt;img src="https://aspblogs.blob.core.windows.net:443/media/sfeldman/2021/functions-isolated-worker-sending-multiple-messages/collector.jpg" alt="enter image description here" /&gt;&lt;/p&gt;
&lt;p&gt;The new Azure Functions SDK for Isolated Worker (process) has been introduced around .NET 5. While it's still in flux despite being GA-ed, it's gaining more and more popularity. And yet, there are still some sharp edges you should be careful with and validate that everything you're using with the older SDK, In-Process, is offered with the new SDK. Or at least there's a replacement.&lt;/p&gt;
&lt;p&gt;Today, I've stumbled upon a StackOverflow question about &lt;code&gt;IAsyncCollector&lt;/code&gt; and Service Bus messages. &lt;code&gt;IAsyncCollector,&lt;/code&gt; as its synchronous counterpart &lt;code&gt;ICollector&lt;/code&gt; offers the comfort of output binding and returning multiple items. For example, with Azure Service Bus, one can send out multiple messages from the executing function. Quite handy, and with the In-Process SDK, it looks like the following. The function's signature contains a collector (I call it dispatcher) that can be used to &amp;quot;add&amp;quot; messages. Those are actually getting dispatched to the queue the &lt;code&gt;ServiceBus&lt;/code&gt; attribute is configured with by adding messages. Which, in this case, is a queue called &lt;code&gt;dest&lt;/code&gt;.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;[FunctionName(&amp;quot;Concierge&amp;quot;)]
public async Task&amp;lt;IActionResult&amp;gt; Handle([HttpTrigger(AuthorizationLevel.Function,&amp;quot;post&amp;quot;, Route = &amp;quot;receive&amp;quot;)] HttpRequest req,
    [ServiceBus(&amp;quot;dest&amp;quot;, Connection = &amp;quot;AzureServiceBus&amp;quot;)] IAsyncCollector&amp;lt;ServiceBusMessage&amp;gt; dispatcher)
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;And sending messages:&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;for (var i = 0; i &amp;lt; 10; i++)
{
   var message = new ServiceBusMessage($&amp;quot;Message #{i}&amp;quot;);
   await collector.AddAsync(serviceBusMessage);
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;Straight forward and simple. But how do you do the same with the new Isolated Worker (out of process) SDK?&lt;/p&gt;
&lt;p&gt;Not the same way. The new SDK doesn't currently support native SDK types. Therefore types such as &lt;code&gt;ServiceBusMessage&lt;/code&gt; are not supported. Also, SDK Service Bus clients are not available directly. So functions need to marshal data as strings or byte arrays to be able to send those. And receive as well. But we're focusing on sending. So what's the way to send those multiple messages?&lt;/p&gt;
&lt;p&gt;The official documentation does mention &lt;a href="https://docs.microsoft.com/en-us/azure/azure-functions/dotnet-isolated-process-guide#multiple-output-bindings"&gt;multiple output binding&lt;/a&gt;. But that's in the context of using multiple &lt;em&gt;different&lt;/em&gt; output bindings. To output multiple items to the &lt;strong&gt;same&lt;/strong&gt; output binding, we need to resort to a bit of tedious work.&lt;/p&gt;
&lt;p&gt;First, we'll need to serialize our messages. Then we'll dispatch those serialized objects using an output binding, connected to a collection property. Here's an example:&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;[Function(&amp;quot;OneToMany&amp;quot;)]
public static DispatchedMessages Run([ServiceBusTrigger(&amp;quot;myqueue&amp;quot;, 
    Connection = &amp;quot;AzureServiceBus&amp;quot;)] string myQueueItem, FunctionContext context)
{
  // Generate 5 messages
  var messages = new List&amp;lt;MyMessage&amp;gt;();
  for (var i = 0; i &amp;lt; 5; i++)
  {
      var message = new MyMessage { Value = $&amp;quot;Message #{i}&amp;quot; };
      messages.Add(message);
  }

  return new DispatchedMessages
  { 
      Messages = messages.Select(x =&amp;gt; JsonSerializer.Serialize(x)) 
  };
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;Each message of type &lt;code&gt;MyMessage&lt;/code&gt; is serialized first.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;class MyMessage
{
    public string Value { get; set; }
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;And then, we return an object of &lt;code&gt;DispatchedMessage&lt;/code&gt; where the binding glue is:&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;public class DispatchedMessages
{
    [ServiceBusOutput(queueOrTopicName: &amp;quot;dest&amp;quot;, Connection = &amp;quot;AzureServiceBus&amp;quot;)]
    public IEnumerable&amp;lt;string&amp;gt; Messages { get; set; }
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;This object will be returned from the function and marshalled back to the SDK code that will take care to enumerate over the &lt;code&gt;Messages&lt;/code&gt; property, taking each string and passing it as the body value to the newly constructed &lt;code&gt;ServiceBusMessage&lt;/code&gt;. With the help of the &lt;code&gt;ServiceBusOutput&lt;/code&gt; attribute, Functions SDK knows where to send the message and where to find the connection string. Note that w/o specifying the connection string name, the SDK will attempt to load the connection string from the variable/key named &lt;code&gt;AzureWebJobsServiceBus&lt;/code&gt;. This means that we can have multiple dispatchers, similar to the in-process SDK multiple collectors, by having a property per destination/namespace on the returned type.&lt;/p&gt;
&lt;p&gt;And just like this, we can kick off the function and dispatch multiple messages with the new Isolated Worker SDK.&lt;/p&gt;
&lt;p&gt;&lt;img src="https://aspblogs.blob.core.windows.net:443/media/sfeldman/2021/functions-isolated-worker-sending-multiple-messages/result.png" alt="enter image description here" /&gt;&lt;/p&gt;
</description><pubDate>Tue, 14 Sep 2021 06:48:00 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sfeldman/functions-isolated-worker-sending-multiple-messages</guid><category>Functions</category><category>AzureServiceBus</category></item><item><title>Azure Functions Elevated</title><link>https://weblogs.asp.net:443/sfeldman/azure-functions-elevated</link><description>&lt;p&gt;A recent talk I gave online at ServerlessDays Amsterdam&lt;/p&gt;
&lt;iframe width="560" height="315" src="https://www.youtube.com/embed/oxsDVjxGGfc?start=3288" title="YouTube video player" frameborder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowfullscreen&gt;&lt;/iframe&gt;
</description><pubDate>Wed, 02 Jun 2021 18:42:04 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sfeldman/azure-functions-elevated</guid><category>Azure</category><category>AzureFunctions</category><category>NServiceBus</category></item><item><title>Automatically provision NServiceBus Service Bus Function endpoint topology</title><link>https://weblogs.asp.net:443/sfeldman/automatic-nservicebus-topology-creation-for-function</link><description>&lt;p&gt;&lt;img src="https://aspblogs.blob.core.windows.net:443/media/sfeldman/2021/automatic-nservicebus-topology-creation-for-function/pipes.jpg" alt="enter image description here" /&gt;&lt;/p&gt;
&lt;p&gt;&lt;strong&gt;2021-01-19 update&lt;/strong&gt;: code for subscription was adjusted to ensure the correct default rule for subscription is created.&lt;/p&gt;
&lt;p&gt;In the previous post, &lt;a href="https://weblogs.asp.net/sfeldman/automatic-queue-creation-for-function"&gt;Automatically create Service Bus trigger queue for Azure Function&lt;/a&gt;, I've shown how to provision a &lt;code&gt;ServiceBusTrigger&lt;/code&gt; queue from within a Function.&lt;/p&gt;
&lt;p&gt;In this post, we'll take that idea and push it further to something a bit more sophisticated - provisioning the topology necessary for NServiceBus endpoint hosted with Azure Function and using Azure Service Bus transport. If you haven't used NServiceBus or NServiceBus with Azure Functions, here's a &lt;a href="https://docs.particular.net/previews/azure-functions-service-bus"&gt;starting point&lt;/a&gt; for you. NServiceBus can bring a few advantages over native Functions I'll leave to discover on your own. And now, let's have a look at what are the things we'll need to accomplish.&lt;/p&gt;
&lt;p&gt;Just as with the native Azure Function, a logical endpoint is represented by an input queue. That input queue needs to be created.&lt;/p&gt;
&lt;p&gt;Next, NServiceBus has centralized error and audit queues. While those are not difficult to create, it's more convenient to have those queues created by the first starting endpoint.&lt;/p&gt;
&lt;p&gt;Last is the pub/sub infrastructure. Azure Service Bus transport has a specific topology all endpoints adhere to. That includes a centralized topic, by default named &lt;code&gt;bundle-1&lt;/code&gt; and each logical endpoint as a subscription. Upon startup, each endpoint subscribes to the events it's interested in using this infrastructure.&lt;/p&gt;
&lt;p&gt;With this information, let's start putting the pieces needed for the whole thing to work together.&lt;/p&gt;
&lt;h2&gt;Discovering endpoints&lt;/h2&gt;
&lt;p&gt;As there might be one or more logical endpoints, the hard-coding queue name as it was done in the previous post is not ideal. An alternative would be to reflect the endpoint's name (queue name) at runtime when the Function App is bootstrapping everything.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;    var attribute = Assembly.GetExecutingAssembly().GetTypes()
        .SelectMany(t =&amp;gt; t.GetMethods())
        .Where(m =&amp;gt; m.GetCustomAttribute&amp;lt;FunctionNameAttribute&amp;gt;(false) != null)
        .SelectMany(m =&amp;gt; m.GetParameters())
        .SelectMany(p =&amp;gt; p.GetCustomAttributes&amp;lt;ServiceBusTriggerAttribute&amp;gt;(false))
        .FirstOrDefault();
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;With this code, we'll discover all &lt;code&gt;ServiceBusTriggerAttribute&lt;/code&gt; applied to Azure Service Bus triggered functions. For each of these attributes, we'll have to&lt;/p&gt;
&lt;ol&gt;
&lt;li&gt;Create a queue if it doesn't exist&lt;/li&gt;
&lt;li&gt;Create a subscription if it doesn't exist&lt;/li&gt;
&lt;/ol&gt;
&lt;p&gt;The caveat is that a subscription can only be created when a topic is found. Therefore a topic needs to be created first. Also, to make the topology work as the transport expects, each subscription should be auto-forwarding messages to the input queue it's associated with. And finally, the audit and error queues can be provisioned as well, completing the topology work necessary for each endpoint to be bootstrapped.&lt;/p&gt;
&lt;h2&gt;Putting it together&lt;/h2&gt;
&lt;p&gt;Here's the helper method we'd be using:&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;static async Task CreateTopologyWithReflection(IConfiguration configuration, string topicName = &amp;quot;bundle-1&amp;quot;, string auditQueue = &amp;quot;audit&amp;quot;, string errorQueue = &amp;quot;error&amp;quot;)
{
    var connectionString = configuration.GetValue&amp;lt;string&amp;gt;(&amp;quot;AzureWebJobsServiceBus&amp;quot;);
    var managementClient = new ManagementClient(connectionString);

    var attribute = Assembly.GetExecutingAssembly().GetTypes()
        .SelectMany(t =&amp;gt; t.GetMethods())
        .Where(m =&amp;gt; m.GetCustomAttribute&amp;lt;FunctionNameAttribute&amp;gt;(false) != null)
        .SelectMany(m =&amp;gt; m.GetParameters())
        .SelectMany(p =&amp;gt; p.GetCustomAttributes&amp;lt;ServiceBusTriggerAttribute&amp;gt;(false))
        .FirstOrDefault();

    if (attribute == null)
    {
        throw new Exception(&amp;quot;No endpoint was found&amp;quot;);
    }

    // there are endpoints, create a topic
    if (!await managementClient.TopicExistsAsync(topicName))
    {
        await managementClient.CreateTopicAsync(topicName);
    }

    var endpointQueueName = attributes.First().QueueName;

    if (!await managementClient.QueueExistsAsync(endpointQueueName))
    {
        await managementClient.CreateQueueAsync(endpointQueueName);
    }

    if (!await managementClient.SubscriptionExistsAsync(topicName, endpointQueueName))
    {
        var subscriptionDescription = new SubscriptionDescription(topicName, endpointQueueName)
        {
            ForwardTo = endpointQueueName,
            UserMetadata = $&amp;quot;Events {endpointQueueName} subscribed to&amp;quot;
        };
        var ruleDescription = new RuleDescription
        {
            Filter = new FalseFilter()
        };
        await managementClient.CreateSubscriptionAsync(subscriptionDescription, ruleDescription);
    }

    if (!await managementClient.QueueExistsAsync(auditQueue))
    {
        await managementClient.CreateQueueAsync(auditQueue);
    }

    if (!await managementClient.QueueExistsAsync(errorQueue))
    {
        await managementClient.CreateQueueAsync(errorQueue);
    }
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;Next, this helper method needs to be involved in the Startup class:&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;[assembly: FunctionsStartup(typeof(Startup))]
public class Startup : FunctionsStartup
{
    public override void Configure(IFunctionsHostBuilder builder)
    {      
        CreateTopology(builder.GetContext().Configuration).GetAwaiter().GetResult();

        builder.UseNServiceBus(() =&amp;gt;
        {
          var configuration = new ServiceBusTriggeredEndpointConfiguration(AzureServiceBusTriggerFunction.EndpointName);
          configuration.Transport.SubscriptionRuleNamingConvention(type =&amp;gt; type.Name);
          return configuration;
        });
    }
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;In my test solutions, I've defined an endpoint named &lt;code&gt;ASBEndpoint&lt;/code&gt; (&lt;code&gt;AzureServiceBusTriggerFunction.EndpointName&lt;/code&gt; is assigned the name). Once Azure Function hosting the endpoint is deployed, the following topology is created:&lt;/p&gt;
&lt;p&gt;&lt;img src="https://aspblogs.blob.core.windows.net:443/media/sfeldman/2021/automatic-nservicebus-topology-creation-for-function/topology.png" alt="topology" /&gt;&lt;/p&gt;
&lt;p&gt;with the correct forwarding to the input queue&lt;/p&gt;
&lt;p&gt;&lt;img src="https://aspblogs.blob.core.windows.net:443/media/sfeldman/2021/automatic-nservicebus-topology-creation-for-function/forwarding.png" alt="fording" /&gt;&lt;/p&gt;
&lt;h2&gt;Subscribing to events&lt;/h2&gt;
&lt;p&gt;In the endpoint, I've added an event and event handler.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;public class SimpleEvent : IEvent { }

public class SimpleEventHandler : IHandleMessages&amp;lt;SimpleEvent&amp;gt;
{
    readonly ILogger&amp;lt;SimpleEvent&amp;gt; logger;

    public SimpleEventHandler(ILogger&amp;lt;SimpleEvent&amp;gt; logger)
    {
        this.logger = logger;
    }

    public Task Handle(SimpleEvent message, IMessageHandlerContext context)
    {
        logger.LogInformation($&amp;quot;{nameof(SimpleEventHandler)} invoked&amp;quot;);
        return Task.CompletedTask;
    }
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;NServiceBus automatically picks up and subscribes to all the events it finds handlers for. The subscription is expressed as a rule for each event. But this only happens when an endpoint is activated. This is not the case with message triggered Function endpoint. Luckily, there's a trick with &lt;code&gt;TimerTrigger&lt;/code&gt; we can apply.&lt;/p&gt;
&lt;h2&gt;Timer trigger trick&lt;/h2&gt;
&lt;p&gt;Normally, &lt;code&gt;TimerTirgger&lt;/code&gt; is executed periodically using a schedule defined using the CRON expression. In addition to that, there's also a flag to force a time-triggered function to run a single time when a timer triggered function is deployed. With this option, we can leverage a timer triggered function to run once upon deployment and stay dormant for a year. When the function executes, it will dispatch the &lt;code&gt;ForceAutoSubscription&lt;/code&gt; control message and cause the endpoint to load and auto-subscribe to the &lt;code&gt;SimpleEvent&lt;/code&gt;.&lt;/p&gt;
&lt;p&gt;Control message definition:&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;public class ForceAutoSubscription : IMessage { }
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;Timer function:&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;public class TimerFunc
{
    readonly IFunctionEndpoint functionEndpoint;

    public TimerFunc(IFunctionEndpoint functionEndpoint)
    {
        this.functionEndpoint = functionEndpoint;
    }

    [FunctionName(&amp;quot;TimerFunc&amp;quot;)]
    public async Task Run([TimerTrigger(&amp;quot;* * * 1 1 *&amp;quot;, RunOnStartup = true)]TimerInfo myTimer,
        ILogger logger, ExecutionContext executionContext)
    {
        var sendOptions = new SendOptions();
        sendOptions.SetHeader(Headers.ControlMessageHeader, bool.TrueString);
        sendOptions.SetHeader(Headers.MessageIntent, MessageIntentEnum.Send.ToString());
        sendOptions.RouteToThisEndpoint();
        await functionEndpoint.Send(new ForceAutoSubscription(), sendOptions, executionContext, logger);
    }
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;Note: &lt;code&gt;ForceAutoSubscription&lt;/code&gt; is a control message and will neither require a message handler to be defined nor will it cause recoverability to be executed.&lt;/p&gt;
&lt;p&gt;The final result is what we needed. The endpoint is subscribed to &lt;code&gt;SimpleEvent&lt;/code&gt;, and it's part of the topology. This means there's a rule under the endpoint's subscription.&lt;/p&gt;
&lt;p&gt;&lt;img src="https://aspblogs.blob.core.windows.net:443/media/sfeldman/2021/automatic-nservicebus-topology-creation-for-function/image.png" alt="event-subscription" /&gt;&lt;/p&gt;
&lt;h2&gt;Summary&lt;/h2&gt;
&lt;p&gt;With this in place, we can bootstrap NServiceBus Function hosted endpoint using Azure Service Bus transport (preview 0.5 and later) w/o the need to manually provision the topology.&lt;/p&gt;
&lt;p&gt;P.S.: if you're interested in Azure Functions supporting an opt-in queue creation, here's a &lt;a href="https://github.com/Azure/azure-functions-servicebus-extension/issues/130"&gt;feature request&lt;/a&gt; you could upvote.&lt;/p&gt;
</description><pubDate>Fri, 15 Jan 2021 04:36:00 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sfeldman/automatic-nservicebus-topology-creation-for-function</guid><category>Azure</category><category>AzureServiceBus</category><category>Functions</category><category>NServiceBus</category></item><item><title>Automatically create Service Bus trigger queue for Azure Function</title><link>https://weblogs.asp.net:443/sfeldman/automatic-queue-creation-for-function</link><description>&lt;p&gt;&lt;img src="https://aspblogs.blob.core.windows.net:443/media/sfeldman/2021/automatic-queue-creation-for-function/chicken-egg.jpg" alt="header" /&gt;&lt;/p&gt;
&lt;p&gt;Azure Functions are great. Take HTTP triggered Function. You make a request, it's passed into the Function code, the code is executed, and that's it. Simple. What does it take to deploy an HTTP-triggered function? Packaging and deploying it.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;[FunctionName(&amp;quot;HttpTriggerFunc&amp;quot;)]
public async Task&amp;lt;IActionResult&amp;gt; Run(
    [HttpTrigger(AuthorizationLevel.Function, &amp;quot;get&amp;quot;, &amp;quot;post&amp;quot;, Route = null)]
    HttpRequest req, ILogger log)
{
    log.LogInformation(&amp;quot;C# HTTP trigger function processed a request.&amp;quot;);

    string name = req.Query[&amp;quot;name&amp;quot;];

    return name != null
        ? (ActionResult)new OkObjectResult($&amp;quot;Hello, {name}&amp;quot;)
        : new BadRequestObjectResult(&amp;quot;Please pass a name on the query string&amp;quot;);
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;If only all triggers were that simple. Let's take a queue triggered Function.&lt;/p&gt;
&lt;p&gt;Let's write a function that is triggered by incoming messages on a queue called &lt;code&gt;myqueue&lt;/code&gt; and logs its label to mimic the message's processing. Here's how the code would look like:&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;[FunctionName(&amp;quot;ServiceBusQueueTriggerCSharp&amp;quot;)]                    
public Task Run([ServiceBusTrigger(&amp;quot;myqueue&amp;quot;)] Message message, ILogger log)
{
    log.LogInformation($&amp;quot;Received message with label: {message.Label}&amp;quot;);
    return Task.CompletedTask;
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;What does it take to deploy a Service Bus triggered function? Packaging and deploying it? Unfortunately not that simple. The queue that we'd like the function to be listening to has to be provisioned first. But to trigger the function the message has to arrive from that queue. This means it has to be there in the first place before the function even runs. A queue-triggered function will only execute if there's a message, i.e. a queue has to be there. That's sort of a chicken and egg situation.&lt;/p&gt;
&lt;p&gt;The obvious solution is to provision the queue first and then deploy the function. While some even prefer this controlled infrastructure deployment, some prefer not to split the queue provision and the deployment of the function. I.e. have the function to create what's needed. What's gives?&lt;/p&gt;
&lt;p&gt;Sometimes, a brute force approach is an approach to take. If you're using statically defined Functions, have a look at using the &lt;code&gt;FunctionsHostBuilder&lt;/code&gt; approach. It enables the generic host approach and DI container use with Functions. It also opens up the option of executing an arbitrary code when setting up dependencies. And it runs &lt;em&gt;before&lt;/em&gt; any trigger, upon function startup.&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;[assembly: FunctionsStartup(typeof(Startup))]
public class Startup : FunctionsStartup
{
    public override void Configure(IFunctionsHostBuilder builder)
    {
      // DI setup
    }
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;This is the spot that could be used to &amp;quot;hack&amp;quot; the provisioning of the necessary infrastructure! Adding a helper method to create the queue:&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;static async Task CreateTopology(IConfiguration configuration)
{
    var connectionString = configuration.GetValue&amp;lt;string&amp;gt;(&amp;quot;AzureWebJobsServiceBus&amp;quot;); // this is the default connection string name
    var managementClient = new ManagementClient(connectionString);
    
    if (!await managementClient.QueueExistsAsync(&amp;quot;myqueue&amp;quot;))
    {
        await managementClient.CreateQueueAsync(&amp;quot;myqueue&amp;quot;);
    }
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;All that's left is to call the helper method from the Configure class. Unfortunately, this would be calling an asynchronous helper method from a synchronous &lt;code&gt;Configure&lt;/code&gt; method, which will require somewhat dirty implementation but hey, à la guerre comme à la guerre!&lt;/p&gt;
&lt;pre&gt;&lt;code&gt;[assembly: FunctionsStartup(typeof(Startup))]
public class Startup : FunctionsStartup
{
    public override void Configure(IFunctionsHostBuilder builder)
    {
      CreateTopology(builder.GetContext().Configuration).GetAwaiter().GetResult();
    }
}
&lt;/code&gt;&lt;/pre&gt;
&lt;p&gt;That's it. Now the function can be deployed, and no need to worry about queue deployment. The helper method is invoked once only when a function instance is created or scaled-out. A small price to pay to not worry about queue provisioning: the same approach can be applied to subscription-triggered functions.&lt;/p&gt;
</description><pubDate>Fri, 08 Jan 2021 07:41:25 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sfeldman/automatic-queue-creation-for-function</guid><category>Azure</category><category>Functions</category><category>AzureServiceBus</category></item></channel></rss>