Sit back, relax, and enjoy the code.

Hadoop Streaming for Rapid Prototyping of Distributed Algorithms

Posted: January 4th, 2009 | Author: Brad | Filed under: Programming | 13 Comments

Note: This article assumes that you know a little about MapReduce, or that if you don’t, you might skim the enclosed links so you know what I’m talking about when I get to the examples, or check out the Hadoop Tutorial. It also assumes that you have Hadoop set up – either clustered or pseudo-clustered – if you’re going to run the examples. Or you can just read along.

Hadoop is a framework (written in Java) that supports distributed computing – specifically Google’s MapReduce programming model. It also comprises HDFS (the Hadoop Distributed File System), which allows you to redundantly store large quantities of data across multiple disconnected disks as if they were a single storage unit. I’ve used Hadoop at two jobs and at home, and it rocks.

It comes with a problem, though, which you may spot in the first paragraph: It’s written in Java. Now, don’t get me wrong – Java’s a great language. But developing software in Java with the most commonly used tools (Eclipse, Ant/Maven, &c) is a monumental pain (and you’re hearing this from an old Visual C++ hand). I’m the first to admit that I should shore up my skills with the Java tools, but even if I were better at it, getting a non-trivial Java project from zero to first runnable build is still about as complex as the invasion of Normandy, and the proliferation of XML config files is just inhumane.

Still, the performance and solidity of Java make it the right choice for a production Hadoop project. But what if you just want to kick around an idea or test an algorithm? Wouldn’t it be nice if you could do that in 2 hours instead of a day and a half? Wouldn’t it be nicer still if you could do it in your language of choice?

Hadoop Streaming has made me a very happy man. Any language that can take data from stdin and give it to stdout can be used to make Hadoop MapReduce jobs.

So of course, I’m doing mine in Ruby.

As an example, I’ll transliterate the trivial canonical Hadoop word counting example into Ruby. This example involves taking a large text, and counting the number of instances of each word it contains. The mapper takes in rows of text, and emits key-value pairs where the key is a word and the value is the number of times that word has occurred in a given row of text. It would look something like this:

word_count_mapper.rb:

#!/usr/bin/env ruby
 
STDIN.each_line do |line|
  word_count = {}
  line.split.each do |word|
    word_count[word] ||= 0
    word_count[word] += 1
  end
 
  word_count.each do |k,v|
    puts "#{k}\t#{v}"
  end
end

Already, we’re at our first non-trivial design decision, and it’s due to a significant design difference between Hadoop Streaming and regular Hadoop jobs: When you’re streaming, a single instance of your script will handle many pieces of input – it’s a bit like a filter-style Unix command (e.g., grep). A regular Hadoop mapping job is more firmly seated in the functional paradigm – each call to your mapper gets one line of data, with no knowledge of others. Because of this, some tutorials will suggest shortcuts such as keeping the hash-based accumulator outside the STDIN loop, and emitting all your rows of [word, count] pairs at the end of processing. There are advantages to this, but I’m going to stick with my code above, for two reasons:

  1. If you maintain global state in your mapper, you’ll incur significant rework if you try to port the code to a more purely functional Java Hadoop mapper.
  2. Accumulating everything in one hash could get damagingly memory-intensive on large data sets. Going functional and emitting every per-line count straight to the output stream means more intermediate data for Hadoop to store and shuffle, but for large jobs I’m much more worried about RAM than disk.
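
For comparison, here is a rough sketch of the accumulate-then-emit shortcut described above. It is not the mapper I use in this article; the method names are made up for illustration, and the sample text at the bottom stands in for STDIN so the script can be run on its own.

```ruby
#!/usr/bin/env ruby
# Sketch of the "accumulate, then emit once" mapper shortcut.
# The method names are hypothetical; this is not the mapper used in this article.

# Counts words across every line of `input` (anything that responds to
# each_line, e.g. STDIN or a String) and returns a Hash of word => count.
def accumulate_counts(input)
  counts = Hash.new(0) # lives for the whole run; grows with the vocabulary
  input.each_line do |line|
    line.split.each { |word| counts[word] += 1 }
  end
  counts
end

# Formats the totals as tab-separated "word<TAB>count" records.
def format_records(counts)
  counts.map { |word, count| "#{word}\t#{count}" }
end

# As a streaming mapper this would be:
#   puts format_records(accumulate_counts(STDIN))
# The literal string below is sample input so the sketch runs standalone.
puts format_records(accumulate_counts("a b a\nb c\n"))
```

Nothing is emitted until the input is exhausted, and the hash holds every distinct word the script instance has seen, which is exactly the memory concern in point 2.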

The purpose of the reduce step is to bring together all the per-line counts for each word, and reduce them to a single, global count per word. This is facilitated by the fact that Hadoop orders all the records by key before handing them to the reducer (where by default the key is everything before the first tab character – in this case, the word being counted). My version of the reducer looks like this:

word_count_reducer.rb:

#!/usr/bin/env ruby
 
current_word = nil
current_count = 0
STDIN.each_line do |line|
  word, count = line.strip.split
  if word != current_word
    puts "#{current_word}\t#{current_count}" unless current_word.nil?
    current_word = word
    current_count = 0
  end
 
  current_count += count.to_i
end
 
puts "#{current_word}\t#{current_count}" unless current_word.nil?

The whole business with current_word and current_count is necessitated by another Hadoop Streaming-vs.-Hadoop quirk: In a regular Hadoop reducer, one call to the reducing function gets a collection of all rows associated with a particular key. When you’re streaming, you’re getting only one row at a time, but you’re guaranteed that the rows will be ordered on keys, and that the rows for a particular key will not be split up across reducer instances. Again, the temptation might be to place a single accumulator outside the main STDIN loop, but I’m going to stick with my strategy of lean runtime footprint and going straight to storage. The price for that is keeping track of which key you’re on.
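
If you want to convince yourself that the key-tracking logic holds up before touching a cluster, one option is to mimic the map, sort, reduce sequence inside a single Ruby process. The sketch below is only a local stand-in: it involves no Hadoop at all, the method names are hypothetical, and it assumes the default convention that the key is everything before the first tab.

```ruby
#!/usr/bin/env ruby
# Local, in-process stand-in for the streaming pipeline (no Hadoop involved).
# Method names are hypothetical and chosen for illustration only.

# Mapper step: per-line word counts as "word<TAB>count" records.
def map_line(line)
  counts = Hash.new(0)
  line.split.each { |word| counts[word] += 1 }
  counts.map { |word, count| "#{word}\t#{count}" }
end

# Reducer step: expects records ordered by key and sums each run of
# identical keys, mirroring the current_word/current_count bookkeeping.
def reduce_sorted(records)
  results = []
  current_word = nil
  current_count = 0
  records.each do |record|
    # Split on the first tab only, so the key is everything before it.
    word, count = record.split("\t", 2)
    if word != current_word
      results << "#{current_word}\t#{current_count}" unless current_word.nil?
      current_word = word
      current_count = 0
    end
    current_count += count.to_i
  end
  results << "#{current_word}\t#{current_count}" unless current_word.nil?
  results
end

# Runs map, then an ordering by key (standing in for Hadoop's sort), then reduce.
def simulate(text)
  mapped = text.each_line.flat_map { |line| map_line(line) }
  sorted = mapped.sort_by { |record| record.split("\t", 2).first }
  reduce_sorted(sorted)
end

puts simulate("the cat saw the dog\nthe dog ran\n")
```

Removing the sort_by step and feeding reduce_sorted the unsorted records is a quick way to see why the ordering guarantee matters: the same word then shows up in several output rows.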

I have placed the mapper and reducer in a folder called scripts in my $HADOOP_HOME folder. To run them against an existing file in HDFS, I do:

bin/hadoop jar contrib/streaming/hadoop-0.19.0-streaming.jar \
  -mapper scripts/word_count_mapper.rb -reducer scripts/word_count_reducer.rb \
  -file `pwd`/scripts/word_count_mapper.rb \
  -file `pwd`/scripts/word_count_reducer.rb \
  -input texts/my_text -output word_counts

…and results may be extracted from the word_counts folder in HDFS.
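
The output folder holds one file per reducer instance, each with one word/count pair per line, and because the rows for a key are never split across reducers, every word appears in only one of those files. Once the files are copied somewhere local, picking out the most frequent words could look something like the sketch below. The method name and the sample lines are hypothetical, and how you fetch the files is up to you.

```ruby
#!/usr/bin/env ruby
# Sketch: rank "word<TAB>count" lines taken from the reducer output files.
# top_words is a hypothetical helper, not part of Hadoop.

# Parses the lines and returns the n most frequent [word, count] pairs.
def top_words(lines, n)
  pairs = lines.map do |line|
    word, count = line.chomp.split("\t", 2)
    [word, count.to_i]
  end
  # Descending count first, then alphabetical so ties come out in a fixed order.
  pairs.sort_by { |word, count| [-count, word] }.first(n)
end

# Sample lines standing in for the concatenated contents of the output files.
sample = ["cat\t1\n", "dog\t2\n", "the\t3\n", "ran\t1\n"]
top_words(sample, 2).each { |word, count| puts "#{word}\t#{count}" }
```

With real output you would pass in the lines read from each local copy instead of the sample array.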

This is all there is to the trivial example. There’s more, of course – I’m currently using this technology to prototype a much more ambitious project that I will eventually port to Java (more on that later). While I’m finding that there is (at least subjectively) some performance overhead associated with streaming that I don’t see otherwise, it’s proving incredibly useful and speedy for testing out algorithms and designs.

Are you working with Hadoop Streaming? Do you have questions? Drop a line in the comments – I’d love to swap learnings.


13 Comments on “Hadoop Streaming for Rapid Prototyping of Distributed Algorithms”

  1. #1 No One Thing / Refocusing said at 12:28 pm on January 5th, 2009:

    [...] to save most or all of my technical writing for the Kickass Labs blog (case in point: my recent Hadoop Streaming Tutorial). This has to do with the fact that much of my after-hours unsupervised play time is spent on tech [...]

  3. #3 Philip (flip) Kromer said at 10:05 pm on February 16th, 2009:

    I’m building a framework for Hadoop streaming in ruby:
    http://github.com/infochimps/wukong

    It handles several common patterns, including an Accumulator that would let you write the above both tersely and performantly: subclass Accumulator and then define reset (sets key_count to 0), accumulate (increments it), and finalize (emits the key and the realized value of key_count).

    It also lets you write your scripts against a stream of lightweight objects. Say you’re working with retrosheet.org baseball data, and you define structs for BatterGame, PitcherGame and ManagerGame. Implement your mapper or reducer as a RecordStreamer, and ensure that lines look like

    [batter_game ...tab-separated-fields...]
    [pitcher_game ...tab-separated-fields...]
    [manager_game ...tab-separated-fields...]

    Each line is instantiated as the class given in its first field, with the remaining fields as values. Your script just sees the object stream. To find all games a person was involved in in any way, you could emit [obj.player_id obj.game_id] with no attention to where those fields appear in the line. (Then, of course, use uniq as your reducer if you want the pairs, or a ListAccumulator if you want [player_id, ...game_ids...], or pipe uniq into a CountKeysAccumulator like the above if you want a simple count of distinct game_ids).

    It’s not a gem or anything yet, but it’s clean enough for someone to try playing with it if you’re interested: http://github.com/infochimps/wukong

  4. #4 brad said at 11:22 am on February 17th, 2009:

    Flip – sounds excellent, and not unlike something I was going to undertake on my own.

    I’ll be in touch.

  5. #5 B. Factor said at 1:41 am on April 28th, 2009:

    What does the word_count directory look like after your job runs? Does it have (a) one output file with all the word counts, or (b) one output file per word with the count for that word?

  6. #6 Brad said at 9:27 pm on April 28th, 2009:

    B. Factor: Good question. The answer is (c) a collection of files, one per reducer instance, each of which has one word/count pair per line. This arrangement (one file per reducer) is standard for Hadoop.

  7. #7 matthew said at 6:11 pm on May 20th, 2009:

    thanks, this helped me understand streaming a bit more.

    I modified line 8 of your reducer to be:
    key, count = line.strip.split(/\t/)

    since the default split() was not compliant with my keys (full search strings, with spaces everywhere).

  8. #8 anx said at 7:30 pm on July 12th, 2009:

    Not completely relevant to this topic, but you should try Netbeans + Hadoop Studio plugin if you wish to give Java another chance. I am not at home with JARs either, but this combination was very easy to set up and use. It rocks. It’s not Ruby though. :)

  9. #9 Me said at 3:51 pm on December 17th, 2009:

    Thanks a lot for this great tutorial!!

    It was *exactly* the kind of example I needed. In particular it was good to read clearly that the keys are sorted/grouped before being passed to the reducer.

  10. #10 trisseria said at 7:34 am on March 1st, 2010:

    Hello Guys And Gals!

    I’ve just joined here and wanted to say hi to all of you! I really hope to give something back to this board…

    Cheers

  11. #11 Steawaythakly said at 11:43 am on March 4th, 2010:

    fun

    download

    best

    downloads

    games

    videos.

  12. #12 Carmelo said at 11:15 am on March 10th, 2010:

    This is the main reason I read http://www.kickasslabs.com. Fascinating posts.

  13. #13 omiplirlChorn said at 3:05 pm on August 30th, 2010:

    why not…

