Friday 23 March 2018

Hadoop Map Reduce IV

In the fourth segment, we take a look at how data can be analyzed to produce a Top N result. As in the earlier posts, we will be using the Cloudera sandbox, Cloudera QuickStart VM 5.12. For the limited setup that we are working on, the code is a first take on Top N analysis and returns the 10 records with the highest prices (in USD) in descending order. For a full-fledged Hadoop cluster, this code can be refined further

The input data for this analysis will be the same as it was in this post and is available here. More details about the different columns are available at that site. Sample data is shown below:
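For orientation, the diamonds CSV carries a leading row-index column followed by carat, cut, color, clarity, depth, table, price (in USD), x, y and z; the Mapper below relies on this column order. The first few lines of the file look roughly like this (quoting may vary slightly depending on where the file was downloaded from):

"","carat","cut","color","clarity","depth","table","price","x","y","z"
"1",0.23,"Ideal","E","SI2",61.5,55,326,3.95,3.98,2.43
"2",0.21,"Premium","E","SI1",59.8,61,326,3.84,3.89,2.31
"3",0.23,"Good","E","VS1",56.9,65,327,4.05,4.07,2.31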

The Mapper program is shown below:

import java.io.IOException;
import java.util.Collections;
import java.util.Map.Entry;
import java.util.StringTokenizer;
import java.util.TreeMap;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Top10Mapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    private int price;
    // Prices in descending order; holds at most the 10 highest-priced records seen by this mapper
    private TreeMap<IntWritable, Text> priceMap = new TreeMap<>(Collections.reverseOrder());

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line.replaceAll("[^A-Za-z0-9,.]", ""), ",");
        // Skip the header line, which starts at byte offset 0
        if (key.get() == 0) {
            return;
        }
        if (tokenizer.hasMoreTokens()) {
            // Skip the first 7 columns: row index, carat, cut, color, clarity, depth, table
            for (int i = 0; i < 7; i++) {
                tokenizer.nextToken();
            }
            // The 8th column is the price in USD
            price = Integer.parseInt(tokenizer.nextToken());
            priceMap.put(new IntWritable(price), new Text(line));
            // Keep only the 10 highest prices; under reverse ordering, lastKey() is the smallest
            if (priceMap.size() > 10) {
                priceMap.remove(priceMap.lastKey());
            }
        }
    }

    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit the (at most 10) retained records once this mapper has processed its entire split
        for (Entry<IntWritable, Text> entry : priceMap.entrySet()) {
            context.write(NullWritable.get(), new Text(entry.getValue()));
        }
    }
}


The Driver program is shown below:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Top10Driver {

    public static void main(String[] args) throws Exception {
        if(args.length !=2){
            System.err.println("Enter 'input path' and 'output path' arguments");
            System.exit(0);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf,"Top 10");
        job.setJarByClass(Top10Driver.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(Top10Mapper.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true)?0:1);
    }
}
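Since, as noted in the points further below, only the Mapper does the real work here, the job can optionally be declared explicitly map-only so that no identity reduce phase runs. That would be a single extra line in the driver above, placed before waitForCompletion (not part of the original code):

job.setNumReduceTasks(0);   // map-only job: the mapper output is written straight to the output files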


The output is shown below:

Some points to note are:

a) Only a Mapper is used, as this suffices for the input dataset
b) Duplicates are not addressed in the code: since the TreeMap is keyed on price, only one record is retained for any given price (one way to retain duplicates is sketched below)

Notwithstanding the above points, the code nevertheless gives a fair idea of the Top 10 prices (in USD) in the input dataset, listed in descending order from 18823 down to 18787
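As a possible way to address point (b), the TreeMap could hold a list of records per price instead of a single record. The standalone sketch below (class and method names are made up for illustration, and plain Strings stand in for Hadoop's Text) shows the idea; the same structure could be dropped into the Mapper above:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;

public class Top10WithDuplicatesSketch {
    // Highest prices first; each price keeps every record that carries that price
    private static TreeMap<Integer, List<String>> topPrices =
            new TreeMap<Integer, List<String>>(Collections.reverseOrder());

    static void offer(int price, String record) {
        List<String> bucket = topPrices.get(price);
        if (bucket == null) {
            bucket = new ArrayList<String>();
            topPrices.put(price, bucket);
        }
        bucket.add(record);
        // Keep only the 10 highest distinct prices; lastKey() is the smallest under reverse ordering
        if (topPrices.size() > 10) {
            topPrices.remove(topPrices.lastKey());
        }
    }

    public static void main(String[] args) {
        offer(18823, "first record");
        offer(18823, "second record with the same price");   // now retained instead of overwritten
        offer(326, "a low priced record");
        System.out.println(topPrices);
    }
}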

Thursday 22 March 2018

Hadoop Map Reduce III

As we progress to the third segment in this series on Map Reduce in Hadoop, we look at some of the other calculations that can be done using Map Reduce. In particular, while we have already seen the count calculation here and here, in this post we look at calculating the minimum, maximum and average measures. For all the work in this post, we will be using the Cloudera sandbox, Cloudera QuickStart VM 5.12

The input data for the calculation of minimum, maximum and average will be the diamonds data available here. More details about the different columns are available at that site. We pick the carat and price (in USD) columns and calculate these measures of price for each carat value. Sample data is shown below:

Some of the points to be noted are:

a) Unlike the earlier cases where we dealt with text files, this is a .csv file
b) The first line consists of header data that needs to be ignored
c) Our columns of interest are carat and price (in USD)

The Mapper program is shown below:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MinMaxAverageMapper extends Mapper<LongWritable, Text, DoubleWritable, IntWritable> {

    private DoubleWritable key_carat = new DoubleWritable();
    private IntWritable key_price = new IntWritable();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line.replaceAll("[^A-Za-z0-9,.]", ""), ",");
        // Skip the header line, which starts at byte offset 0
        if (key.get() == 0) {
            return;
        }
        if (tokenizer.hasMoreTokens()) {
            // Column order: row index, carat, cut, color, clarity, depth, table, price (USD), ...
            tokenizer.nextToken();                                     // row index
            key_carat.set(Double.parseDouble(tokenizer.nextToken()));  // carat
            for (int i = 0; i < 5; i++) {                              // cut, color, clarity, depth, table
                tokenizer.nextToken();
            }
            key_price.set(Integer.parseInt(tokenizer.nextToken()));    // price in USD
            // Emit (carat, price) so the reducer sees every price for a given carat
            context.write(key_carat, key_price);
        }
    }
}


The Reducer program is shown below:

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MinMaxAverageReducer extends Reducer<DoubleWritable, IntWritable, Text, Text> {

    public void reduce(DoubleWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int count = 0;
        int sum = 0;
        int maxValue = 0;
        int minValue = 0;

        // Single pass over all the prices recorded for this carat value
        for (IntWritable value : values) {
            int currValue = value.get();
            count++;
            sum += currValue;
            if (count == 1) {
                // Initialize min and max from the first price seen
                maxValue = currValue;
                minValue = currValue;
            }
            if (currValue > maxValue) {
                maxValue = currValue;
            }
            if (currValue < minValue) {
                minValue = currValue;
            }
        }
        // sum/count is integer division (a variant with a fractional average follows this class)
        context.write(new Text("For " + key + " carat "),
                new Text("number of records is " + count + " Average Price is " + (sum / count)
                        + " maxPrice is " + maxValue + " minPrice is " + minValue));
    }
}
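One detail worth noting: sum/count above is integer division, so the reported average is truncated to a whole number. If a fractional average is preferred, the write call could be adjusted along these lines (a variant of the code above, not the original):

        double average = (double) sum / count;
        context.write(new Text("For " + key + " carat "),
                new Text("number of records is " + count + " Average Price is " + average
                        + " maxPrice is " + maxValue + " minPrice is " + minValue));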


The Driver program is shown below:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MinMaxAverageDriver {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Enter 'input path' and 'output path' arguments");
            System.exit(0);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Min Max Average");
        job.setJarByClass(MinMaxAverageDriver.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MinMaxAverageMapper.class);
        job.setReducerClass(MinMaxAverageReducer.class);
        // The Mapper emits (DoubleWritable, IntWritable) while the Reducer writes (Text, Text),
        // so the map and final output types are declared separately
        job.setMapOutputKeyClass(DoubleWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}


The output is shown below:


Per the goal of this post, we see the number of records, the maximum price, the minimum price and the average price (all prices in USD) for each carat value

Tuesday 20 March 2018

Hadoop Map Reduce II

In the second part of the series, we will refine the program written earlier. For all the work in this post, we will be using the Cloudera sandbox, Cloudera QuickStart VM 5.12

We notice from the results of the word count in the last exercise that the output contains punctuation marks, and this leads to incorrect classification of words. For example, you can see from the results below that the word I is classified as "I, I and I; with different count values:

Ideally, we should have only a single I, as they are all the same word. So, in this post, we will strip the input of all punctuation characters and then attempt the word count. Only the Mapper program will be refined, with the Reducer and Driver programs remaining the same
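To see what this cleanup does on its own, the replaceAll call used by the refined Mapper can be tried in a small standalone snippet (the sample sentence is made up):

public class RegexCheck {
    public static void main(String[] args) {
        String line = "\"I am,\" said he; \"I am!\"";   // made-up sample line
        // Every character that is not a letter or digit is replaced by a space,
        // so a StringTokenizer on the result yields: I, am, said, he, I, am
        System.out.println(line.replaceAll("[^A-Za-z0-9]", " "));
    }
}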

The Mapper program is shown below:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SecondWordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();
        // Replace every character that is not a letter or digit with a space before tokenizing,
        // so punctuation no longer sticks to the words
        StringTokenizer tokenizer = new StringTokenizer(line.replaceAll("[^A-Za-z0-9]", " "));
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(new Text(word), new IntWritable(1));
        }
    }
}




The Reducer program is shown below:


import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SecondWordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{

        int count = 0;
        for (IntWritable value:values)
        {
            count += value.get();
        }

        context.write(key, new IntWritable(count));

    }
}



The Driver program is shown below:


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SecondWordCountDriver {

    public static void main(String[] args) throws Exception {
        if(args.length !=2){
            System.err.println("Enter 'input path' and 'output path' arguments");
            System.exit(0);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf,"Second Word Count");
        job.setJarByClass(SecondWordCountDriver.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(SecondWordCountMapper.class);
        job.setReducerClass(SecondWordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true)?0:1);
    }
}
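As an aside, on a full cluster a word count job like this one commonly also registers the reducer as a combiner, so that counts are partially summed on the map side before the shuffle. That would be one optional extra line in the driver above (not part of the original code):

job.setCombinerClass(SecondWordCountReducer.class);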



The output is shown below:

Note that all the punctuation characters are gone and I now has a count of 10, in line with the aim of this post

Monday 19 March 2018

Hadoop Map Reduce I

In a series of posts, we plan to dive into Map Reduce programs in the Hadoop framework. For all the work in this post, we will be using the Cloudera sandbox, Cloudera QuickStart VM 5.12

The Map Reduce programming paradigm has become commonplace in the last decade. Counting the words in a text file is seen as the classic hello world program of big data. So, we will also start with the word count program

We are mostly interested in understanding the logic behind the Map Reduce programming model. So, we will simplify the exercise by running the programs from the Eclipse installation that comes with the sandbox in a standalone mode that uses the local file system rather than HDFS. Though this setup uses only a single JVM, the ease of use is to be appreciated, as we read the files directly without writing any hdfs commands. Besides, we are not using any build tool such as Maven in Eclipse; we will add the necessary jar files manually. So, running Map Reduce cannot really get any simpler than this!

There are three programs in all. The Mapper program is shown below:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FirstWordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();
        // Split the line on spaces and emit (word, 1) for every non-empty token
        for (String word : line.split(" ")) {
            if (word.length() > 0) {
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }
}


In brief, the above program reads the text input line by line, splits each line with space as the delimiter and then emits a (key, value) pair for each word as (word, 1). Then, we have the Reducer program shown below:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FirstWordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        // Add up all the 1s emitted by the mappers for this word
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }

        context.write(key, new IntWritable(count));
    }
}


The above program reads the (word, 1) pairs output by the Mapper program and reduces them to (word, count), where count is the number of occurrences of the word in the text input
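As a concrete illustration with a made-up one-line input, the (key, value) pairs flow through the two phases roughly as follows:

input line     :  the cat sat on the mat
mapper emits   :  (the,1) (cat,1) (sat,1) (on,1) (the,1) (mat,1)
shuffle groups :  the -> [1,1]   cat -> [1]   sat -> [1]   on -> [1]   mat -> [1]
reducer emits  :  (the,2) (cat,1) (sat,1) (on,1) (mat,1)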

Lastly, to call the Mapper and Reducer programs, shown below is the Driver program:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FirstWordCountDriver {

    public static void main(String[] args) throws Exception {
        if(args.length !=2){
            System.err.println("Enter 'input path' and 'output path' arguments");
            System.exit(0);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf,"First Word Count");
        job.setJarByClass(FirstWordCountDriver.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(FirstWordCountMapper.class);
        job.setReducerClass(FirstWordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true)?0:1);
    }
}


While the above three programs are the simplest of programs in the Map Reduce scheme of things, if you wish to understand their finer details, please refer to any site that suits your taste. The internet is flooded with sites eager to help developers get up to speed on Map Reduce

Once the Cloudera Sandbox comes up, click on the Eclipse icon on the desktop. This will bring up Eclipse Luna Service Release 2 (4.4.2). Notice that there is already a project as shown below:

The existing project is called training; we will ignore it for now. Let us create a new project from scratch by right clicking in the space below the training project as shown below:



In the window (shown below) that comes up, enter hadoop as the Project name and click on the Finish button:

Note that the JavaSE-1.7 JRE will serve as the Execution environment. Notice that a new project called hadoop has been created. Select it and right click to choose Properties at the bottom, as shown below:

In the new window, select Java Build Path as shown below:

Click on Add External JARs ...

Now, navigate to the /usr/lib/hadoop/ directory, select the jars indicated below under that directory and add them:

hadoop-annotations.jar
hadoop-auth.jar
hadoop-common.jar

In the same way, add commons-httpclient-3.1.jar from under /usr/lib/hadoop/lib/. Lastly, add the jars under /usr/lib/hadoop/client-0.20 as shown below:

Once all the jars are added, they look like this:

Click on OK. Select src under the hadoop root folder and add a new class as shown below:

In the window that comes up, clear the package name and enter FirstWordCountMapper as the class name and click on Finish:

Now, paste the FirstWordCountMapper code shown at the start of this post into the editor on the right, then right click and choose Save to save the program:

In the same manner, add the FirstWordCountReducer and FirstWordCountDriver programs:

Select hadoop, then right click and select Folder to create a new folder called input, as shown below:

Enter input as the Folder name and click on Finish:

Right click on the newly created input folder and click on Import ...:

Click on Next >:

In the next window, select the directory of interest by clicking Browse... and choose a text file that will serve as the input. In our case, it is called blue_carbuncle.txt and comes from an earlier blog post on word count in Hive

You can see the file added under the input folder. We now have all the parts needed to run the Map Reduce program. Right click on the Driver program and click on Run As and then Run Configurations... to set the arguments for the Driver program:

In the new window, enter input and output in the Program arguments box as shown below:

Then, click on Run to run the program:

We can see the output in the Console, in particular the line below:

18/03/19 10:46:38 INFO mapred.JobClient:  map 100% reduce 100%

Now, right click on hadoop and click on Refresh as shown below:



We can see a new folder called output containing two files. Double click on the part-r-00000 file to see the contents:

The results look similar to those in the earlier post. This concludes the running of our first Map Reduce program