Safely Upgrading Hadoop and Cascalog

This is a cross-post of the same article on the Intent Media blog.


The Intent Media data team uses Hadoop extensively as a way of doing distributed computation. The big data ecosystem is evolving very quickly, and the patterns and versions we had been using are being replaced with faster and more stable platforms. This post is about how we successfully and safely upgraded our Hadoop jobs and paid down some technical debt.

Background

Many of Intent Media’s backend job flows use Hadoop to manage the sheer size of the input datasets. We have jobs that range from basic aggregation to learning models. Hadoop is an impressive framework, but we had increasingly been confronting issues with stability, missing features, and a lack of documentation. A number of indicators pointed to our version of Hadoop being too far out of date. 

For our original MapReduce jobs, we were using Hadoop 0.20, which was state of the art when those jobs were written. Since then the framework has matured through several major versions, and Hadoop 2.x is gaining adoption. We needed to at least get to 1.x to give us a path for future upgrades.

On the data team, we write most of our MapReduce jobs in Cascalog, a Clojure domain-specific language (DSL) for Cascading, a library that provides job flow abstractions for Hadoop. The Clojure syntax is fairly terse, and the code is easy to test. Cascalog itself has matured since we first used it, and we needed to upgrade our library from 1.x to 2.1 to stay current with the latest stable versions.

Pre-Upgrade: Testing Infrastructure

Before initiating our upgrade project, we needed a way to automatically test our jobs in a cluster environment. We use TDD, so most jobs had a great unit test suite and ran against local execution engines, but job behavior on a cluster with real-world data can vary quite a bit. Unfortunately, a clean run with sample data on a local Hadoop jar does not guarantee flawless execution on a 30-machine cluster against production datasets. Most of the time when we define a new job, our QA team can verify expected output and execution under a variety of conditions. However, this manual process quickly becomes tedious when we have to regression test an entire set of jobs on a sweeping version change.

Our first project was to write a remote job triggering and testing harness which could pull a medium-sized subset of production data, execute a long-running remote job, and make sanity assertions about the output. Running remote jobs is expensive, so we did not put these tests in the integration test suite (which is run on every push to a branch). Instead, engineers can run these job tests prior to a large, possibly breaking change, and we can trust that a version upgrade is safe within a few hours.

Our job execution application is written in Java, and most of our other integration tests are in Cucumber, so we decided to continue with our existing technology choices for our remote cluster tests. We chose to define scenarios in Cucumber and use Ruby to trigger jobs, tail logs, and make assertions on result data. Engineers and product managers can all see the expected flow of each job and understand its input and output expectations, based on a job execution run on a snapshot of real production data.
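To make the idea of sanity assertions concrete, here is a minimal sketch of the kind of check such a harness might run on job output. It is written in Python purely for illustration (the real harness described above uses Cucumber and Ruby), and the function name, the tab-delimited format, and the sample rows are all assumptions rather than details of our jobs.

```python
import csv
import io


def check_job_output(lines, expected_columns, min_rows):
    """Return a list of problems found in tab-delimited job output.

    An empty list means the output passed every sanity check.
    """
    problems = []
    rows = list(csv.reader(lines, delimiter="\t"))
    if len(rows) < min_rows:
        problems.append(f"expected at least {min_rows} rows, got {len(rows)}")
    for number, row in enumerate(rows, start=1):
        if len(row) != expected_columns:
            problems.append(
                f"row {number}: expected {expected_columns} columns, got {len(row)}"
            )
        elif any(field == "" for field in row):
            problems.append(f"row {number}: empty field")
    return problems


# Made-up sample output standing in for a file pulled back from the cluster.
sample = io.StringIO("site-a\t2014-01-01\t42\nsite-b\t2014-01-01\t17\n")
print(check_job_output(sample, expected_columns=3, min_rows=2))
```

Checks like these are deliberately loose: they do not prove the numbers are right, only that the job produced output of the expected shape and size.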

Hadoop Upgrade

We run all our Hadoop jobs on AWS Elastic MapReduce (EMR). EMR is a powerful way to spin up entire clusters of optimized cloud instances preconfigured with Hadoop with a single API call. The Amazon team has worked to abstract away much of the work of creating and tuning a Hadoop cluster, so all we need to do is submit a job request. EMR is configured to start instances with particular virtual machine images, called Amazon Machine Images (AMIs). AMIs come in different versions and are packaged with different software, such as a specific Hadoop version. As early adopters of EMR, we were using AMI version 1.0.0, which shipped with Hadoop 0.20. To run our jobs against Hadoop 1.x, we had to modify the AMI versions in our job triggering configuration.

The Hadoop transition from 0.x to 1.x consisted mostly of versioning and lower-level changes, so fortunately our application code didn’t need to change much. We just needed to swap out the Hadoop library we compiled against and test each job carefully. Most of these job upgrades went smoothly.

Some of our Cascalog-based jobs did not perform as expected in the transition to running on Hadoop 1.x and returned unusual output. These will need to be significantly debugged, refactored, or replaced altogether. To prevent these few jobs from blocking our path forward with the rest, we built backwards compatibility into our EMR wrapper. This lets us specify a certain AMI version to use for each type of job, so we can continue to upgrade jobs and fix incompatible ones over time.
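The per-job AMI selection can be pictured roughly as follows. This is a hedged Python sketch, not our wrapper's actual code: the job names and the "UPGRADED" placeholder are hypothetical, and only the 1.0.0 version string comes from the setup described above.

```python
LEGACY_AMI_VERSION = "1.0.0"  # the original AMI, which shipped with Hadoop 0.20

# Hypothetical job types that still misbehave on Hadoop 1.x stay pinned
# to the legacy image until they are fixed or replaced.
PINNED_AMI_VERSIONS = {
    "legacy-aggregation": LEGACY_AMI_VERSION,
}


def ami_version_for(job_type, default_version, pinned=PINNED_AMI_VERSIONS):
    """Pick the AMI version for a job: its pin if it has one, else the default."""
    return pinned.get(job_type, default_version)


# "UPGRADED" is a placeholder for whichever newer AMI version is the default.
print(ami_version_for("legacy-aggregation", default_version="UPGRADED"))
print(ami_version_for("daily-report", default_version="UPGRADED"))
```

Upgrading a job then amounts to deleting its entry from the pinned map, which keeps each change small and easy to revert.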

This had the additional benefit of reducing risk in the production system. Rather than a massive, risky release trying to forcibly upgrade everything at once and hoping we tested every job scenario, we could merge each small upgrade change to master and evolve the system over time. We started slowly and found job performance and stability improved dramatically, giving us confidence to take larger steps and move faster. Rather than fearing upgrades, we could accelerate our transition. 

Cascalog 2.0 Upgrade

Last year Cascalog 2.0 launched, and it was a huge leap forward under the hood, separating the maturing Clojure API from the execution engine. Since we went through the upgrade, Sam Ritchie has written about the details fairly extensively in Cascalog 2.0 In Depth.

There were some breaking changes, but we found the migration to be fairly smooth. Here’s what we had to modify to make the transition:

Operations are functions, so every operation macro has been renamed to show that it works like a function.

  • defmapop -> defmapfn
  • defbufferop -> defbufferfn

Many of the namespaces have changed to be more specific. Some functions are either particular to Cascalog or the underlying Cascading and the namespaces now reflect that.

  • cascalog.ops -> cascalog.logic.ops
  • cascalog.io -> cascalog.cascading.io
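Renames like these are mechanical enough to script. The following Python sketch rewrites only the four names listed above in a piece of Clojure source; the helper name and the sample source are hypothetical, and a real migration may well need changes beyond simple renames.

```python
import re

# Only the renames listed above; other Cascalog 2 changes are not covered.
RENAMES = {
    "defmapop": "defmapfn",
    "defbufferop": "defbufferfn",
    "cascalog.ops": "cascalog.logic.ops",
    "cascalog.io": "cascalog.cascading.io",
}

# Match an old name only when it is a whole symbol, not part of a longer one.
_PATTERN = re.compile(
    r"(?<![\w.\-])(" + "|".join(re.escape(old) for old in RENAMES) + r")(?![\w.\-])"
)


def migrate(source):
    """Return the source text with each old name replaced by its new name."""
    return _PATTERN.sub(lambda match: RENAMES[match.group(1)], source)


old_source = (
    "(ns jobs.core (:require [cascalog.ops :as c]))\n"
    "(defmapop double-it [x] (* 2 x))"
)
print(migrate(old_source))
```

Because the new names never contain the old ones as whole symbols, running the helper twice leaves already-migrated code unchanged.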

Cascalog 2 uses Hadoop 1 instead of 0.2x.

The taps are much richer and more configurable for both input and output. We could remove a lot of serialization/deserialization hacks for working with delimited files. In Cascalog 1 the textline tap cannot write compressed output, but it can in later versions.

For example, our old workaround:

;; Workaround: split the JSON string on commas and let the delimited tap
;; write the parts back out, just to get compressed output.
(?- (hfs-delimited output-path :delimiter "," :compress true :sinkmode :replace :quote "")
    (<- [?part0 ?part1 ?part2 ?part3 ?part4 ?part5]
        (get-aggregations ?field-1 ?field-2 ?field-3 ?field-4 ?field-5 ?count :> ?json)
        (clojure.string/split ?json #"," :> ?part0 ?part1 ?part2 ?part3 ?part4 ?part5)))

In Cascalog 2 this can simply become:

(?- (hfs-textline output-path :compress true :sinkmode :replace)
    (<- [?json]
        (get-aggregations ?field-1 ?field-2 ?field-3 ?field-4 ?field-5 ?count :> ?json)))

All our jobs continued to work as expected on the new EMR AMIs, and now we have access to the latest API and documentation, making Cascalog development much easier.

Key Learnings

Tests are essential. With the right tests, you can be confident that your application is still correct after an upgrade. Make sure you have unit tests so you can iterate and test changes quickly, and larger integration tests as well. We have integration tests for all our Hadoop jobs. These tests assured us quickly that each library and machine image version was working together properly and computing the expected results.

Tech debt gets more painful over time. Many software packages use some form of semantic versioning to help you understand how important an upgrade is or what it will break. For example, we knew both of our upgrades could require significant prioritization because they crossed major versions. Minor versions, however, can usually be upgraded often to pick up smaller stability improvements and bug fixes. Make sure the client/customer for your product recognizes the importance of allocating time to major version upgrades, and the product owner leaves bandwidth for minor upgrades along the way.
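As a small illustration of reading semantic versions this way, the Python sketch below classifies an upgrade as major, minor, or patch. The function names are hypothetical, and it assumes plain numeric MAJOR.MINOR.PATCH strings, which not every project follows.

```python
def parse_version(text):
    """Parse 'MAJOR.MINOR.PATCH' into a 3-tuple; missing parts default to 0."""
    parts = [int(part) for part in text.split(".")]
    return tuple((parts + [0, 0, 0])[:3])


def upgrade_kind(current, target):
    """Classify the jump from current to target as major, minor, patch, or none."""
    cur, tgt = parse_version(current), parse_version(target)
    if tgt <= cur:
        return "none"
    if tgt[0] != cur[0]:
        return "major"
    if tgt[1] != cur[1]:
        return "minor"
    return "patch"


# Made-up version numbers, chosen only to exercise each branch.
for current, target in [("1.10.2", "2.1"), ("2.0.0", "2.1.0"), ("2.1.0", "2.1.1")]:
    print(current, "->", target, upgrade_kind(current, target))
```

A check like this could run against a dependency list to flag which upgrades need planning and which can simply be picked up along the way.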

Branches should be as short-lived as possible. Long-lived branches will make your life difficult by requiring you to rebase frequently. Merge often to master to minimize conflicts.

Use feature flags to control when execution switches over to major new components. We used this strategy when we built a way to specify the Hadoop version per job, which let us test and upgrade our jobs over a period of time rather than all at once.

Pairing is great. Often an upgrade process pushes into new technical territory, but that doesn’t mean the types of issues will be totally new. Get another engineer’s input on the problem, even if you can only pair for part of the project.


Single Malts and Sehnsucht

One of my favorite places to go in NYC is the Brandy Library. It's a library quite like no other, with bottles of every imaginable spirit on bookshelves surrounding the quiet TriBeCa bar.


For around $80 you can get the "Malt Masters" tasting, a flight of six small pours of their high-end single malt Scotches. I almost never have Scotches older than 15 years, so I thought I'd comment on this somewhat singular experience.

1. Oban 18 years
Raisin, green apple, toffee, and vanilla. Starting on a lighter note, I found this gentle yet expansive in a way the younger Obans I have tried were not.

2. Bruichladdich 1989 Black Art
Vanilla, chocolate raisin, cinnamon, ripe blueberries, aniseed, and fruits. My second favorite from the batch starts out with a light mouth touch--as if it's going to give a hug--but then it punches you in the face with a rush of flavor. Everyone at the table remarked that the experience was quite unlike that of other Scotches. This is a black art in both senses: lovely in the glass, immense in flavor, and lingering with a powerful afterburn. This would be a great bottle to purchase.

3. AnCnoc 22 years
Sweet and spicy with toffee, honey, cake, green apples, and flowers.

4. Springbank 18 years
Light smoke, licorice, rich & oily with dried fruits, candy, and almonds.

5. Bunnahabhain 25 years
Sherry nose, caramel, oak, leather, berries, cream, and roasted nuts.

6. Brora 30 years
Incredible body, smoke, biscuits, apple, walnut, earth, and lemon.

C.S. Lewis talks about experiences of what he terms "joy", but to more accurately understand what he means by that word, I like to call them brief moments of transport to another world. He uses the German word, "sehnsucht" which has no direct translation, but communicates some combination of longing, yearning, craving, and joy. These spiritual experiences are important and significant and I think are worth recording. Sometimes it's a view of nature. Sometimes it takes the form of a line of poetry or literature. Sometimes it's a sense like taste with food or drink. There are two caveats to these experiences. Firstly, an experience of joy seized is an experience lost. They are very rarely repeatable and usually second attempts only remind us of our first encounter. We must approach these moments with open hands and not look for repetition or addiction. Secondly, experiences of joy are very difficult to share. Often we read a chapter of a book that is transformative and gripping, recommending it to friends who afterwards find no great significance in the pages. Given those caveats though, I still believe it's worth sharing the times we experience "joy" with others.

I have had several experiences throughout my life, some brief and others strong, that I would classify in these categories. My experience drinking Brora 30 reminded me of one of these times. I've enjoyed plenty of peaty, smoky Scotches, but this one transformed into something other than smoke. The nose is potent, but once it enters the mouth it grows and expands like a crescendo in a symphony or a swell in the ocean. It's masculine, fearless, and endlessly complex.

Upon a little further research, however, I found this particular distillery is no longer operating. This is the nature of joy. It's short, memorable, and transient. Life is fleeting. We seek permanence and hope. Sometimes we glimpse that world in a moment, and then it is gone, not to return. Of course one could acquire one of the few remaining bottles, and perhaps I will at some point. Given what I know of joy, though, the purpose wouldn't be so noble. An experience can be pursued again, but a joy cannot be retained.

My philosophy professor often said of humanity, "we are wanty, needy creatures, constantly on the hunt for goods, who live by projecting ourselves onto hopeful futures." Sometimes a single malt will communicate the same, if we stop to taste.

Cross Browser User Bridging with DynamoDB

This is a cross-post of the same article on the Intent Media tech blog.


Rationale & Background

As the web evolves, user identification has become key when it comes to research, privacy, product customization and engineering. Companies are always balancing the need to respect users’ data collection wishes with the product and economic benefits of providing a customized user experience. The days of relying on third party cookies are gone. Web companies continue to need more effective, trustworthy ways of identifying visitors as previously seen customers or households.

Anonymous user shopping patterns form the core of our ecommerce predictive decisioning platform. We have several classification and regression models that predict the probability of conversion and expected purchase price. We also model a visitor’s expected CTR (Click Through Rate) to perform customized ad selection. To accomplish all this, site page visit history is saved in a “user profile” which represents the best view we have of a site visitor.

When we see a new user, we generate a new UUID and store that in a first-party cookie on a partner’s site. That becomes a partner-specific identifier for that user’s browser.
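As a rough sketch of that step, the Python below reuses an existing visitor ID from a cookie header or mints a new UUID and builds the Set-Cookie value for it. The cookie name, the one-year lifetime, and the function name are assumptions for illustration, not details of our production code.

```python
import uuid
from http.cookies import SimpleCookie

COOKIE_NAME = "visitor_id"            # hypothetical cookie name
ONE_YEAR_SECONDS = 60 * 60 * 24 * 365  # assumed lifetime


def ensure_visitor_id(cookie_header):
    """Return (visitor_id, set_cookie_value).

    set_cookie_value is None when the browser already sent a visitor cookie.
    """
    incoming = SimpleCookie()
    incoming.load(cookie_header or "")
    if COOKIE_NAME in incoming:
        return incoming[COOKIE_NAME].value, None

    # New browser: generate a random UUID and ask the browser to keep it.
    visitor_id = str(uuid.uuid4())
    outgoing = SimpleCookie()
    outgoing[COOKIE_NAME] = visitor_id
    outgoing[COOKIE_NAME]["path"] = "/"
    outgoing[COOKIE_NAME]["max-age"] = ONE_YEAR_SECONDS
    return visitor_id, outgoing[COOKIE_NAME].OutputString()


visitor_id, set_cookie = ensure_visitor_id("")
print(visitor_id, set_cookie)
```

Because the cookie is set on the partner's own domain, the same browser gets a different identifier on each partner site, which is what makes the ID partner-specific.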

While first-party visitor cookies are more effective than typical third-party advertising cookies at maintaining a persistent ID for a user, they suffer from a number of drawbacks.

  1. Users still tend to delete their cookies from time to time.

  2. Users buy new computers, discarding machines with the old visitor cookies.

  3. Users switch browsers. If a visitor searches on Chrome and moves to Safari, they get a new visitor cookie and appear like a new user.

  4. Users switch among multiple desktop and mobile devices. A visit from each device generates its own visitor ID cookie.

When a user deletes cookies or switches browsers or devices, the next visit looks like it is from a previously unseen visitor. This creates low-quality data and degrades the quality of our predictive models.

There are a number of sophisticated methods for probabilistic user identification from a variety of vendors that have built businesses around this difficult problem. In the interest of delivering a backwards-compatible system that could improve our performance in the presence of these issues, we decided to build a way to identify site visitors by their anonymous member ID in addition to the cookie-based visitor ID. This is a unique string that has a one-to-one mapping with usernames, but contains absolutely no personal information. The member ID provides limited coverage, since only a small percentage of visitors log in while browsing on e-commerce sites. It is still effective, because people tend to have just a single account across devices. With it, we can retrieve otherwise lost user history and append it to a new visitor ID.

Current Architecture

From an engineering perspective, we use a lot of Amazon Web Services (AWS) for our infrastructure ranging from EC2 for servers to S3 for long-term data storage. We use Amazon DynamoDB for storing user profiles for its simplicity and speed.

DynamoDB is a key/value store. While the actual specification is more complex, it takes a String for a key, which maps to an object that has a value with any number of attributes of various types, like Integers and Strings. For our key we use a GUID stored in a first-party cookie, that is, in the domain of the publisher sites that use our products.

We serve ads within 200 milliseconds. That is enough time for processing, but we cannot waste milliseconds. On an ad call, we send a request to DynamoDB and receive a record with the user’s history, which we use in our decisioning model. After the ad request, we put an update request on a queue for backend processing that then updates Dynamo with any new events we collected from the user.
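The shape of that request path, one read on the hot path and the write deferred to a queue, can be sketched in Python as below. The in-memory dict and queue are stand-ins for DynamoDB and our backend queue, and the decisioning function is a placeholder; none of the names come from our codebase.

```python
import queue

profiles = {}                    # stand-in for the DynamoDB profiles table
pending_updates = queue.Queue()  # stand-in for the backend update queue


def choose_ad(history):
    """Placeholder for the decisioning model."""
    return "returning-visitor-ad" if history else "new-visitor-ad"


def handle_ad_call(visitor_id, event):
    """Serve an ad using one profile read; defer the write to the queue."""
    history = profiles.get(visitor_id, [])
    ad = choose_ad(history)
    pending_updates.put((visitor_id, event))
    return ad


def process_pending_updates():
    """Backend worker: apply queued events to the profile store."""
    while not pending_updates.empty():
        visitor_id, event = pending_updates.get()
        profiles.setdefault(visitor_id, []).append(event)
```

Keeping the write off the request path means the time budget is spent only on the read and the model, at the cost of the profile lagging slightly behind the user's latest events.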

Implementation

If our data were small enough to fit in a traditional relational database, this would not be a difficult problem. We would only need to add a new index on a second column for whichever field we wanted to look up on, and then modify our request to look at both attributes with an OR in the where clause.
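For comparison, the relational version might look like the following Python sketch using SQLite. The table layout, column names, and rows are made up for illustration; the point is only the secondary index plus the OR in the where clause.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE profiles (visitor_id TEXT PRIMARY KEY, member_id TEXT, pages INTEGER)"
)
# The extra index on the second lookup column.
conn.execute("CREATE INDEX idx_profiles_member_id ON profiles (member_id)")
conn.executemany(
    "INSERT INTO profiles VALUES (?, ?, ?)",
    [("abc", "123", 2), ("def", "123", 1), ("xyz", None, 5)],  # made-up rows
)


def find_profiles(visitor_id, member_id):
    """Fetch every profile matching either identifier in a single query."""
    cursor = conn.execute(
        "SELECT visitor_id, pages FROM profiles "
        "WHERE visitor_id = ? OR member_id = ? ORDER BY visitor_id",
        (visitor_id, member_id),
    )
    return cursor.fetchall()


print(find_profiles("def", "123"))
```

A single round trip returns both the current browser's profile and any older profiles tied to the same member ID, which is exactly what a key-only store cannot do directly.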

In the world of distributed key-value stores, this is a bit trickier to implement without degrading performance. Records can only be retrieved by key. We could make secondary queries based on the results of previous queries. This, however, requires a sequence of requests to the database, which adds significant time to each ad request.

Options Considered

Local Secondary Indices

DynamoDB supports local secondary indices. We considered these, but found that in general they are for downsizing a multiple-row result set towards an intended record rather than expanding the result set from a single record into a larger view of a user’s history.

Global Secondary Indices (GSIs)

As we were developing, Dynamo released a feature called Global Secondary Indices. These allow multiple fields to serve as hashes for a record. GSIs might have been effective for a single additional field, but it is difficult to arbitrarily define new lookup fields and query them all in a single request. From the start we wanted the flexibility to add new indices quickly. An example would be to extend this bridging across our third-party visitor ID and other future identifiers. GSIs must be defined at table creation, or added to existing tables by completely rehashing them. Since they did not lend themselves to our iterative development process, we looked to other options.

Solution

The solution was to develop a second table, which we call our lookups table, in addition to our main profiles table.

Lookups Table

Alternate ID (String, Hash Key) | ID Type (Enum) | Mapped Visitor ID (String)

Profiles Table

Visitor ID (String, Hash Key) | Alternate ID 1 | Alternate ID 2 | Other profile data

When we first see a visitor with an alternate ID, such as a member ID, we write a lookup record with a reverse index, thus mapping the member ID to the visitor ID. Now suppose that user opens a new browser. As soon as they log in, we pull the new profile for the new visitor ID, and we realize we have an existing profile mapped under an old visitor ID through the member ID. We can then pull down that old profile and merge in the historical data to populate the new one. The lookup record is then rewritten to map to the new, merged profile record.

Thus, the lookups table maps each alternate ID to the visitor ID of the most recently seen browser for a given individual. Each profile record contains the best knowledge available for that individual as of the last time we saw that browser. The resting state of the system at any point trades off some duplication of data for profiles that are query-optimized for a visit from any browser.

Sample Flow

+----------------------+-----------+----------------------+
| User Actions         | Lookups   | Profiles             |
+----------------------+-----------+----------------------+
| User Visits Site     | [empty]   | abc-> {1 page}       |
| Gets Cookie "abc"    |           |                      |
+----------------------+-----------+----------------------+
| User Logs In         | 123-> abc | abc-> {2 pages}      |
+----------------------+-----------+----------------------+
| User Purchases       | 123-> abc | abc-> {2 pages, 1 $} |
+----------------------+-----------+----------------------+
| User Switches Devices| 123-> abc | abc-> {2 pages, 1 $} |
| Gets Cookie "def"    |           | def-> {1 page}       |
+----------------------+-----------+----------------------+
| User Logs In         | 123-> def | abc-> {2 pages, 1 $} |
|                      |           | def-> {4 pages, 1 $} |
+----------------------+-----------+----------------------+
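The flow in the table can be replayed with a small in-memory model. This Python sketch uses two dicts as stand-ins for the lookups and profiles tables and represents a profile as a set of uniquely numbered events, so that merging is a set union and the old profile is left untouched. The event representation and function names are assumptions made for the sketch.

```python
import itertools

lookups = {}   # alternate ID -> visitor ID of the most recently seen browser
profiles = {}  # visitor ID -> set of (event number, kind) tuples
_event_numbers = itertools.count(1)


def record_event(visitor_id, kind, member_id=None):
    """Record one event; if logged in, merge older history and repoint the lookup."""
    profile = profiles.setdefault(visitor_id, set())
    profile.add((next(_event_numbers), kind))
    if member_id is not None:
        previous = lookups.get(member_id)
        if previous is not None and previous != visitor_id:
            # Union copies the old events in; the old profile is not modified.
            profile |= profiles[previous]
        lookups[member_id] = visitor_id


def summary(visitor_id):
    kinds = [kind for _, kind in profiles[visitor_id]]
    return {"pages": kinds.count("page"), "purchases": kinds.count("purchase")}


record_event("abc", "page")                       # user visits site, gets cookie "abc"
record_event("abc", "page", member_id="123")      # user logs in
record_event("abc", "purchase", member_id="123")  # user purchases
record_event("def", "page")                       # user switches devices, gets cookie "def"
record_event("def", "page", member_id="123")      # user logs in on the new device

print(lookups, summary("abc"), summary("def"))
```

Using a union of uniquely identified events, rather than adding counters together, means a user who bounces back and forth between devices is not double counted when profiles are merged again.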

Key Benefits

  1. The performance hit for a secondary lookup is only taken once, on the first request where we see an alternate ID. Subsequent lookups are a guaranteed hit on the primary store. Once we realize we have an existing historical profile for a user, we pull all that data and merge it into the newest profile.

  2. This scales to support an arbitrary number of alternate ID types. Since DynamoDB supports batch requests, we do a batch request for all the alternate IDs, gather the unique visitor IDs that this retrieves, and then do a batch request for the historical profiles of the returned visitor IDs. This limits us to a maximum of two sequential requests, slashing latency.

  3. Merging is nondestructive. We retain the profiles mapped from the old visitor ID, so if the user alternates between devices multiple times, we can retrieve accurate data quickly.

  4. We can track how many devices we have merged together for a given user, and feed that as a signal into our model. Do users who use multiple browsers, tablets, and smartphones shop differently than users who only shop from their desktop? Our model knows.
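The two-step batch pattern from the list above can be sketched in Python as follows. The batch_get helper is an in-memory stand-in for a batched read, not a real DynamoDB client call, and the ID prefixes, table contents, and function names are hypothetical.

```python
def batch_get(table, keys):
    """Stand-in for one batched read: returns only the keys that exist."""
    return {key: table[key] for key in keys if key in table}


def load_history(lookups, profiles, visitor_id, alternate_ids):
    """Resolve a visitor's full history in at most two sequential batch reads.

    Returns (merged events, number of distinct browsers merged together).
    """
    # Request 1: resolve every alternate ID, whatever its type, at once.
    mapped = batch_get(lookups, alternate_ids)
    older_ids = sorted(set(mapped.values()) - {visitor_id})

    # Request 2: fetch the current profile and all older profiles at once.
    found = batch_get(profiles, [visitor_id] + older_ids)
    merged = set()
    for events in found.values():
        merged |= events
    return merged, len(older_ids) + 1


# Made-up tables: two alternate ID types pointing at two older browsers.
lookups = {"member:123": "abc", "thirdparty:9": "ghi"}
profiles = {"abc": {"p1", "p2"}, "ghi": {"p3"}, "def": {"p4"}}

events, devices = load_history(
    lookups, profiles, "def", ["member:123", "thirdparty:9", "member:unknown"]
)
print(sorted(events), devices)
```

Adding a new alternate ID type only adds another key to the first batch, so the number of sequential round trips stays at two, and the returned device count is the kind of signal the last point in the list describes.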

Conclusion

We have shown how we built a sophisticated cross-browser user profile with simple tools. Intent Media’s data team does a lot of work to optimize ad performance for publishers and advertisers as well as the overall experience for users. User profile histories are one of many tools we use. This is just one small enhancement we’ve added to our platform as we continue to iterate and innovate.