Safely Upgrading Hadoop and Cascalog

This is a cross-post of the same article on the Intent Media Blog


The Intent Media data team uses Hadoop extensively as a way of doing distributed computation. The big data ecosystem is evolving very quickly, and the patterns and versions we had been using are being replaced with faster and more stable platforms. This post is about how we successfully and safely upgraded our Hadoop jobs and paid down some technical debt.

Background

Many of Intent Media’s backend job flows use Hadoop to manage the sheer size of the input datasets. We have jobs that range from basic aggregation to training machine learning models. Hadoop is an impressive framework, but we had increasingly been confronting issues with stability, missing features, and a lack of documentation. A number of indicators pointed to our version of Hadoop being too far out of date.

For our original MapReduce jobs, we were using Hadoop 0.20, which was state of the art at the time those jobs were written, but the framework has since matured through several major versions and Hadoop 2.x is gaining adoption. We needed to at least get to 1.x to give us a path for future upgrades.

On the data team, we write most of our MapReduce jobs in Cascalog, a Clojure domain-specific language (DSL) for Cascading, a library that provides job flow abstractions for Hadoop. The Clojure syntax is fairly terse, and the code is easy to test. Cascalog itself has matured since we first adopted it, and we needed to upgrade our library from 1.x to 2.1 to stay current with the latest stable versions.

Pre-Upgrade: Testing Infrastructure

Before initiating our upgrade project, we needed a way to automatically test our jobs in a cluster environment. We use TDD, so most jobs had a great unit test suite that ran against local execution engines, but job behavior on a cluster with real-world data can vary quite a bit. Unfortunately, a clean run with sample data against a local Hadoop jar does not guarantee flawless execution on a 30-machine cluster against production datasets. Most of the time, when we define a new job, our QA team can verify expected output and execution under a variety of conditions. However, this manual process quickly becomes tedious when we have to regression-test an entire set of jobs for a sweeping version change.

Our first project was to write a remote job-triggering and testing harness that could pull a medium-sized subset of production data, execute a long-running remote job, and make sanity assertions about the output. Running remote jobs is expensive, so we did not put these tests in the integration test suite (which runs on every push to a branch). Instead, engineers can run these job tests before a large, possibly breaking change, and within a few hours we can trust that a version upgrade is safe.

Our job execution application is written in Java, and most of our other integration tests are in Cucumber, so we decided to continue with our existing technology choices for our remote cluster tests. We chose to define scenarios in Cucumber and use Ruby to trigger jobs, tail logs, and make assertions on the result data. Engineers and product managers can all see the expected flow of each job and understand its input and output expectations, based on a job execution against a snapshot of real production data.

Hadoop Upgrade

We run all our Hadoop jobs on AWS Elastic MapReduce (EMR). EMR is a powerful way to spin up an entire cluster of optimized cloud instances preconfigured with Hadoop, all with a single API call. The Amazon team has abstracted away much of the work of creating and tuning a Hadoop cluster, so all we need to do is submit a job request. EMR starts instances from particular virtual machine images, called Amazon Machine Images (AMIs). AMIs come in different versions and are packaged with different software, such as a specific Hadoop version. As early adopters of EMR, we were using AMI version 1.0.0, which shipped with Hadoop 0.20. To run our jobs against Hadoop 1.x, we had to modify the AMI versions in our job-triggering configuration.

The Hadoop transition from 0.x to 1.x consisted mostly of versioning and lower-level changes, so fortunately our application code didn’t need to change much. We just needed to swap out the Hadoop library we compiled against and test each job carefully. Most of these job upgrades went smoothly.

Some of our Cascalog based jobs did not perform as expected in the transition to running on Hadoop 1.x and returned unusual output. These will need to be significantly debugged, refactored, or replaced altogether. To prevent these few jobs from blocking our path forward with the rest, we built backwards compatibility into our EMR wrapper. This lets us specify a certain AMI version to use for each type of job, so we can continue to upgrade jobs and fix incompatible ones over time.
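
To give a rough idea of the shape of that compatibility layer, the per-job override can be as simple as a lookup consulted by the job-triggering code. The sketch below is hypothetical; the job names, AMI versions, and function names are illustrative, not our actual configuration:

(def default-ami-version
  ;; An AMI family that ships Hadoop 1.x; the exact value is illustrative.
  "2.4.2")

;; Jobs that still misbehave on Hadoop 1.x stay pinned to the old AMI until fixed.
(def ami-version-overrides
  {:legacy-aggregation-job "1.0.0"})

(defn ami-version-for
  "Returns the AMI version to request from EMR for a given job keyword."
  [job-key]
  (get ami-version-overrides job-key default-ami-version))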

This had the additional benefit of reducing risk in the production system. Rather than a massive, risky release that forcibly upgraded everything at once while hoping we had tested every job scenario, we could merge each small upgrade change to master and evolve the system over time. We started slowly and found that job performance and stability improved dramatically, which gave us confidence to take larger steps and move faster. Rather than fearing upgrades, we could accelerate our transition.

Cascalog 2.0 Upgrade

Last year Cascalog 2.0 launched, and it was a huge leap forward under the hood, separating the maturing Clojure API from the execution engine. Since we went through the upgrade, Sam Ritchie has written about some of the details fairly extensively here: Cascalog 2.0 In Depth

There were some breaking changes, but we found the migration to be fairly smooth. Here’s what we had to modify to make the transition:

Operations are now functions, so every operation macro has been renamed to show that it works like a function; a short example follows the list below.

  • defmapop -> defmapfn
  • defbufferop -> defbufferfn
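
As a minimal sketch of what the rename looks like in practice (the operation and query here are illustrative, not from our codebase, and assume cascalog.api is referred in):

;; Cascalog 1.x:
;;   (defmapop uppercase [s] (clojure.string/upper-case s))

;; Cascalog 2.x -- same body, renamed macro:
(defmapfn uppercase [s]
  (clojure.string/upper-case s))

;; Used in a query exactly as before; word-src is a hypothetical source tap.
(?<- (stdout)
     [?word ?upper]
     (word-src ?word)
     (uppercase ?word :> ?upper))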

Many of the namespaces have changed to be more specific. Some functions are particular to either Cascalog or the underlying Cascading library, and the namespaces now reflect that; an example namespace declaration follows the list below.

  • cascalog.ops -> cascalog.logic.ops
  • cascalog.io -> cascalog.cascading.io
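
In practice this was mostly a matter of updating require forms. A typical namespace declaration ended up looking roughly like this (the job namespace name is made up):

(ns our-jobs.aggregation
  (:require [cascalog.api :refer :all]
            [cascalog.logic.ops :as ops]      ;; formerly cascalog.ops
            [cascalog.cascading.io :as io]))  ;; formerly cascalog.io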

Cascalog 2 uses Hadoop 1.x instead of the 0.2x series.
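
For our build, this was just a dependency change in the project definition, roughly along these lines (a sketch of a Leiningen project.clj excerpt; the project name and artifact versions shown are indicative, not prescriptive):

(defproject our-jobs "0.1.0"
  :dependencies [[org.clojure/clojure "1.5.1"]
                 [cascalog/cascalog-core "2.1.0"]]
  ;; Hadoop is provided by the EMR cluster, so keep it out of the job jar.
  :profiles {:provided {:dependencies [[org.apache.hadoop/hadoop-core "1.0.3"]]}})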

The taps are much richer and more configurable for both input and output. We were able to remove a lot of serialization/deserialization hacks for working with delimited files. For example, in Cascalog 1 the textline tap does not support writing compressed output, but it does in later versions.

Here is our old workaround:

(?- (hfs-delimited output-path :delimiter "," :compress true :sinkmode :replace :quote "")
    (<- [?part0 ?part1 ?part2 ?part3 ?part4 ?part5]
        (get-aggregations ?field-1 ?field-2 ?field-3 ?field-4 ?field-5 ?count :> ?json)
        (clojure.string/split ?json #"," :> ?part0 ?part1 ?part2 ?part3 ?part4 ?part5)))

In Cascalog 2, it could simply become:

(?- (hfs-textline output-path :compress true :sinkmode :replace)
    (<- [?json]
        (get-aggregations ?field-1 ?field-2 ?field-3 ?field-4 ?field-5 ?count :> ?json)))

All our jobs continued to work as expected on the new EMR AMIs, and now we have access to the latest API and documentation, making Cascalog development much easier.

Key Learnings

Tests are essential. You can only have assurance that your application is correct after an upgrade if you have the right tests. Make sure you have unit tests so you can iterate on changes quickly, but make sure you have larger integration tests as well. We have integration tests for all our Hadoop jobs. These tests quickly assured us that each library and machine image version worked together properly and computed the expected results.

Tech debt gets more painful over time. Many software packages use some form of semantic versioning to help you understand how important an upgrade is or what it will break. For example, we knew both of our upgrades would require significant prioritization because they crossed major versions. However, you can upgrade minor versions often to pick up smaller stability improvements and bug fixes. Make sure the client/customer for your product recognizes the importance of allocating time to major version upgrades, and that the product owner leaves bandwidth for minor upgrades along the way.

Branches should be as short-lived as possible. Long-lived branches make your life difficult by requiring you to rebase frequently. Merge to master often to minimize conflicts.

Use feature flags to gate execution paths into major components. We used this strategy when we built the ability to specify the Hadoop version per job, which let us test and upgrade our jobs over time rather than all at once.

Pairing is great. Often an upgrade process pushes into new technical territory, but that doesn’t mean the types of issues will be totally new. Get another engineer’s input on the problem, even if you can only pair for part of the project.