import java.util.*;
import java.io.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount {

    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            // Split the input line into tokens and emit (token, 1) for each one.
            String line = value.toString();
            Scanner scanner = new Scanner(line);
            while (scanner.hasNext()) {
                Text word = new Text();
                word.set(scanner.next());
                output.collect(word, new IntWritable(1));
            }
            scanner.close();
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            // Sum the counts emitted by the mappers for this word.
            int count = 0;
            while (values.hasNext()) {
                count += values.next().get();
            }
            // Emit the final (word, total count) pair; the value type must match
            // the IntWritable output value class configured in main().
            output.collect(key, new IntWritable(count));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("WordCount Example");

        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        // Reducer output types; the map output types default to the same classes.
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setNumReduceTasks(1);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}
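To try the job, compile the class against the Hadoop libraries, package it into a JAR, and submit it with the standard Hadoop launcher, for example: hadoop jar wordcount.jar WordCount <input path> <output path> (the JAR name and paths here are only placeholders). Note that the output directory must not already exist; FileOutputFormat rejects an existing output path when the job is submitted.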