As we progress to the third segment in this series on Map Reduce in Hadoop, we look at some of the different calculations that can be done using Map Reduce. While we have already seen the count calculation here and here, in this post we look at calculating minimum, maximum and average measures. For all the work in this post, we will be using the Cloudera sandbox, Cloudera QuickStart VM 5.12.
The input data for the calculation of minimum, maximum and average will be diamond data available here. More details about the different columns are available at that site. We pick the carat data and price in USD data and calculate the measures for the price for a particular carat. Sample data is shown below:
Some of the points to be noted are:
a) Unlike the earlier cases where we dealt with text files, this is a .csv file
b) The first line consists of header data that needs to be ignored
c) The data of interest are in the carat column and the price (in USD) column
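The three points above can be tried out without Hadoop. The following is a minimal sketch in plain Java, not the Mapper used in this post. The class name DiamondLineParser, the column positions (1 for carat, 7 for price, inferred from the token positions the Mapper reads) and the sample rows in main are assumptions made for illustration; the row values are made up and are not taken from the data set. Unlike the Mapper, which skips the header by its byte offset, this sketch skips any row whose carat or price field is not numeric.

```java
public class DiamondLineParser {

    /** Carat and price extracted from one data row. */
    public static final class CaratPrice {
        public final double carat;
        public final int price;

        CaratPrice(double carat, int price) {
            this.carat = carat;
            this.price = price;
        }
    }

    // Assumed zero-based column positions: 1 = carat, 7 = price.
    private static final int CARAT_COLUMN = 1;
    private static final int PRICE_COLUMN = 7;

    /** Returns the parsed pair, or null when the line is a header or too short. */
    public static CaratPrice parse(String line) {
        // Drop quotes and anything else that is not a letter, digit, comma or period.
        String cleaned = line.replaceAll("[^A-Za-z0-9,.]", "");
        // A limit of -1 keeps empty fields, so column positions stay stable.
        String[] fields = cleaned.split(",", -1);
        if (fields.length <= PRICE_COLUMN) {
            return null;
        }
        try {
            double carat = Double.parseDouble(fields[CARAT_COLUMN]);
            int price = Integer.parseInt(fields[PRICE_COLUMN]);
            return new CaratPrice(carat, price);
        } catch (NumberFormatException e) {
            // Header text such as "carat" is not numeric, so the row is skipped.
            return null;
        }
    }

    public static void main(String[] args) {
        // Hypothetical header and data row, for illustration only.
        String header = "\"\",\"carat\",\"cut\",\"color\",\"clarity\",\"depth\",\"table\",\"price\",\"x\",\"y\",\"z\"";
        String row = "\"1\",0.5,\"Good\",\"E\",\"SI1\",60.0,55,1500,5.0,5.1,3.0";
        System.out.println("header skipped: " + (parse(header) == null));
        CaratPrice parsed = parse(row);
        System.out.println("carat=" + parsed.carat + " price=" + parsed.price);
    }
}
```

Checking the field contents rather than the line position is one possible design choice: it also skips a header that does not sit at offset 0, at the cost of silently dropping any malformed data row.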
The Mapper program is shown below:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MinMaxAverageMapper extends Mapper<LongWritable, Text, DoubleWritable, IntWritable> {

    // Reused for every record so that no new Writable is allocated per line.
    private DoubleWritable key_carat = new DoubleWritable();
    private IntWritable key_price = new IntWritable();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // The input key is the byte offset of the line, so offset 0 is the header row.
        if (key.get() == 0) {
            return;
        }
        // Strip quotes and other stray characters, then split on commas.
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line.replaceAll("[^A-Za-z0-9,.]", ""), ",");
        if (tokenizer.hasMoreTokens()) {
            tokenizer.nextToken();                                     // first column, not used
            key_carat.set(Double.parseDouble(tokenizer.nextToken()));  // carat
            // Skip the five columns between carat and price.
            for (int i = 0; i < 5; i++) {
                tokenizer.nextToken();
            }
            key_price.set(Integer.parseInt(tokenizer.nextToken()));    // price in USD
            context.write(key_carat, key_price);
        }
    }
}
The Reducer program is shown below:
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MinMaxAverageReducer extends Reducer<DoubleWritable, IntWritable, Text, Text> {

    @Override
    public void reduce(DoubleWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        long sum = 0;   // long, so that a large group of prices cannot overflow
        int maxValue = 0;
        int minValue = 0;
        // Single pass over all prices that share this carat value.
        for (IntWritable value : values) {
            int currValue = value.get();
            count++;
            sum += currValue;
            // Seed min and max with the first price seen.
            if (count == 1) {
                maxValue = currValue;
                minValue = currValue;
            }
            if (currValue > maxValue) {
                maxValue = currValue;
            }
            if (currValue < minValue) {
                minValue = currValue;
            }
        }
        // sum / count is integer division, so the average is truncated to whole USD.
        context.write(new Text("For " + key + " carat "), new Text("number of records is " + count + " Average Price is " + (sum / count) + " maxPrice is " + maxValue + " minPrice is " + minValue));
    }
}
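The aggregation inside the Reducer can also be looked at on its own. The following is a minimal sketch in plain Java, not the code that runs in the job: the class name PriceStats and the sample prices are hypothetical and chosen only for illustration. It differs from the Reducer in one respect, namely that the average is computed as a double, so the fractional part is kept instead of being truncated by integer division.

```java
import java.util.Arrays;
import java.util.List;

public class PriceStats {
    public final int count;
    public final int min;
    public final int max;
    public final double average;

    private PriceStats(int count, int min, int max, double average) {
        this.count = count;
        this.min = min;
        this.max = max;
        this.average = average;
    }

    /** Computes count, min, max and average in a single pass over the prices. */
    public static PriceStats of(Iterable<Integer> prices) {
        int count = 0;
        long sum = 0L;                 // long guards against overflow for large groups
        int min = Integer.MAX_VALUE;   // any real price replaces these seeds
        int max = Integer.MIN_VALUE;
        for (int price : prices) {
            count++;
            sum += price;
            min = Math.min(min, price);
            max = Math.max(max, price);
        }
        if (count == 0) {
            throw new IllegalArgumentException("at least one price is required");
        }
        // Cast before dividing so that the fractional part is kept.
        return new PriceStats(count, min, max, (double) sum / count);
    }

    public static void main(String[] args) {
        // Made-up prices for one carat value.
        List<Integer> prices = Arrays.asList(300, 500, 401);
        PriceStats stats = of(prices);
        System.out.println("count=" + stats.count + " min=" + stats.min
                + " max=" + stats.max + " average=" + stats.average);
    }
}
```

For the three sample prices the sum is 1201, so integer division would report an average of 400, while the double division keeps the remaining third.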
The Driver program is shown below:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MinMaxAverageDriver {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Enter 'input path' and 'output path' arguments");
            // Exit with a non-zero status to signal incorrect usage.
            System.exit(1);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Min Max Average");
        job.setJarByClass(MinMaxAverageDriver.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MinMaxAverageMapper.class);
        job.setReducerClass(MinMaxAverageReducer.class);
        // The Mapper emits (carat, price) pairs.
        job.setMapOutputKeyClass(DoubleWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        // The Reducer emits (Text, Text) pairs.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
The output is shown below:
Per the goal of this post, the output shows the number of records, the maximum price, the minimum price and the average price (all prices are in USD) for each carat value.