ServiceBus queue trigger service in Azure Service Fabric

One of the things I really like about Azure WebJobs is the triggers. Writing a small job that runs every time a message arrives on a queue, for instance, is really powerful.

I need the same sort of behavior for applications I am writing using Service Fabric. Fortunately that is easy to achieve. In this post I will show how to react to messages arriving on an Azure ServiceBus queue using a Service Fabric stateless service.

The example is written in F# and the source code is available at https://github.com/clausasbjorn/ServiceFabricServiceBusTrigger.

The example solution consists of three projects.

  1. ServiceBusTrigger.Common, containing a wrapper around the ServiceBus library that we will use to listen for and send messages using Azure ServiceBus queues. It also contains an "ImportantMessage"-type that we will use as the message.

  2. ServiceBusTrigger.Client, which is a small console application that will add a single "ImportantMessage" to our queue.

  3. ServiceBusTrigger, containing a generic stateless "QueueService" and an "ImportantMessageService" that derives from it and triggers every time a new "ImportantMessage" arrives in our queue.

ServiceBusTrigger.Common

Basic queueing

At the most basic level we need to be able to send and receive messages using an Azure ServiceBus queue. We will use the standard Microsoft.ServiceBus functionality to achieve this.

namespace ServiceBusTrigger.Common

open System
open System.Threading
open Microsoft.ServiceBus
open Microsoft.ServiceBus.Messaging

module Queueing =

    let private connectionString = @"<YOUR SERVICEBUS CONNECTION STRING HERE>"

    let send queue (message : Object) =
        let namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString)
        match namespaceManager.QueueExists(queue) with
        | false -> namespaceManager.CreateQueue(queue) |> ignore
        | _ -> ()

        let client = QueueClient.CreateFromConnectionString(connectionString, queue)

        new BrokeredMessage(message)
        |> client.Send

    let internal receive<'TMessage> queue callback =
        let namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString)
        match namespaceManager.QueueExists(queue) with
        | false -> namespaceManager.CreateQueue(queue) |> ignore
        | _ -> ()

        // Process one message at a time and complete it automatically when the callback returns.
        let options = new OnMessageOptions()
        options.MaxConcurrentCalls <- 1
        options.AutoComplete <- true

        let client = QueueClient.CreateFromConnectionString(connectionString, queue)
        client.OnMessage((fun msg -> msg.GetBody<'TMessage>() |> callback), options)

        client

Here we have two functions, one for sending messages, and one for receiving them. Sending messages is pretty straightforward, but receiving them is a tad more interesting.

After the initial check and configuration, we create a QueueClient that listens for messages on the queue named by the queue parameter.

let client = QueueClient.CreateFromConnectionString(connectionString, queue)

Then we instruct the client that every received message should be deserialized as 'TMessage before being passed on to the callback function we specified as a parameter.

client.OnMessage((fun msg -> msg.GetBody<'TMessage>() |> callback), options)
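
With AutoComplete set to true, the client completes a message once the callback returns and abandons it if the callback throws. If you want explicit control over that, one option is to settle each message yourself. The following is a minimal sketch of that variant, not the code from the repository: the function name listenWithManualSettlement is hypothetical, it assumes an already created QueueClient, and it only logs to standard error.

```fsharp
open System
open Microsoft.ServiceBus.Messaging

// Hypothetical variant of the receive wiring with explicit settlement.
let listenWithManualSettlement<'TMessage> (client : QueueClient) (callback : 'TMessage -> unit) =
    let options = OnMessageOptions()
    options.MaxConcurrentCalls <- 1
    options.AutoComplete <- false   // we settle each message ourselves

    // Surface errors raised by the message pump instead of losing them.
    options.ExceptionReceived.Add(fun args ->
        eprintfn "Receive error during %s: %O" args.Action args.Exception)

    client.OnMessage(
        (fun (msg : BrokeredMessage) ->
            try
                msg.GetBody<'TMessage>() |> callback
                msg.Complete()   // success: remove the message from the queue
            with ex ->
                eprintfn "Handler failed: %O" ex
                msg.Abandon()),  // failure: release the lock so it can be retried
        options)
```

Whether manual settlement is worth the extra code depends on how much you care about distinguishing handler failures from transport failures; for the simple example in this post, AutoComplete is enough.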

A queue listener

With the basic queueing functionality in place we can create a small queue listener.

namespace ServiceBusTrigger.Common

open System
open System.Threading
open Microsoft.ServiceBus
open Microsoft.ServiceBus.Messaging

type QueueListener<'TMessage>(queue, callback) =

    let receiver = Queueing.receive<'TMessage> queue callback

    member this.Stop() =
        receiver.Close()

This is basically an object that can be instantiated to listen for messages of a given type 'TMessage on a given queue, handling each received message using a given callback function.

It uses the basic queueing functionality to create a QueueClient that keeps listening for messages until it is shut down.

ServiceBusTrigger.Client

The test client is not at all interesting, so let's just get it out of the way. It simply uses the basic queueing functionality to send a single message of type "ImportantMessage" to a test queue. After sending the message the program terminates.

open System
open ServiceBusTrigger.Common
open ServiceBusTrigger.Common.Messages

[<EntryPoint>]
let main argv = 
    Queueing.send "test-queue" { Message = "Very important stuff!" }
    0
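
The ImportantMessage type itself is not listed in this post. Judging from the record literal above, a minimal sketch could look like the following, assuming a single string field. In the solution it would live in the Messages module of ServiceBusTrigger.Common; the describe helper is hypothetical and only there to show the record in use.

```fsharp
module Messages =

    // Assumed shape: one string field, matching { Message = "..." } in the client.
    type ImportantMessage = { Message : string }

open Messages

// Hypothetical helper, only for illustration.
let describe (m : ImportantMessage) = sprintf "Important: %s" m.Message

let sample = { Message = "Very important stuff!" }
printfn "%s" (describe sample)
```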

ServiceBusTrigger

A generic stateless queue service

To be able to re-use our code for multiple Azure Service Fabric services we create a generic stateless service that uses our queue listener.

namespace ServiceBusTrigger

open System.Fabric
open Microsoft.ServiceFabric.Services.Communication.Runtime
open Microsoft.ServiceFabric.Services.Runtime
open ServiceBusTrigger.Common

type QueueService<'TMessage>(serviceContext, queueName, callback) = 
    inherit StatelessService(serviceContext)

    let mutable queue : Option<QueueListener<'TMessage>> = None

    let stop () =
        match queue with
        | None -> ()
        | Some q -> q.Stop()

    override this.OnAbort () =
        stop()
        base.OnAbort()

    override this.OnCloseAsync(cancellationToken) =
        stop()
        base.OnCloseAsync(cancellationToken)

    override this.OnOpenAsync(cancellationToken) =
        queue <- Some (new QueueListener<'TMessage>(queueName, callback))
        base.OnOpenAsync(cancellationToken)

When OnOpenAsync runs (when the stateless service is started), we set up a QueueListener on the specified queue. We also override OnCloseAsync and OnAbort so that the listener is stopped when the service shuts down.
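
One detail worth considering: stop could run more than once, for example if a close is followed by an abort. A possible way to make that harmless is to clear the stored listener after stopping it. The sketch below illustrates the idea with a hypothetical FakeListener standing in for QueueListener so that it runs without Service Bus; ListenerHolder is likewise an invented name, not part of the example solution.

```fsharp
// Hypothetical stand-in for QueueListener that counts how often it is stopped.
type FakeListener() =
    let mutable stopCount = 0
    member this.Stop() = stopCount <- stopCount + 1
    member this.StopCount = stopCount

// Holds an optional listener, mirroring the mutable field in QueueService.
type ListenerHolder() =
    let mutable listener : FakeListener option = None

    member this.Start() =
        let created = FakeListener()
        listener <- Some created
        created

    member this.Stop() =
        match listener with
        | Some l ->
            l.Stop()
            listener <- None   // forget the listener so a second Stop is a no-op
        | None -> ()

let holder = ListenerHolder()
let started = holder.Start()
holder.Stop()
holder.Stop()   // second call does nothing
printfn "Stop was forwarded %d time(s)" started.StopCount
```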

A queue service implementation

With the generic stateless queue service ready, we can create an actual implementation that listens for a specific type of message on a specific queue.

namespace ServiceBusTrigger

open ServiceBusTrigger.Common.Messages
open Microsoft.ServiceFabric

module ImportantMessageBehavior =

    let received (message : ImportantMessage) =
        // Handle that important message
        ()

type ImportantMessageService(context) = 
    inherit QueueService<ImportantMessage>(context, "test-queue", ImportantMessageBehavior.received)

Here we have created a stateless service of type "ImportantMessageService", which listens for "ImportantMessage"-messages on the queue called "test-queue" and handles them using the ImportantMessageBehavior.received function.
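
Because the handler is an ordinary function, it can be exercised without a queue or a cluster. A small sketch, assuming the ImportantMessage shape used by the client; the handled list and this body of received are hypothetical and just record what arrives.

```fsharp
open System.Collections.Generic

// Stand-in for the type from ServiceBusTrigger.Common.Messages (assumed shape).
type ImportantMessage = { Message : string }

// Hypothetical sink that remembers every message text the handler has seen.
let handled = List<string>()

// Same signature as ImportantMessageBehavior.received: ImportantMessage -> unit.
let received (message : ImportantMessage) =
    handled.Add message.Message

// Call the handler directly, exactly as the queue listener would.
received { Message = "Very important stuff!" }
printfn "Handled %d message(s)" handled.Count
```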

Registering the service

The last step is registering our stateless ImportantMessageService as we would register any stateless Service Fabric service.

open System
open System.Threading
open System.Threading.Tasks
open Microsoft.ServiceFabric.Services.Runtime
open ServiceBusTrigger

[<EntryPoint>]
let main argv = 

    try
        ServiceRuntime.RegisterServiceAsync("ServiceBusTriggerType", fun context -> new ImportantMessageService(context) :> StatelessService).GetAwaiter().GetResult()
        Thread.Sleep(Timeout.Infinite)
    with
    | e ->
        // Log the failure before exiting instead of swallowing it silently.
        eprintfn "Service host failed: %O" e

    0

Running it

We are now ready to deploy our service to the Service Fabric cluster. When it runs we can add a new message to the queue using the test client and it should immediately be picked up and handled by our service using the callback function.

[Image: Azure Service Fabric Service Bus trigger]

That's it. I hope the example will serve as inspiration for creating new types of endpoints for your Azure Service Fabric applications!
