I’ve long been interested in building more useful applications by analyzing how users move through the applications we build, how they interact with them, and how that changes as the application evolves.

Over the past few weeks I’ve been trying to build a system for analyzing user behavior through my streaming Snowplow Analytics stack.

It’s been pretty tough.

This is something I’ve wanted to do for a while. But I’ve always been overwhelmed by the sheer complexity of managing and manipulating the large streams of data.

How can I handle the large volumes of data in an efficient way? What if I lose some data? What if I duplicate the data? What if these issues lead my coworkers to draw false conclusions about our users?

These are all very valid questions. Questions that I am probably not the best-equipped person in the world to answer.

But – We all have to start somewhere. We can only spend so much time lying around, waiting for someone else to build our dream system. No better time than the present to get our hands dirty and learn a thing. Do something interesting.

Plus, it’s fun.

Here, I’ve written down some of the issues that have come up and how I’ve been solving them so far.

The analytics stack I’m working with runs in real time through a Kinesis Stream, using the streaming Snowplow Analytics setup that I’ve previously described how to build.

I have a Github project that I’ve started which uses all of these techniques. It is not currently well documented or easy to use, but if it proves useful, I’ll make it better documented and better tested in the future!

Event Re-Ordering

One thing that I’ve been wanting to build is a real-time user segmentation system and user funnel tracking system.

This means that the order of events matters. If we want to see a list of people who’ve read Part 1 – The Snowplow Collector and then read Part 2 – The Snowplow Stream Enrichment Process, we need to make sure that the events are processed in the correct order.

We want to prevent this scenario:

Event 2
Event 3
Event 1
Event 4

If the order could be mixed up, the funnels would be incorrect and we could draw the wrong conclusions about how people are flowing through the website.

At-least-once messages

The first thing to understand is that Kinesis provides at-least-once message guarantees. This means that, if a message makes it to Kinesis, we will always receive it at least once.

It also means that we might receive the same message more than once.

This affects us in an interesting way:

  • Data can be duplicated
  • Event order can be messed up through data duplication

This is what that might look like:

Event 1
Event 3
Event 2
Event 2
Event 1
Event 4

We’ll handle the actual data duplication later in this article.

But the thing to note here is that the Kinesis system itself cannot guarantee ordering, and Snowplow can’t really do anything to fix that from their end.

Derived Timestamp vs. Collector Timestamp

Another thing to note is that the Snowplow Trackers themselves can’t ensure that the data is received by the Snowplow Collector in the correct order.

This isn’t really a flaw in the Snowplow Tracker so much as an inconvenience caused by the intricacies of networking on the internet.

The Snowplow Javascript Tracker, for example, will make a series of network calls to the Snowplow Collector.

These calls happen asynchronously and the network may drop or slow down any given request. If an analytics event were dropped, it would need to be retried. By the time it’s retried, another analytics event, that technically happened at a later time, might be received by the collector.

Event 2 - Happened at 1:03 (Derived). Received at 1:03 (Collector)
Event 1 - Happened at 1:02 (Derived). Received at 1:04 (Collector)

The gist of this is that, due to the constraints of the physical world, the Collector Timestamp is not a good measure of the actual time an event occurred. Nor is the order of events pushed to the Kinesis stream guaranteed to correspond to the order in which they actually happened.

None of these issues is likely to be solved any time soon.

Luckily, Snowplow has added another timestamp to event records called the Derived Timestamp. It uses all the information available on the event to estimate when the event actually occurred.

In the case of the Javascript Tracker, it can make use of a client-side timestamp that can be recorded at the time of execution.

You can read more about the Derived Timestamp from the Snowplow team by taking a look at their blog.

They often post a lot of useful information about the decisions they’ve made throughout building the system. Definitely a good read!

All of this will be super useful to us in our goal of logging events in the correct order. But neither Kinesis nor Snowplow will solve this problem for us, so we need to take it into our own hands.

The Fix: Timed Event Buffers

Imagine we’re reading from the Snowplow Event Stream – one event at a time. As events come in for a specific user, we monitor the user’s lifecycle and figure out where in the funnel the user is.

If we only know about one event at a time, it is impossible for us to detect and recover from unordered events.

Imagine we read all of a user’s events into memory, sort them by when they occurred, and then determine where in each funnel the user should be placed. That would do a better job of detecting incorrectly ordered events.

But – if we read all of a user’s events into memory and calculate funnels on the fly, we risk both running out of memory and taking a prohibitively long time to decide which funnels to put the user in.

We need a compromise.

Let’s say that we know, from our past experience, that Kinesis doesn’t often duplicate events across large spans of time.

Let’s also say that we know, from our past experience, that the Snowplow Trackers we use don’t often mix up the order of events that occurred with large spans of time between them.

So if an event happened, say, an hour ago – it probably won’t show up in our stream again now as a duplicate, and it probably won’t show up after an event that happened 30 minutes ago.

With these constraints, we can make a reasonably accurate solution.

What we can do is create an in-memory buffer in our application that reads in all the events it can over the span of, say, one minute.

It can then re-order these events in memory, sorted by their Derived Timestamp.

The application can then process this re-ordered list of events and repeat the process.

Here’s some pseudocode to show what this means:

bufferedEvents = []
for 1.minute {
    nextEvent = getNextEvent()
    bufferedEvents.append(nextEvent)
}
sort(bufferedEvents)
... process this sorted list of events and repeat ...
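
Here’s a minimal sketch of that buffer in Go, the language I’m using for this project. The SnowplowEvent type here is a placeholder with only the field this sketch needs:

package stream

import (
    "sort"
    "time"
)

// SnowplowEvent is a stand-in for the enriched event type; only the
// Derived Timestamp matters for this sketch.
type SnowplowEvent struct {
    DerivedTstamp time.Time
}

// bufferAndSort drains the events channel for one time window, then
// returns the buffered events ordered by their Derived Timestamp.
func bufferAndSort(events <-chan SnowplowEvent, window time.Duration) []SnowplowEvent {
    buffered := []SnowplowEvent{}
    deadline := time.After(window)
    for {
        select {
        case evt := <-events:
            buffered = append(buffered, evt)
        case <-deadline:
            // Window is over: sort by Derived Timestamp and hand back.
            sort.Slice(buffered, func(i, j int) bool {
                return buffered[i].DerivedTstamp.Before(buffered[j].DerivedTstamp)
            })
            return buffered
        }
    }
}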

We can modify this time-frame as our traffic scales so that we know with a decent amount of certainty that this process won’t run out of memory or take too long.

From my past experience with my particular application, I know that events are rarely misordered by more than a minute. I can exploit that to order events more accurately without the system becoming prohibitively expensive to maintain.

We have reduced the negative effects of incorrect event ordering on our real-time analytics processing system.

Event Duplication

In the previous section, we brought up the fact that Kinesis is an at-least-once message delivery system, which means that we might receive the same message more than once.

This is a problem when we want to count how many times a given user has done an action. For example, if we want to keep a running count of how many pages a given user has viewed:

Event 1 - Viewed Page
Event 1 - Viewed Page (Duplicate)
Event 1 - Viewed Page (Duplicate)
Event 2 - Viewed Page

Viewed Pages Count: 4

Event duplication would cause us to count the same event multiple times and over-represent the usage of our application by any given user, messing up all our statistics and usage patterns.

The Fix: Event Fingerprinting and Event Logging

The Snowplow Stream Enrichment process has a handy-dandy enrichment that can add an event_fingerprint property to our Snowplow events.

There’s no real secret to how this fingerprinting works – it takes all the useful properties from a given event, aggregates them together into a big string, runs MD5 over it, and stores the resulting hash in the event_fingerprint field.

The process looks something like this:

fingerprint := md5(evt.UserId + evt.NetworkUserId + evt.SeAction + ...)

This is great for us. The Snowplow team has said this is a good process for event de-duplication. So we don’t have to guess whether or not we’ve done something wrong or missed an important attribute.
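
For illustration, computing a comparable fingerprint yourself might look like this in Go. The exact field list is an assumption on my part; the real enrichment’s field list is configurable:

package enrich

import (
    "crypto/md5"
    "encoding/hex"
)

// fingerprint hashes the salient fields of an event into a stable ID.
// Which fields to include is up to you; these are placeholders.
func fingerprint(userID, networkUserID, seAction string) string {
    sum := md5.Sum([]byte(userID + networkUserID + seAction))
    return hex.EncodeToString(sum[:])
}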

How do we actually make use of it?

We want to keep track of all the fingerprints we’ve processed in recent memory and make sure that if we see them again, we’ll ignore them and don’t re-process them.

... receive event ...
if seenBefore {
    ... skip ...
} else {
    ... process ...
}

What is “recent memory”?

That really depends on the amount of data moving through the system, how likely it is to be duplicated, and how we recover in the case of failure.

If recent memory is too long, it’ll take more storage space, memory, or horsepower to search through it.

Too short, and some duplicate events might slip through.

In my case, I currently define “recent memory” as within the past hour.

In practice, this has provided enough time to ensure that most duplicate events will be caught while being short enough that I don’t require a large amount of resources to maintain it.

Persistence of the Event Log

We can keep some of this event log in memory, in the running application, to save us some time when a duplicate is found. Having multiple layers of assurance that we’re not duplicating data is a good practice.

seenFingerprints = {}
for evt in events {
    if seenFingerprints[evtFingerprint] {
        continue
    }
    ... process event ...
    seenFingerprints[evtFingerprint] = true
}

But, we want to persist this event log either to disk or to another caching system so that if our application crashes, we can recover from where we left off and still not duplicate the data.

Our application crashing is where we most need to prevent duplicate events from being processed. If our application restarts, we don’t want to lose data – nor do we want to process data twice.

We can persist our event log to the hard drive:

seenFingerprints = {}
for evt in events {
    if seenFingerprints[evtFingerprint] {
        continue
    }
    ... process event ...
    seenFingerprints[evtFingerprint] = true
}
saveToDisk(seenFingerprints)
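
A rough Go version of that disk save, using encoding/gob. The whole-map snapshot and the file path are simplifications for the sketch:

package dedupe

import (
    "encoding/gob"
    "os"
)

// saveToDisk snapshots the seen-fingerprint set so a restarted
// process can reload it and keep skipping known duplicates.
func saveToDisk(path string, seen map[string]bool) error {
    f, err := os.Create(path)
    if err != nil {
        return err
    }
    defer f.Close()
    return gob.NewEncoder(f).Encode(seen)
}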

But I use Terraform for a ton of stuff, which means that I destroy servers, and by extension their hard drives, all the time. So I store my event log in Redis, where the key is the event fingerprint and the value is a boolean.

In my own projects, when I need to check if I’ve seen an event before, I try to fetch the fingerprint from my Redis data store. If it’s found, we know that we’ve seen the event before. This also lets us set each fingerprint to expire after an hour, matching our “recent memory” window.

if !seenFingerprints[fingerprint] && !inRedis(fingerprint) {
    ... process event ...
    seenFingerprints[fingerprint] = true
}
...
saveToRedis(seenFingerprints).ForOneHour()
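
Here’s roughly how that could look with the go-redis client (github.com/redis/go-redis/v9). Using SETNX with a TTL makes the check-and-record step a single atomic operation. This is a sketch of the idea, not my exact production code:

package dedupe

import (
    "context"
    "time"

    "github.com/redis/go-redis/v9"
)

var rdb = redis.NewClient(&redis.Options{Addr: "localhost:6379"})

// seenBefore atomically records the fingerprint with a one-hour TTL
// and reports whether it was already present.
func seenBefore(ctx context.Context, fingerprint string) (bool, error) {
    isNew, err := rdb.SetNX(ctx, "fp:"+fingerprint, true, time.Hour).Result()
    if err != nil {
        return false, err
    }
    return !isNew, nil
}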

None of these methods will be 100% effective. If there were a 100% effective, generalized solution to these problems, then we wouldn’t need to code our own.

But these methods make these problems much less likely to affect us, to the point where we can be reasonably confident that we’re working from accurate information.

Event Loss

The internet is a complicated place and some events are going to get lost before they hit your servers. That problem can’t really be solved.

But what we can do is make it so that if an event hits our server, it doesn’t get lost on its way to our backend.

Automated Application Restarts

If our application ever crashes, which it will because the world is a messy place and we are in control of nothing, we should make sure that it is restarted without the need for manual intervention.

Supervisor is a nice tool that we can install on our server to automatically restart our application when it crashes and to re-launch it when the server restarts.

A nice little article about installing Supervisor on an Ubuntu server, written by DigitalOcean, can be found here.
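
For illustration, a minimal Supervisor program entry might look like this. The program name and binary path are placeholders:

[program:snowplow-consumer]
command=/usr/local/bin/snowplow-consumer
autostart=true
autorestart=true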

Pessimistic Kinesis Checkpointing

Most libraries that help us read from a Kinesis stream have built-in tools for a process called checkpointing.

Checkpointing is essentially a way to keep track of the last record that we successfully processed so that if our server crashes, we can start from where we left off.

It often uses a backend like Redis or DynamoDB to persist the checkpoint information in case of a server crash.

More on the process of Kinesis Checkpointing can be found here.

Though, in practice, a lot of the libraries that allow us to consume Kinesis will have checkpointing built in, so we don’t have to deal with it manually.

I’m going to actually be a little pessimistic and only checkpoint every 5 minutes.

... do all processing ...
every.5.minutes {
    ... save latest position in Kinesis stream ...
}
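
In Go, that might look something like the sketch below. The Record type and the checkpoint helper stand in for whatever your Kinesis consumer library actually provides:

package consumer

import "time"

// Record stands in for a Kinesis record from your consumer library.
type Record struct {
    SequenceNumber string
}

func consume(events <-chan Record, process func(Record), checkpoint func(string)) {
    ticker := time.NewTicker(5 * time.Minute)
    defer ticker.Stop()
    var lastSeq string
    for {
        select {
        case rec := <-events:
            process(rec)
            lastSeq = rec.SequenceNumber
        case <-ticker.C:
            if lastSeq != "" {
                // Persist the latest processed position; a crash now
                // means re-processing at most 5 minutes of events.
                checkpoint(lastSeq)
            }
        }
    }
}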

If my application crashes at any point during those 5 minutes, it will start from the beginning of the 5-minute period.

This restart would theoretically cause the application to re-process events in the case of a crash.

But, due to the duplication prevention that we programmed earlier, we should be fine in most cases!

Event Corruption

This is all well and good – But if the data is corrupted and causes consistent crashing of our application, it could create a constant start-fail-restart loop that causes our pipeline to halt.

Snowplow actually takes care of most of this for us – Both the collector and the enrichment process filter out corrupted data before it hits our application.

That being said – our programming probably isn’t perfect. The application could still crash after a future update or because of a bug.

We should make sure that we have some way to monitor the system and be alerted if it stays crashed for an extended period of time.

... if the application crashes 5 times in a row, email me ...

We could also set up a process for logging and then skipping problematic events. But that could prove confusing and problematic. Especially in the case of a large bug in the application.

I’d recommend this process when the codebase has matured and become well-tested:

err = processEvent(evt)
if err != nil {
    logEventBroken(evt)
}
... continue processing after skipping event ...

It’d be annoying to come back after a weekend to find that we lost days worth of data!

Application State Persistence

I’ve been doing all of these things for one main goal:

Real-time Usage Tracking

But, we’ve already brought up the fact that the application can crash at any given time.

We can’t keep a representation of the user in memory and append to it because we might lose that data in the case of a crash. We want to save it somewhere and retrieve it when needed.

... process a bunch of events ...
persistUsageHistory(userId, usageHistory)

... Or, in the event of a crash or reload ...
usageHistory = retrieveUsageHistory(userId)

So far I’ve found that ElasticSearch is a good place to store our aggregation data for later retrieval.

If a user hasn’t been seen, or hasn’t been seen in a while, we can load the stored data for that user from ElasticSearch extremely quickly and then manipulate the user object in memory as future events arrive.

We can persist the changes made to the in-memory user object periodically as it changes, and remove the user from memory if they haven’t been seen in a while to prevent overloading the server.
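
Here’s a sketch of that pattern. The loadUser and saveUser helpers are hypothetical stand-ins for real ElasticSearch reads and writes:

package users

import "time"

// UserState is an illustrative in-memory aggregate for one user.
type UserState struct {
    UserID    string
    PageViews int
    LastSeen  time.Time
}

var inMemory = map[string]*UserState{}

func handleEvent(userID string) {
    u, ok := inMemory[userID]
    if !ok {
        u = loadUser(userID) // fetch persisted state, or start fresh
        inMemory[userID] = u
    }
    u.PageViews++
    u.LastSeen = time.Now()
}

// flushAndEvict runs periodically: persist every user's state and
// drop users who haven't been seen recently to free memory.
func flushAndEvict(idle time.Duration) {
    for id, u := range inMemory {
        saveUser(u)
        if time.Since(u.LastSeen) > idle {
            delete(inMemory, id)
        }
    }
}

func loadUser(id string) *UserState {
    // Query ElasticSearch for persisted state; stubbed here.
    return &UserState{UserID: id}
}

func saveUser(u *UserState) {
    // Index the state into ElasticSearch; stubbed here.
}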

This also gives us the benefit of using Kibana for data visualization. Kibana is a super powerful dashboarding tool for data exploration.

This dashboard also lets us see at a glance whether our data looks alright and whether the system is working.

Bonus: Parallel Computing

It can be tough to keep up with the large amounts of data hitting our application at fast speeds. If we’re too slow, we’ll fall behind and lose the benefit of being able to act on the data quickly.

I’ve been using the Go programming language, with goroutines and channels everywhere they make sense. This lets me take full advantage of the server my application runs on, without the large complexity often associated with threads.

userManager := NewUserManager()
userManager.AddEvent(evt)

... in userManager ...

func NewUserManager() *UserManager {
    um := &UserManager{Events: make(chan SnowplowEvent)}
    go um.ProcessLoop()
    return um
}

func (um *UserManager) ProcessLoop() {
    for {
        evt := <-um.Events
        ...
    }
}

func (um *UserManager) AddEvent(evt SnowplowEvent) {
    um.Events <- evt
}

It is likely that one day I’ll need to shard the data and distribute the processing amongst multiple servers. But no company I’ve used this with currently has enough data flowing through its analytics system, or intense enough real-time processing needs, to warrant that complexity.

I feel as though most companies are in a similar position – having analytics data, but not enough to need distributed computing. So the approach here should be a good start for exploring their analytics.

I may one day make a tutorial for using this in a distributed system.

But for now, this is it!

Have fun!

Go gain some insights from how your users are using your products!

You might find that some assumptions you’ve been making are incorrect and that these usage patterns will help you to develop better products and features.

I’ll probably be writing more articles about this in the future. So, if you’re interested in learning more about behavioral analytics or predictive analytics – subscribe to the blog!
