Real-time Meetup.com data crunching using Azure Stream Analytics

Azure Stream Analytics provides a really straightforward way to process large volumes of data continuously, using what is essentially a limited SQL syntax extended with constructs for handling the time aspects of a continuous stream.

It's intended for processing millions of messages per second to derive insights from your data in near real-time. The idea is to provide you with tools to process the vast amounts of data that are becoming increasingly common with the Internet of Things, connected devices and so on.

In this post I will illustrate how you can set up an analysis pipeline and start crunching some data using Stream Analytics.

What we will do is go grab some live RSVP data from Meetup.com, crunch it using Stream Analytics, and feed it to a dashboard to create a near real-time leaderboard of the groups getting the most RSVPs right now.

Meetup meets Stream Analytics

If you want to skip the details and just see the result, go ahead and take a look at the dashboard now. This is a proof of concept, so there is no guarantee that the dashboard will be working at all times :)

A quick introduction

Let's start by taking a look at the basics. Processing using Stream Analytics can be described as a three-step process.

  • Data input
  • The query that does the actual data crunching
  • Data output

Stream Analytics Azure Dashboard

Data input

Stream Analytics jobs receive data from EventHubs (you can also use Blob storage for static reference data, but that is not relevant for this post). Data posted to the EventHubs you have specified as inputs for the job will be picked up and processed.

You have a choice of different data formats. In this case I have chosen JSON.

Query

You write a query for your job directly in the Azure management portal. The query is a subset of SQL and rather than referencing tables, you reference the inputs you have provided.

Data output

You specify outputs that your query will write to whenever it produces new data. You have several options.

  • Database
  • Blob storage
  • EventHub

Databases are great for providing the results as working data for your applications, while blobs are more suitable for archiving. An EventHub, which is what we will use in this case, is a good choice if you want to keep working with your data after processing.

By outputting to an EventHub, the data could become input for another Stream Analytics job, and allow you to build a more sophisticated analytics pipeline.

More background info

If you want to read up on Stream Analytics I suggest you take a look at these resources.

You should also visit Preview features to sign up for the preview; access should be granted immediately.

Let's get this thing running

I chose Meetup.com as the subject because they are kind enough to provide us with a real live stream of events running through their system. The data we have access to is RSVPs delivered over a websocket. It's not exactly "millions of events per second", but it's plenty to play around with in Stream Analytics.

Take a look at the Meetup RSVP ticker to get an idea of the data we are working with.

What we want to achieve is this.

Analytics Pipeline

  1. Meetup provides all RSVPs as websocket events
  2. We listen for those events in a console app and push them on an EventHub
  3. A Stream Analytics job counts the number of RSVPs per unique Meetup-group in the last 5 minutes using the RSVP events on the EventHub. It does so every 10 seconds and pushes the result to another EventHub.
  4. A console app listens for events on this EventHub, grabs the top 10 from every event, and pushes them to a dashboard at TheDash

If you want to try for yourself

You should go get the code at the GitHub repository. You will have to set up a few things in Azure before you get started.

  • A storage account
  • An EventHub for pushing RSVP messages to
  • An EventHub for pushing Stream Analytics results to

Get Started with Event Hubs has all the info you need to get your EventHubs running.
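
If you'd rather create the EventHubs from code than click through the portal, a minimal sketch using the Service Bus NamespaceManager could look something like this (the hub names and the connection string setting are placeholders I made up):

// Requires the Microsoft.ServiceBus and System.Configuration namespaces
// Connection string for the Service Bus namespace (with manage rights)
var connectionString = ConfigurationManager.AppSettings["Streamup.ServiceBus.ConnectionString"];
var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);

// One hub for the raw RSVPs and one for the Stream Analytics results
namespaceManager.CreateEventHubIfNotExists("streamup-rsvps");
namespaceManager.CreateEventHubIfNotExists("streamup-results");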

The RSVP ingest

Our RSVP ingest is very simple: it's just a console app that grabs messages from a websocket connection, does a bit of parsing and transformation, and publishes them to an EventHub.

using System;
using System.Configuration;
using System.Text;
using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;
using WebSocket4Net;

namespace Streamup.Ingest
{
    class Program
    {
        private static WebSocket _websocket;
        private static EventHubClient _eventHub;

        static void _websocket_MessageReceived(object sender, MessageReceivedEventArgs e)
        {
            var rsvp = JsonConvert.DeserializeObject<Rsvp>(e.Message);

            // Flatten the RSVP to just the fields we care about downstream
            var flat = new FlatRsvp()
            {
                IsGoing = rsvp.IsGoing,
                RepliedAt = rsvp.RepliedAt,
                RepliedMinutesBeforeEvent = (int) rsvp.Event.BeginsAt.Subtract(rsvp.RepliedAt).TotalMinutes,
                GroupId = rsvp.Group.Id,
                GroupName = rsvp.Group.Name,
                GroupCountry = rsvp.Group.Country,
                EventId = rsvp.Event.Id,
                EventName = rsvp.Event.Name
            };

            var serialized = JsonConvert.SerializeObject(flat);

            // Publish the flattened RSVP as a UTF-8 encoded JSON event
            _eventHub.Send(new EventData(Encoding.UTF8.GetBytes(serialized)));
        }

        static void Main(string[] args)
        {
            // Init event hub to write events to
            var connectionString = ConfigurationManager.AppSettings["Streamup.EventHub.ConnectionString"];
            var eventHubName = ConfigurationManager.AppSettings["Streamup.EventHub.Name"];
            _eventHub = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);

            // Init websocket for fetching Meetup data
            _websocket = new WebSocket("ws://stream.meetup.com/2/rsvps");
            _websocket.MessageReceived += _websocket_MessageReceived;
            _websocket.Open();

            Console.ReadLine();
        }
    }
}

I use WebSocket4Net to grab events from Meetup.com and parse them using Json.NET before publishing them to the EventHub. Notice that I simplify the data structure a bit (FlatRsvp) to make the Stream Analytics job in the next step a little simpler.

The event we push to the EventHub contains quite a lot of data (so we can do more sophisticated analytics later if we are so inclined), but for now we only really need the GroupId, GroupName and GroupCountry.
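
For reference, FlatRsvp is just a plain data class mirroring the assignments above. A minimal sketch, where the exact property types are my assumptions:

public class FlatRsvp
{
    public bool IsGoing { get; set; }
    public DateTime RepliedAt { get; set; }
    public int RepliedMinutesBeforeEvent { get; set; }
    public long GroupId { get; set; }       // the id types are assumptions
    public string GroupName { get; set; }
    public string GroupCountry { get; set; }
    public long EventId { get; set; }
    public string EventName { get; set; }
}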

The actual Stream Analytics job

When you create a new Stream Analytics job in Azure you start by specifying Inputs and Outputs. We add our first EventHub as input (the one we publish incoming RSVPs to), and the second EventHub as output for the Stream Analytics job to write to when it generates new events.

Create inputs and outputs

Then we can go ahead and set up the actual query by clicking on the "Query"-tab and writing our SQL-like query directly in the query window in the Azure Management portal.

Query window

Let's take a closer look at the query code.

SELECT
    COUNT(1) AS RsvpCount,
    GroupId,
    GroupName,
    GroupCountry
FROM 
    [streamup-in]
GROUP BY HoppingWindow(second, 300, 10), GroupId, GroupName, GroupCountry

For the most part this looks like a standard SQL grouping query. But there are two things that are worth discussing.

First, the FROM clause does not reference a table, but rather the name we gave our input EventHub when we added it as an input in the previous step.

Secondly, the GROUP BY clause contains an interesting construct called "HoppingWindow". Windows are what allow queries to run continuously over a stream; they provide the time aspect of the query.

In this case we use a hopping window configured to run the query every 10 seconds, using event data from the last 300 seconds.

The result is a query that every 10 seconds will output a count of RSVPs per Meetup-group encountered within the last five minutes. Every 10 seconds a new list is generated and published to our output EventHub for further processing.
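
To make that concrete, each event published to the output EventHub carries a JSON array with one entry per group seen in the window, something along these lines (the values are made up):

[
  { "RsvpCount": 42, "GroupId": 12345, "GroupName": "Example .NET User Group", "GroupCountry": "us" },
  { "RsvpCount": 17, "GroupId": 67890, "GroupName": "Another Meetup Group", "GroupCountry": "gb" }
]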

The push to dashboard

The last component we need is a console app that will read the output events from the Stream Analytics job from the EventHub, grab the top 10 most active groups (by RSVP count) and push that list to the dashboard.

Notice that I didn't just take "TOP 10" results directly in the Stream Analytics query. That is because the TOP keyword is not supported by the query language yet.

To consume events from the EventHub we implement an IEventProcessor. I used the example in the Get Started with EventHubs guide as a template.
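
Wiring the processor up to the output EventHub follows the same pattern as in that guide. A rough sketch, where the app setting names and the DashboardEventProcessor class name are placeholders for whatever you call them:

var eventHubConnectionString = ConfigurationManager.AppSettings["Streamup.EventHub.ConnectionString"];
var storageConnectionString = ConfigurationManager.AppSettings["Streamup.Storage.ConnectionString"];
var eventHubName = ConfigurationManager.AppSettings["Streamup.EventHub.Out.Name"];

var host = new EventProcessorHost(
    Guid.NewGuid().ToString(),               // unique name for this host instance
    eventHubName,
    EventHubConsumerGroup.DefaultGroupName,
    eventHubConnectionString,
    storageConnectionString);                // blob storage is used for leases and checkpoints

host.RegisterEventProcessorAsync<DashboardEventProcessor>().Wait();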

I don't provide the complete code here but it is of course available in the GitHub repository. The core functionality is in how we process the events from the output.

async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
    foreach (EventData eventData in messages)
    {
        var data = Encoding.UTF8.GetString(eventData.GetBytes());

        // Extract top 10 by RsvpCount
        var groups = JsonConvert.DeserializeObject<List<GroupStats>>(data);
        var top = groups.OrderByDescending(g => g.RsvpCount).Take(10).Select(g => new {
            Group = String.Format("{0} ({1})", g.GroupName, g.GroupCountry),
            Count = g.RsvpCount
        }).ToList();

        // Build and post CSV data to Dashboard
        var csv = new StringBuilder();
        csv.AppendLine("Group,RSVPs");

        foreach (var group in top)
            csv.AppendLine(String.Format("{0},{1}", group.Group.Replace(",", ""), group.Count));

        Post(_dashboardUri, csv.ToString());
    }

    // Checkpoint so we don't re-process these events if the processor restarts
    await context.CheckpointAsync();
}
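
A couple of details are left out of the snippet above. GroupStats is just a small class mirroring the fields produced by the Stream Analytics query, and Post is a tiny helper that POSTs the CSV to the dashboard. A rough sketch of both, where the property types, the WebClient usage and the content type are my assumptions:

public class GroupStats
{
    public long RsvpCount { get; set; }     // types are assumptions
    public long GroupId { get; set; }
    public string GroupName { get; set; }
    public string GroupCountry { get; set; }
}

private static void Post(Uri dashboardUri, string csv)
{
    // Assumption: the dashboard accepts the leaderboard as a plain CSV body
    using (var client = new WebClient())
    {
        client.Headers[HttpRequestHeader.ContentType] = "text/csv";
        client.UploadString(dashboardUri, "POST", csv);
    }
}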

Roughly every 10 seconds, when the Stream Analytics query outputs a new event, we grab the 10 most active groups and push them to the dashboard in CSV format. The result looks something like this.

The dashboard

If you haven't already, go check out the result on the dashboard now.

Extra bonus

Since Ingest and Push are just basic console applications, it was really easy to set them up as two webjobs in Azure and have them run continuously, taking my laptop out of the picture and keeping the dashboard updated without running anything locally.

Azure Webjobs

Wrapping it up

It's not hard to come up with a list of drawbacks and limitations when using this technology. One obvious limitation is the query language itself. Another is that your analytics queries don't live in source control; you manage them directly in the portal, which I can see getting messy really quickly in any real-world scenario.

But it's important to remember that this is still a preview, and it really seems to hold a lot of promise. Getting a simple pipeline for "analyzing" the Meetup data up and running was really quick, with the actual query being the easiest part. I have no trouble imagining real world scenarios where Stream Analytics could be a good solution and I look forward to seeing how it evolves.
