Friday 23 March 2018

Hadoop Map Reduce IV

In the fourth segment, we take a look at how we can analyze data to come up with a Top N analysis. As in the earlier posts, we will be using the Cloudera sandbox, Cloudera QuickStart VM 5.12. For the limited setup that we are working on, the code is a first take on Top N analysis: it returns the 10 records with the highest prices (in USD) in descending order. For a full-fledged Hadoop cluster, this code can be improved upon.
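
The core of the technique is a bounded TreeMap held in reverse (descending) order: each record is inserted keyed by its price, and whenever the map grows beyond N entries the smallest key is evicted. A minimal standalone sketch of the idea, using hypothetical price values and a cutoff of 3 for brevity, is shown below:

import java.util.Collections;
import java.util.TreeMap;

public class TopNSketch {
    public static void main(String[] args) {
        int n = 3; // keep only the 3 largest values in this illustration
        // Reverse order puts the largest key first, so lastKey() is the smallest retained entry
        TreeMap<Integer, String> topN = new TreeMap<>(Collections.reverseOrder());
        int[] prices = {326, 18823, 402, 18818, 9500, 18787}; // hypothetical sample values
        for (int price : prices) {
            topN.put(price, "record with price " + price);
            if (topN.size() > n) {
                topN.remove(topN.lastKey()); // evict the current smallest
            }
        }
        // Iteration order is descending: 18823, 18818, 18787
        topN.forEach((price, record) -> System.out.println(price + " -> " + record));
    }
}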

The input data for this analysis is the same as in this post and is available here. More details about the different columns are available at that site. Sample data is shown below:

The Mapper program is shown below:

import java.io.IOException;
import java.util.Collections;
import java.util.Map.Entry;
import java.util.StringTokenizer;
import java.util.TreeMap;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Top10Mapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    // Reverse-ordered map: the highest price comes first, so lastKey() is the
    // smallest of the retained entries
    private final TreeMap<IntWritable, Text> priceMap = new TreeMap<>(Collections.reverseOrder());

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Skip the header record, which sits at byte offset 0 of the file
        if (key.get() == 0) {
            return;
        }
        String line = value.toString();
        // Strip everything except letters, digits, commas and dots before tokenizing
        StringTokenizer tokenizer = new StringTokenizer(line.replaceAll("[^A-Za-z0-9,.]", ""), ",");
        // The price is the eighth comma-separated field; skip the first seven
        for (int i = 0; i < 7 && tokenizer.hasMoreTokens(); i++) {
            tokenizer.nextToken();
        }
        if (!tokenizer.hasMoreTokens()) {
            return; // malformed row: no price field
        }
        int price = Integer.parseInt(tokenizer.nextToken());
        priceMap.put(new IntWritable(price), new Text(line));
        // Keep only the 10 highest prices seen so far
        if (priceMap.size() > 10) {
            priceMap.remove(priceMap.lastKey());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit the retained top 10 records, highest price first
        for (Entry<IntWritable, Text> entry : priceMap.entrySet()) {
            context.write(NullWritable.get(), entry.getValue());
        }
    }
}
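
The pattern above is sometimes called an in-mapper top-N: each map task keeps a local top 10 in memory and emits nothing until cleanup(), which the framework calls once after all records for that task have been processed. On a single-node sandbox with one map task, this local top 10 is also the global one; on a full cluster, each mapper would emit its own top 10 and a single reducer would apply the same TreeMap trimming to merge them, which is one direction in which this code can be improved.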


The Driver program is shown below:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Top10Driver {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Enter 'input path' and 'output path' arguments");
            System.exit(-1); // a non-zero exit code signals the error
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Top 10");
        job.setJarByClass(Top10Driver.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(Top10Mapper.class);
        // Map-only job: with zero reducers, the mapper's output is written straight to HDFS
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
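
Assuming the two classes are compiled and packaged into a jar (the name Top10.jar below is hypothetical), the job can be launched from the QuickStart VM terminal with a command along the lines of hadoop jar Top10.jar Top10Driver <input path> <output path>, where both paths refer to HDFS locations and the output directory must not already exist.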


The output is shown below:

Some points to note are:

a) Only a Mapper is used, as this suffices for the input dataset
b) Duplicates are not addressed in the code: the TreeMap is keyed on price alone, so if two records share a price only one of them is retained (one way to keep ties is sketched at the end of this post)

Notwithstanding the above points, the code gives an idea of the Top 10 prices (in USD) in descending order, running from 18823 down to 18787, in the input dataset.
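
As point b) notes, the current code keeps only one record per price. One way to retain ties is to map each price to a list of records and evict whole price groups only while at least 10 records remain. A sketch of the idea is shown below; the class and method names are illustrative and not part of the original code:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;

public class TopNWithTies {
    // Each price maps to every record carrying that price
    private final TreeMap<Integer, List<String>> priceMap = new TreeMap<>(Collections.reverseOrder());
    private int count = 0; // total records currently retained

    public void add(int price, String record) {
        priceMap.computeIfAbsent(price, p -> new ArrayList<>()).add(record);
        count++;
        // Drop the lowest-priced group only if at least 10 records would still remain
        while (count - priceMap.lastEntry().getValue().size() >= 10) {
            count -= priceMap.pollLastEntry().getValue().size();
        }
    }
}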