Hadoop Streaming for Rapid Prototyping of Distributed Algorithms
Posted: January 4th, 2009 | Author: Brad | Filed under: Programming | Tags: distributed computing, ga, genetic algorithms, hadoop, hadoop streaming, hadoop streaming tutorial, hadoop tutorial, ruby, tutorial | 13 Comments
Note: This article assumes that you know a little about MapReduce, or that if you don’t, you might skim the enclosed links so you know what I’m talking about when I get to the examples, or check out the Hadoop Tutorial. It also assumes that you have Hadoop set up – either clustered or pseudo-clustered – if you’re going to run the examples. Or you can just read along.
Hadoop is a framework (written in Java) that supports distributed computing – specifically Google’s MapReduce algorithm. It also comprises HDFS (the Hadoop Distributed File System), which allows you to redundantly store large quantities of data across multiple disconnected disks as if they were a single storage unit. I’ve used Hadoop at two jobs and at home, and it rocks.
It comes with a problem, though, which you may spot in the first paragraph: It’s written in Java. Now, don’t get me wrong – Java’s a great language. But developing software in Java with the most commonly used tools (Eclipse, Ant/Maven, &c) is a monumental pain (and you’re hearing this from an old Visual C++ hand). I’m the first to admit that I should shore up my skills with the Java tools, but even if I were better at it, getting a non-trivial Java project from zero to first runnable build is still about as complex as the invasion of Normandy, and the proliferation of XML config files is just inhumane.
Still, the performance and solidity of Java make it the right choice for a production Hadoop project. But what if you just want to kick around an idea or test an algorithm? Wouldn’t it be nice if you could do that in 2 hours instead of a day and a half? Wouldn’t it be nicer still if you could do it in your language of choice?
Hadoop Streaming has made me a very happy man. Any language that can take data from stdin and give it to stdout can be used to make Hadoop MapReduce jobs.
So of course, I’m doing mine in Ruby.
As an example, I’ll transliterate the trivial canonical Hadoop word counting example into Ruby. This example involves taking a large text, and counting the number of instances of each word it contains. The mapper takes in rows of text, and emits key-value pairs where the key is a word and the value is the number of times that word has occurred in a given row of text. It would look something like this:
word_count_mapper.rb:
#!/usr/bin/env ruby

STDIN.each_line do |line|
  word_count = {}
  line.split.each do |word|
    word_count[word] ||= 0
    word_count[word] += 1
  end
  word_count.each do |k,v|
    puts "#{k}\t#{v}"
  end
end
Already, we’re at our first non-trivial design decision, and it’s due to a significant design difference between Hadoop Streaming and regular Hadoop jobs: When you’re streaming, a single instance of your script will handle many pieces of input – it’s a bit like a filter-style Unix command (e.g., grep). A regular Hadoop mapping job is more firmly seated in the functional paradigm – each call to your mapper gets one line of data, with no knowledge of others. Because of this, some tutorials will suggest shortcuts such as keeping the hash-based accumulator outside the STDIN loop, and emitting all your rows of [word, count] pairs at the end of processing. There are advantages to this, but I’m going to stick with my code above, for two reasons:
- If you maintain global state in your mapper, you’ll incur significant rework if you try to port the code to a more purely functional Java Hadoop mapper.
- This could get damagingly memory-intensive on large data sets. Going functional and streaming everything straight to HDFS will add some storage cost, but for large jobs I’m much more worried about RAM than disk.
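For comparison, here is a minimal sketch of the shortcut I'm choosing not to use, with the accumulator kept outside the input loop. The method name map_with_global_tally is made up for illustration, and the input is an in-memory string rather than real Hadoop data; a streaming script would pass STDIN and STDOUT instead.

```ruby
require 'stringio'

# Hypothetical helper (not from any library): tallies words across ALL
# input lines and emits only once, at the end of the input.
def map_with_global_tally(input, output)
  totals = Hash.new(0) # lives for the whole run and grows with the vocabulary
  input.each_line do |line|
    line.split.each { |word| totals[word] += 1 }
  end
  totals.each { |word, count| output.puts "#{word}\t#{count}" }
end

# In-memory demonstration; a real streaming mapper would use STDIN and STDOUT.
demo_out = StringIO.new
map_with_global_tally(StringIO.new("a b a\nb c\n"), demo_out)
puts demo_out.string
```

This emits fewer records than the per-line mapper, but the totals hash has to hold every distinct word this mapper instance sees before anything is written, which is exactly the memory concern in the second bullet.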
The purpose of the reduce step is to bring together all the per-line counts for each word, and reduce them to a single, global count per word. This is facilitated by the fact that Hadoop orders all the records by key before handing them to the reducer (where by default the key is everything before the first tab character – in this case, the word being counted). My version of the reducer looks like this:
word_count_reducer.rb:
#!/usr/bin/env ruby

current_word = nil
current_count = 0

STDIN.each_line do |line|
  word, count = line.strip.split
  if word != current_word
    puts "#{current_word}\t#{current_count}" unless current_word.nil?
    current_word = word
    current_count = 0
  end
  current_count += count.to_i
end
puts "#{current_word}\t#{current_count}" unless current_word.nil?
The whole business with current_word and current_count is necessitated by another Hadoop Streaming-vs.-Hadoop quirk: In a regular Hadoop reducer, one call to the reducing function gets a collection of all rows associated with a particular key. When you’re streaming, you’re getting only one row at a time, but you’re guaranteed that the rows will be ordered on keys, and that the rows for a particular key will not be split up across reducer instances. Again, the temptation might be to place a single accumulator outside the main STDIN loop, but I’m going to stick with my strategy of lean runtime footprint and going straight to storage. The price for that is keeping track of which key you’re on.
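To see why the sorted-by-key guarantee matters, it can help to simulate the whole pipeline in one process. The following is only a rough sketch: map_lines and reduce_sorted are illustrative names wrapping the same logic as the two scripts above, and an in-memory sort stands in for Hadoop's shuffle step, which in reality runs across machines.

```ruby
require 'stringio'

# Mapper logic from the article: one tally per line, emitted immediately.
def map_lines(input)
  records = []
  input.each_line do |line|
    tally = Hash.new(0)
    line.split.each { |word| tally[word] += 1 }
    tally.each { |word, count| records << "#{word}\t#{count}" }
  end
  records
end

# Reducer logic from the article: only correct if records arrive grouped by key.
def reduce_sorted(records)
  results = []
  current_word = nil
  current_count = 0
  records.each do |record|
    word, count = record.strip.split
    if word != current_word
      results << "#{current_word}\t#{current_count}" unless current_word.nil?
      current_word = word
      current_count = 0
    end
    current_count += count.to_i
  end
  results << "#{current_word}\t#{current_count}" unless current_word.nil?
  results
end

mapped = map_lines(StringIO.new("the cat the\nthe dog\n"))
# Sorting by key is a stand-in for what Hadoop does between map and reduce.
sorted = mapped.sort_by { |record| record.split("\t").first }
reduced = reduce_sorted(sorted)
puts reduced
```

If you hand the unsorted mapped array straight to reduce_sorted, the word "the" is emitted twice with partial counts, which is the failure the key-tracking code depends on Hadoop's ordering to avoid.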
I have placed the mapper and reducer in a folder called scripts in my $HADOOP_HOME folder. To run them against an existing file in HDFS, I do:
bin/hadoop jar contrib/streaming/hadoop-0.19.0-streaming.jar \
  -mapper scripts/word_count_mapper.rb -reducer scripts/word_count_reducer.rb \
  -file `pwd`/scripts/word_count_mapper.rb \
  -file `pwd`/scripts/word_count_reducer.rb \
  -input texts/my_text -output word_counts
…and results may be extracted from the word_counts folder in HDFS.
This is all there is to the trivial example. There’s more, of course – I’m currently using this technology to prototype a much more ambitious project that I will eventually port to Java (more on that later). While I’m finding that there is (at least subjectively) some performance overhead associated with streaming that I don’t see otherwise, it’s proving incredibly useful and speedy for testing out algorithms and designs.
Are you working with Hadoop Streaming? Do you have questions? Drop a line in the comments – I’d love to swap learnings.
I’m building a framework for Hadoop streaming in Ruby:
http://github.com/infochimps/wukong
It handles several common patterns, including an Accumulator that would let you write the above both tersely and performantly: subclass Accumulator and then define reset (sets key_count to 0), accumulate (increments it), and finalize (emits the key and the realized value of key_count).
It also lets you write your scripts against a stream of lightweight objects. Say you’re working with retrosheet.org baseball data, and you define structs for BatterGame, PitcherGame and ManagerGame. Implement your mapper or reducer as a RecordStreamer, and ensure that lines look like
[batter_game ...tab-separated-fields...]
[pitcher_game ...tab-separated-fields...]
[manager_game ...tab-separated-fields...]
Each line is instantiated as the class given in its first field, with the remaining fields as values. Your script just sees the object stream. To find all games a person was involved in in any way, you could emit [obj.player_id obj.game_id] with no attention to where those fields appear in the line. (Then, of course, use uniq as your reducer if you want the pairs, or a ListAccumulator if you want [player_id, ...game_ids...], or pipe uniq into a CountKeysAccumulator like the above if you want a simple count of distinct game_ids).
It’s not a gem or anything yet, but it’s clean enough for someone to try playing with it if you’re interested: http://github.com/infochimps/wukong
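As a rough illustration of the reset / accumulate / finalize pattern described in this comment, here is a standalone sketch. It is not Wukong's actual API: the class names SketchAccumulator and SketchWordCounter, the run method, and its signature are all assumptions made up for this example, and it sums the emitted counts rather than simply incrementing per record.

```ruby
require 'stringio'

# Hypothetical base class, NOT Wukong's real API. It only mirrors the
# reset / accumulate / finalize hooks described in the comment.
class SketchAccumulator
  def run(input, output)
    current_key = nil
    input.each_line do |line|
      key, value = line.chomp.split("\t", 2)
      if key != current_key
        output.puts finalize(current_key) unless current_key.nil?
        current_key = key
        reset
      end
      accumulate(value)
    end
    output.puts finalize(current_key) unless current_key.nil?
  end
end

# Word-count reducer expressed through the three hooks.
class SketchWordCounter < SketchAccumulator
  def reset
    @key_count = 0
  end

  def accumulate(value)
    @key_count += value.to_i
  end

  def finalize(key)
    "#{key}\t#{@key_count}"
  end
end

out = StringIO.new
SketchWordCounter.new.run(StringIO.new("cat\t1\nthe\t1\nthe\t2\n"), out)
puts out.string
```

The key-tracking bookkeeping from the reducer in the article moves into the base class, so each concrete reducer only has to say what happens at the start of a key, on each row, and at the end of a key.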
Flip – sounds excellent, and not unlike something I was going to undertake on my own.
I’ll be in touch.
What does the word_count directory look like after your job runs? Does it have (a) one output file with all the word counts, or (b) one output file per word with the count for that word?
B. Factor: Good question. The answer is (c) a collection of files, one per reducer instance, each of which has one word/count pair per line. This arrangement (one file per reducer) is standard for Hadoop.
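If you want a single combined result, one option is to copy the output directory to local disk and merge the per-reducer files yourself. The sketch below assumes the files follow Hadoop's usual part-NNNNN naming and have already been copied locally (for example with hadoop fs -get); merge_part_files is a made-up helper name, and the demonstration writes two fake part files to a temp directory instead of using real job output.

```ruby
require 'tmpdir'

# Hypothetical helper: merge every part-* file found in a local copy of
# the job's output directory into one word => count hash.
def merge_part_files(dir)
  totals = {}
  Dir.glob(File.join(dir, 'part-*')).sort.each do |path|
    File.foreach(path) do |line|
      word, count = line.chomp.split("\t")
      # Each key lands in exactly one reducer's file, so plain assignment is enough.
      totals[word] = count.to_i
    end
  end
  totals
end

# Demonstration with two fake reducer outputs.
merged = Dir.mktmpdir do |dir|
  File.write(File.join(dir, 'part-00000'), "cat\t1\nthe\t3\n")
  File.write(File.join(dir, 'part-00001'), "dog\t1\n")
  merge_part_files(dir)
end
p merged
```

This only makes sense when the combined output fits comfortably in memory; for anything larger, leaving the results split across files in HDFS is usually the point.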
thanks, this helped me understand streaming a bit more.
I modified line 8 of your reducer to be:
key, count = line.strip.split(/\t/)
since the default split() was not compliant with my keys (full search strings, with spaces everywhere).
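A quick sketch of the difference, using a made-up record whose key contains spaces. The sample string is an assumption for illustration only, and the limit of 2 is an extra precaution so that any later tabs stay inside the value.

```ruby
record = "new york pizza\t3\n"

# Default split breaks on any run of whitespace, so the key falls apart.
naive_key, naive_count = record.strip.split

# Splitting on the tab alone keeps the key whole.
key, count = record.strip.split("\t", 2)

puts "naive: key=#{naive_key.inspect} count=#{naive_count.inspect}"
puts "tab:   key=#{key.inspect} count=#{count.inspect}"
```

With the default split, the reducer would treat "new" as the key and try to convert "york" to an integer, which silently yields 0.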
Not completely relevant to this topic, but you should try NetBeans + Hadoop Studio plugin if you wish to give Java another chance. I am not at home with JARs either, but this combination was very easy to set up and use. It rocks. It’s not Ruby though.
Thanks a lot for this great tutorial!!
It was *exactly* the kind of example I needed. In particular it was good to read clearly that the keys are sorted/grouped before being passed to the reducer.
This is the main reason I read http://www.kickasslabs.com. Fascinating posts.