Functional queueing with RabbitMQ and F#

RabbitMQ is a great open source messaging broker with very good .NET support. In this blog post I will be looking at an example of how to use the RabbitMQ .NET client with F#, wrapping it to achieve a more functional approach to using RabbitMQ.

In our example we will see how we can use RabbitMQ to queue strongly typed messages, and create a consumer that subscribes to the queue, receiving the messages as they arrive (push-based, rather than polling for new messages).

The example is "order system"-flavored. It doesn't do much, but when we run it, it will queue and consume an "Order"-message using RabbitMQ.

If you want to run the example locally, go and grab the code from the FsharpRabbitExample repo at GitHub.

Queue.fs

Let's start by taking a look at the code wrapping the .NET client. First up we have a discriminated union defining some strongly named queues that our application can use for communication.

We also have a private function for resolving the actual name of the queue based on the Queue-type.

[<AutoOpen>]
module Queue

    open System.IO
    open System.Runtime.Serialization.Formatters.Binary

    open RabbitMQ.Client
    open RabbitMQ.Client.Events

    // Named queues
    type Queue =
    | OrderReceived
    | OrderPaid
    | OrderFulfilled

    let private resolve queue =
        match queue with
        | OrderReceived -> "orders"
        | OrderPaid -> "payments"
        | OrderFulfilled -> "fulfillments"

Next up are two functions for serializing and deserializing the messaging objects we want to send using RabbitMQ.

    // Serialization
    let private toBytes message = 
        let formatter = BinaryFormatter()
        use stream = new MemoryStream()
        formatter.Serialize(stream, message)
        stream.ToArray()

    let private fromBytes<'T> (message : byte[]) =
        let formatter = BinaryFormatter()
        use stream = new MemoryStream()
        stream.Write(message, 0, message.Length)
        stream.Seek(0L, SeekOrigin.Begin) |> ignore
        unbox<'T>(formatter.Deserialize(stream))

Then we have a shorthand function for declaring queues, used for ensuring our queue exist before we attempt to access it.

    // Helpers
    let private declare (channel : IModel) queueName =
        channel.QueueDeclare(queueName, true, false, false, null)

Here we have some config, you probably want to alter these values if you run the example locally. We also create a connection factory which we will use next.

    // Config
    let private host = "127.0.0.1"
    let private port = 5672
    let private userName = ""
    let private password = ""
    let private exchange = ""

    let private factory = ConnectionFactory(HostName = host, Port = port, UserName = userName, Password = password)    

Now everything is ready for us to create our first public function "enqueue", used for adding messages to a queue.

We create a new connection to our RabbitMQ server and a model, which provides us with the operations we can perform against our RabbitMQ server.

After resolving the name of the queue and serializing the message we are ready to publish our message.

    // Add a message to queue
    let enqueue queue message = 
        use connection = factory.CreateConnection()
        use model = connection.CreateModel()

        let queueName = 
            queue 
            |> resolve
            |> (fun qn -> declare model qn |> ignore ; qn)

        let serializedMessage = 
            message 
            |> toBytes

        model.BasicPublish(exchange, queueName, null, serializedMessage)

For receiving messages from RabbitMQ we create a function "subscribe", which takes the type of message we expect to receive as a type argument.

It also takes a function to be invoked when a new message arrives.

We then simply wrap an EventingBasicConsumer and instruct it to deserialize incoming messages before passing it to the callback.

The subscribe function returns an anonymous function that can be used to close the connection to the RabbitMQ server.

    // Start push-based subscription to queue
    let subscribe<'T> queue callback = 
        let connection = factory.CreateConnection()
        let model = connection.CreateModel()

        let queueName =
            queue
            |> resolve
            |> (fun qn -> declare model qn |> ignore ; qn)

        let consumer = EventingBasicConsumer(model)
        consumer.Received.Add((fun message -> 
            message.Body
            |> fromBytes<'T>
            |> callback
        ))

        model.BasicConsume(queueName, true, consumer) |> ignore

        (fun () -> 
            model.Close()
            connection.Close())

Program.fs

With our queue wrapper in place we can create a small application to send and receive messages using it. We start by defining a type "Order" that we will be using as the message type.

open System

// Object used as queue message
type Order = { OrderId : int }

[<EntryPoint>]
let main argv = 

We then create a new subscription for "Order"-messages arriving to the "OrderReceived"-queue, providing a callback function to handle incoming messages. The callback function just prints the new order to the console.

    // Set up subscription
    let cancelOrderReceivedQueue = subscribe<Order> OrderReceived (fun message -> 
        printfn "Order received! Id #%d" message.OrderId // Message is strongly typed
    )

Next we use partial application to create a new function "enqueueOrder" that takes just the message as an argument and adds it to the "OrderReceived"-queue.

    // Create function for adding message to specific queue using partial application
    let enqueueOrder = (enqueue OrderReceived)

Then we create a loop that will keep adding new orders until the user presses escape. When escape is pressed we use the "cancelOrderReceivedQueue" function to close the connection to RabbitMQ.

    // Keep looping until user quits
    let rec loop id =
        let char = Console.ReadKey()
        match char.Key with
        | ConsoleKey.Escape -> cancelOrderReceivedQueue()
        | _ ->
            enqueueOrder { OrderId = id }
            loop (id + 1)

    printfn "Press a key to order, [ESC] to exit"
    loop 0

    0

Running it

When we run our example, every time we press a button, the following happens.

  1. A message with an incremented Id is serialized and added to the "OrderReceived"-queue.
  2. The message is immediately delivered, deserialized, and processed by the callback, writing a message to the console.

It looks something like this.

Console output

Wrapping up

While this isn't exactly ready to go into production I hope it illustrated how libraries can be abstracted in F# to give our code a more functionally idiomatic style.

Make sure to grab the code from my GitHub account if you want to experiment with it locally.

Also feel free to comment and provide feedback!

View Comments