import java.util.*;
import java.io.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount {

    // Map logic: tokenize each input line and emit a (word, 1) pair per token.
    public static class WordCountMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {

        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            Scanner scanner = new Scanner(value.toString());
            while (scanner.hasNext()) {
                word.set(scanner.next());
                output.collect(word, ONE);
            }
            scanner.close();
        }
    }

    // Reduce logic: sum the counts emitted for each word.
    public static class WordCountReducer extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, LongWritable> {

        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, LongWritable> output, Reporter reporter)
                throws IOException {
            long count = 0;
            while (values.hasNext()) {
                count += values.next().get();
            }
            output.collect(key, new LongWritable(count));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("WordCount Example");

        conf.setMapperClass(WordCountMapper.class);
        conf.setReducerClass(WordCountReducer.class);

        // The map output types (Text, IntWritable) differ from the final job output
        // types (Text, LongWritable), so both pairs must be declared explicitly.
        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(IntWritable.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}