Hadoop Streaming for Rapid Prototyping of Distributed Algorithms

Note: This article assumes that you know a little about MapReduce. If you don’t, you might skim the Hadoop Tutorial first so you know what I’m talking about when I get to the examples. It also assumes that you have Hadoop set up (either clustered or pseudo-distributed) if you’re going to run the examples. Or you can just read along.

Hadoop is a framework (written in Java) that supports distributed computing, specifically Google’s MapReduce programming model. It also includes HDFS (the Hadoop Distributed File System), which lets you store large quantities of data redundantly across the disks of many machines as if they were a single storage unit. I’ve used Hadoop at two jobs and at home, and it rocks.

It comes with a problem, though, which you may spot in the first paragraph: It’s written in Java. Now, don’t get me wrong – Java’s a great language. But developing software in Java with the most commonly used tools (Eclipse, Ant/Maven, &c) is a monumental pain (and you’re hearing this from an old Visual C++ hand). I’m the first to admit that I should shore up my skills with the Java tools, but even if I were better at it, getting a non-trivial Java project from zero to first runnable build is still about as complex as the invasion of Normandy, and the proliferation of XML config files is just inhumane.

Still, the performance and solidity of Java make it the right choice for a production Hadoop project. But what if you just want to kick around an idea or test an algorithm? Wouldn’t it be nice if you could do that in 2 hours instead of a day and a half? Wouldn’t it be nicer still if you could do it in your language of choice?

Hadoop Streaming has made me a very happy man. Any language that can read data from stdin and write it to stdout can be used to write Hadoop MapReduce jobs.

So of course, I’m doing mine in Ruby.

As an example, I’ll transliterate the canonical (and trivial) Hadoop word-count example into Ruby. The job takes a large text and counts the number of occurrences of each word it contains. The mapper takes in lines of text and emits key-value pairs, where the key is a word and the value is the number of times that word occurs in a given line. It would look something like this:

word_count_mapper.rb:

#!/usr/bin/env ruby
 
STDIN.each_line do |line|
  word_count = Hash.new(0)   # unseen words start at a count of 0
  line.split.each do |word|
    word_count[word] += 1
  end
 
  word_count.each do |k,v|
    puts "#{k}\t#{v}"
  end
end

Already, we’re at our first non-trivial design decision, and it’s due to a significant difference between Hadoop Streaming and regular Hadoop jobs. When you’re streaming, a single instance of your script handles many records, much like a filter-style Unix command (e.g., grep). A regular Java Hadoop mapper is more firmly seated in the functional paradigm: each call to your map function gets one record (by default, one line), with no knowledge of the others. Because of this, some tutorials suggest shortcuts such as keeping the hash-based accumulator outside the STDIN loop and emitting all your [word, count] pairs at the end of processing. There are advantages to this, but I’m going to stick with my code above, for two reasons:

  1. If you maintain global state in your mapper, you’ll incur significant rework if you try to port the code to a more purely functional Java Hadoop mapper.
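  2. This could get damagingly memory-intensive on large data sets, since the hash grows with every distinct word. Going functional and streaming every pair straight out of the mapper costs some extra disk I/O and network traffic (intermediate map output is spilled to the mappers’ local disks and then shuffled to the reducers), but for large jobs I’m much more worried about RAM than disk.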
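For comparison, here is a minimal sketch of the shortcut described above, in which one hash outlives the STDIN loop. It is not the approach this article uses. The method name map_with_global_accumulator and its (input, output) parameters are illustrative assumptions, chosen so the logic can be exercised on in-memory data. A real mapper script would pass STDIN and STDOUT instead.

```ruby
require 'stringio'

# Sketch of the "shortcut" mapper some tutorials suggest (NOT this article's
# approach): a single Hash accumulates counts across every input line, and
# nothing is emitted until the input is exhausted.
def map_with_global_accumulator(input, output)
  totals = Hash.new(0) # memory grows with the number of distinct words seen
  input.each_line do |line|
    line.split.each { |word| totals[word] += 1 }
  end
  # Emit one pair per distinct word, only after all input has been read.
  totals.each { |word, count| output.puts "#{word}\t#{count}" }
end

# Demo on an in-memory input; a real mapper script would pass STDIN and STDOUT.
map_with_global_accumulator(StringIO.new("a b a\nb c\n"), $stdout)
```

This version emits one pair per distinct word rather than one per word per line, which shrinks the shuffle. The cost is that the hash holds every distinct word the mapper has seen until the very end, which is the memory risk in reason 2.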

The purpose of the reduce step is to bring together all the per-line counts for each word and reduce them to a single, global count per word. This is made easier by the fact that Hadoop sorts all the records by key before handing them to the reducer (by default, the key is everything before the first tab character; in this case, the word being counted). My version of the reducer looks like this:

word_count_reducer.rb:

#!/usr/bin/env ruby
 
current_word = nil
current_count = 0
STDIN.each_line do |line|
  word, count = line.strip.split
  if word != current_word
    puts "#{current_word}\t#{current_count}" unless current_word.nil?
    current_word = word
    current_count = 0
  end
 
  current_count += count.to_i
end
 
puts "#{current_word}\t#{current_count}" unless current_word.nil?
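The reducer above splits each line on whitespace. That works here only because the mapper’s keys are single words, which can’t contain whitespace. Streaming’s own convention, described earlier, is that the key is everything before the first tab and the value is the rest of the line. The following is a minimal sketch of that convention. split_streaming_record is a hypothetical helper, not part of Hadoop. The no-tab behaviour (the whole line is the key, with no value) follows the Hadoop Streaming documentation.

```ruby
# Sketch of Streaming's default record convention: the key is everything
# before the first tab, and the value is the rest of the line.
# A line with no tab is treated as all key, with no value.
def split_streaming_record(line)
  key, value = line.chomp.split("\t", 2)
  [key.to_s, value] # key.to_s turns the nil from an empty line into ""
end

p split_streaming_record("new york\t3")  # => ["new york", "3"]
p split_streaming_record("a\tb\tc")      # => ["a", "b\tc"]
p split_streaming_record("no tab here")  # => ["no tab here", nil]
p "new york\t3".split                    # => ["new", "york", "3"]
```

Swapping this in for line.strip.split in the reducer lets keys contain spaces, such as multi-word search strings.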

The whole business with current_word and current_count is necessitated by another Hadoop Streaming vs. Hadoop quirk. In a regular Hadoop reducer, one call to the reduce function gets all the values associated with a particular key. When you’re streaming, you get only one line at a time, but you’re guaranteed that the lines arrive sorted by key and that the lines for a particular key won’t be split across reducer instances. Again, the temptation might be to place a single accumulator outside the main STDIN loop, but I’m going to stick with my strategy of a lean runtime footprint and going straight to storage. The price for that is keeping track of which key you’re on.
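If you’d rather have something closer to the Java reducer’s view (one key plus all its values), Ruby’s Enumerable#chunk can group consecutive lines that share a key. This is safe precisely because of the sort-order guarantee above. It is a sketch of an alternative, not the approach used in this article. The name reduce_word_counts is hypothetical, and Array#sum with a block assumes Ruby 2.4 or later.

```ruby
require 'stringio'

# Alternative sketch of the word-count reducer using Enumerable#chunk, which
# groups *consecutive* elements that produce the same block result. Because
# Streaming delivers reducer input sorted by key, each group holds every line
# for one key. Only the current key's lines are held in memory at a time.
def reduce_word_counts(input, output)
  # The chunk key is the text before the first tab. chunk drops elements whose
  # key is nil, so blank lines are skipped.
  groups = input.each_line.chunk { |line| line.chomp.split("\t", 2).first }
  groups.each do |word, group_lines|
    total = group_lines.sum { |line| line.chomp.split("\t", 2)[1].to_i }
    output.puts "#{word}\t#{total}"
  end
end

# Demo on pre-sorted, in-memory input; a real reducer script would pass STDIN
# and STDOUT.
reduce_word_counts(StringIO.new("cat\t1\nthe\t1\nthe\t1\n"), $stdout)
```

The trade-off is that each group is materialized as an array, so all of a very common word’s lines sit in memory at once. The hand-rolled version above keeps only one running total.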

I have placed the mapper and reducer in a folder called scripts in my $HADOOP_HOME folder. To run them against an existing file in HDFS, I do:

bin/hadoop jar contrib/streaming/hadoop-0.19.0-streaming.jar \
  -mapper scripts/word_count_mapper.rb -reducer scripts/word_count_reducer.rb \
  -file `pwd`/scripts/word_count_mapper.rb \
  -file `pwd`/scripts/word_count_reducer.rb \
  -input texts/my_text -output word_counts

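…and the results can be read from the word_counts folder in HDFS, which holds one part file per reducer (for example, bin/hadoop fs -cat word_counts/part-* prints them all).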
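Streaming talks to your scripts only through stdin and stdout, so you can smoke-test a job on your workstation before touching the cluster. The usual shell approximation pipes a sample file through the mapper, then sort, then the reducer. The Ruby sketch below does the same thing in-process. It is a rough stand-in, not a model of Hadoop. It assumes a single reducer, no partitioner or combiner, and sorting on the key before the first tab. simulate_streaming and the two lambdas are hypothetical helpers that restate the article’s scripts against (input, output) objects.

```ruby
require 'stringio'

# Rough local stand-in for a Streaming job (hypothetical helper, not a Hadoop
# API). It mimics only the data flow: run the mapper over the input, sort the
# intermediate lines on their key (the text before the first tab), and feed
# the sorted stream to a single reducer. No partitioning, no combiner.
def simulate_streaming(input_text, mapper, reducer)
  map_out = StringIO.new
  mapper.call(StringIO.new(input_text), map_out)

  # sort_by is not stable, so order among lines with the same key may change;
  # word count doesn't care.
  sorted = map_out.string.lines.sort_by { |line| line.split("\t", 2).first }

  reduce_out = StringIO.new
  reducer.call(StringIO.new(sorted.join), reduce_out)
  reduce_out.string
end

# The article's mapper, rewritten against (input, output) IO objects.
word_count_mapper = lambda do |input, output|
  input.each_line do |line|
    counts = Hash.new(0)
    line.split.each { |word| counts[word] += 1 }
    counts.each { |word, count| output.puts "#{word}\t#{count}" }
  end
end

# The article's reducer, rewritten the same way.
word_count_reducer = lambda do |input, output|
  current_word = nil
  current_count = 0
  input.each_line do |line|
    word, count = line.strip.split
    if word != current_word
      output.puts "#{current_word}\t#{current_count}" unless current_word.nil?
      current_word = word
      current_count = 0
    end
    current_count += count.to_i
  end
  output.puts "#{current_word}\t#{current_count}" unless current_word.nil?
end

puts simulate_streaming("the cat\nthe hat\n", word_count_mapper, word_count_reducer)
```

Because the lambdas take their streams as arguments, the same bodies can be pointed at STDIN and STDOUT in the real scripts. That keeps the locally tested logic identical to what ships to the cluster.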

This is all there is to the trivial example. There’s more, of course – I’m currently using this technology to prototype a much more ambitious project that I will eventually port to Java (more on that later). While I’m finding that there is (at least subjectively) some performance overhead associated with streaming that I don’t see otherwise, it’s proving incredibly useful and speedy for testing out algorithms and designs.

Are you working with Hadoop Streaming? Do you have questions? Drop a line in the comments – I’d love to swap learnings.

This entry was posted in Programming.

20 Responses to Hadoop Streaming for Rapid Prototyping of Distributed Algorithms

  1. Pingback: No One Thing / Refocusing

  3. I’m building a framework for Hadoop streaming in ruby:
    http://github.com/infochimps/wukong

    It handles several common patterns, including an Accumulator that would let you write the above both tersely and performantly: subclass Accumulator and then define reset (sets key_count to 0), accumulate (increments it), and finalize (emits the key and the realized value of key_count).

    It also lets you write your scripts against a stream of lightweight objects. Say you’re working with retrosheet.org baseball data, and you define structs for BatterGame, PitcherGame and ManagerGame. Implement your mapper or reducer as a RecordStreamer, and ensure that lines look like

    [batter_game ...tab-separated-fields...]
    [pitcher_game ...tab-separated-fields...]
    [manager_game ...tab-separated-fields...]

    Each line is instantiated as the class given in its first field, with the remaining fields as values. Your script just sees the object stream. To find all games a person was involved in in any way, you could emit [obj.player_id obj.game_id] with no attention to where those fields appear in the line. (Then, of course, use uniq as your reducer if you want the pairs, or a ListAccumulator if you want [player_id, ...game_ids...], or pipe uniq into a CountKeysAccumulator like the above if you want a simple count of distinct game_ids).

    It’s not a gem or anything yet, but it’s clean enough for someone to try playing with it if you’re interested: http://github.com/infochimps/wukong

  4. brad says:

    Flip – sounds excellent, and not unlike something I was going to undertake on my own.

    I’ll be in touch.

  5. B. Factor says:

    What does the word_count directory look like after your job runs? Does it have (a) one output file with all the word counts, or (b) one output file per word with the count for that word?

  6. Brad says:

    B. Factor: Good question. The answer is (c) a collection of files, one per reducer instance, each of which has one word/count pair per line. This arrangement (one file per reducer) is standard for Hadoop.

  7. matthew says:

    thanks, this helped me understand streaming a bit more.

    I modified line 8 of your reducer to be:
    key, count = line.strip.split(/\t/)

    since the default split() was not compliant with my keys (full search strings, with spaces everywhere).

  8. anx says:

    Not completely relevant to this topic, but you should try Netbeans + Hadoop Studio plugin if you wish to give Java another chance. I am not at home with JARs either, but this combination was very easy to set up and use. It rocks. It’s not Ruby though. :)

  9. Me says:

    Thanks a lot for this great tutorial!!

    It was *exactly* the kind of example I needed. In particular it was good to read clearly that the keys are sorted/grouped before being passed to the reducer.

  11. Carmelo says:

    This is the mainn reason I read http://www.kickasslabs.com. Fascinating posts.

  12. vic says:

    Thanks a lot, you just saved me many hours of hadoop reading :)

  13. Brad says:

    Vic – I’m happy I could help.

  14. jo says:

    Hi Brad,
    Is there a real performance hit when using Hadoop Streaming? We’re planning to use this in a production env and wanted to know if the whole processing would be slower if MR is done in Ruby vs Java.
    Thanks,
    jo

  15. Brad says:

    Jo -

    There was a significant performance hit; part of it was Ruby, part of it was just the fact of leaving the JVM and pushing all this data across processes.

    There are three things you might do about it:

    1) If this is performing as well as you need it to, don’t sweat it. (But do keep an eye on your expected data throughput growth.)

    2) Add more machines. That’s what the horizontal scalability is for, after all.

    3) Switch languages. You could move up to Ruby 1.9, or JRuby, or Java or C/C++ (though if you’re going to do the latter, might as well go with regular Hadoop jobs). If you’re going to do this, do some profiling to determine that you’re going after the correct bottleneck. If your job chain is long – that is, if you have many scripts that would need replacing – pick one, profile it, swap it with an equivalent in the language you think will do better and profile that.

    If you do wind up switching languages, I’d recommend taking a deep look at JRuby. The last time I played with JRuby in a Hadoop context was a couple of years ago, but I was seeing order-of-magnitude improvements in speed (when I put the -server flag in my #! line).

    Good luck!

  16. jo says:

    Thanks Brad ; will check up on JRuby. Reg #2, we have a Hadoop cluster with enough machines, so shouldn’t be a big problem. In pt #3-Switch languages, you mention “regular Hadoop jobs” — hope you meant using stuff like Pig/Hive.

  17. Brad says:

    Pig/Hive is one option, but for raw performance, it’s tough to beat a custom-written Hadoop job in Java. This entails writing Java classes that inherit from Hadoop’s Mapper and Reducer classes, and packaging them as a .jar that gets distributed to your cluster.

    Naturally, speed is not the only concern – and you can always buy/rent more machines to get more speed. A lot depends on your team’s skills and what you’re paying for developer time vs. machine time.

  18. Zahide says:

    Thanks for the useful suggestions Brad.
    I have two questions about streaming.
    First, so NOT like the Java Reducer (in which every Reducer gets values associated with a single key), in streaming one reducer might get values from multiple keys. That’s something we need to take care of when we write the reducer code. Is it possible that values for single key might spread over different reducers ?

    Second, are you saying that in most cases, streaming is NOT as efficient as MapReduce written in java ?

    Thanks

  19. Brad says:

    Zahide: You’re correct about the reducer. One reducer instance can receive many keys, and you must store some state to detect when your reducer gets a new key. However, all values for a single key will pass through a single reducer instance. (It wouldn’t be MapReduce if that weren’t the case.)

    The times I’ve tried it, Streaming was slower. I was also using a 1.8 series Ruby, which is not the fastest of interpreted languages. JRuby or another more performant scripting language will give you better results; I happened to use Ruby because I wanted to prototype an algorithm quickly.

    The fact that Hadoop Streaming might not be as fast as writing your own Hadoop framework .jar files doesn’t make it a bad thing – if you’re doing something that doesn’t require extreme performance, it might be appropriate to write MR jobs in the language of your choice, either to re-use working code or just to save on developer time (which tends to be more expensive than cluster time, for small and medium clusters).
