Wednesday 17 January 2018

Spark - II

We continue to explore Spark in the second part of the series. The first part of the Spark post is here in case you wish to look at it for the environment setup. We will be using Hortonworks Sandbox HDP 2.6.3 for all the work in this post. Note that the Spark version is 2.2.x, which is different from the version in the first post on Spark. We will begin by looking at the different types of transformations in Spark.

Running the below code as in Spark I:

%spark2

sc
sc.appName
sc.applicationId
sc.defaultMinPartitions
sc.defaultParallelism
sc.deployMode
sc.isLocal
sc.master
sc.sparkUser
sc.startTime
sc.version


This returns the below results:

res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@294437ac
res1: String = Zeppelin
res2: String = application_1516119957831_0001
res3: Int = 2
res4: Int = 2
res5: String = client
res6: Boolean = false
res7: String = yarn-client
res8: String = zeppelin
res9: Long = 1516120813279
res10: String = 2.2.0.2.6.3.0-235

Before we look at the transformations in Spark, let us create an RDD as shown below and print it out:

val rdd = sc.makeRDD(Array(1,2,3,4,5,6))
rdd.collect.foreach(print)

This returns the below results:

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at makeRDD at <console>:27

123456

Since the printed results run together and are not clear, let us print them with a separator as shown below:

print(rdd.collect.mkString(","))

This returns the below results:

1,2,3,4,5,6
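
As a side note, Scala's mkString also accepts a start, separator, and end argument, so the same output can be bracketed for readability:

print(rdd.collect.mkString("[", ",", "]"))

This should print [1,2,3,4,5,6]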

Now that we have an RDD, let us start with the different transformations:

1) map(func): Returns a new RDD formed by passing each element of the source RDD through the function func


rdd.map(x => x*x).collect

This returns the below results:

res47: Array[Int] = Array(1, 4, 9, 16, 25, 36)

The square of each element in the source RDD is returned
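
map can also change the element type. As a small sketch beyond the example above, the function can return a tuple pairing each element with its square:

rdd.map(x => (x, x*x)).collect

This should return Array((1,1), (2,4), (3,9), (4,16), (5,25), (6,36)), with type Array[(Int, Int)]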

2) filter(func): Returns a new RDD with elements that satisfy the function

rdd.filter(x => x%3 == 0).collect
  
This returns the below results:

res52: Array[Int] = Array(3, 6)
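
The predicate can also be written using Scala's underscore shorthand; for example, keeping only the even values:

rdd.filter(_ % 2 == 0).collect

This should return Array(2, 4, 6)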

3) flatMap(func): Similar to map, but each input element can be mapped to zero or more output elements, so func should return a sequence rather than a single item. The resulting sequences are flattened to give the output

rdd.flatMap(x => Array(x,x*x)).collect

This returns the below results:

res65: Array[Int] = Array(1, 1, 2, 4, 3, 9, 4, 16, 5, 25, 6, 36)

Note that the arrays produced for each input element are flattened into a single sequence of values
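
A common use of flatMap is splitting lines of text into words. Here is a small sketch; the sample strings are made up for illustration:

val lines = sc.parallelize(Array("hello world", "hello spark")) // sample strings for illustration
lines.flatMap(line => line.split(" ")).collect

This should return Array(hello, world, hello, spark)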

4) mapPartitions(func): Similar to map. While the function in map is applied to each element, in mapPartitions, func is applied to each partition. func must be of type Iterator<T> => Iterator<U> for an RDD of type T

Let us create a new RDD with three partitions:

val rdd2 = sc.parallelize(1 to 6,3)
rdd2.collect
rdd2.getNumPartitions


This returns the below results:

rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[42] at parallelize at <console>:27
res83: Array[Int] = Array(1, 2, 3, 4, 5, 6)
res84: Int = 3

Applying mapPartitions to compute the sum of each partition:

rdd2.mapPartitions( (iterator: Iterator[Int]) => {
  var sum = 0
  while (iterator.hasNext) {
    sum += iterator.next
  }
  Iterator(sum)
}).collect

This returns the below results:

res87: Array[Int] = Array(3, 7, 11)
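
As an aside, the same per-partition sum can be written more concisely using the sum method that Scala provides on Iterator; a minimal sketch equivalent to the code above:

rdd2.mapPartitions(iterator => Iterator(iterator.sum)).collect

This should also return Array(3, 7, 11)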

5) mapPartitionsWithIndex(func): Similar to mapPartitions, but func also receives the index of the partition. func must be of type (Int, Iterator<T>) => Iterator<U> for an RDD of type T

We will modify the above code to add the index:

rdd2.mapPartitionsWithIndex( (index: Int, iterator: Iterator[Int]) => {
  var sum = 0
  while (iterator.hasNext) {
    sum += iterator.next
  }
  Iterator(index, sum)
}).collect

This returns the below results:

res88: Array[Int] = Array(0, 3, 1, 7, 2, 11)
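
Note that the partition indexes and sums are interleaved in a single flat Array above. To keep each index paired with its sum, the function can return tuples instead; a small sketch:

rdd2.mapPartitionsWithIndex((index, iterator) => Iterator((index, iterator.sum))).collect

This should return Array((0,3), (1,7), (2,11))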

6) sample(withReplacement, fraction, seed): Returns a sample of elements from the source RDD, drawn with or without replacement, based on the given fraction and an optional random seed

rdd.sample(true,0.5).collect.foreach(println)

This returns the below results:

2
4
4

Note that values can repeat in the above case since the sampling is with replacement

rdd.sample(false,0.5).collect.foreach(println) 

This returns the below results:

2
5
6

Note that values don't repeat since the sampling is without replacement
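
Since sampling is random, the values above will differ from run to run. Passing an explicit seed as the third argument makes the sample reproducible; for example:

rdd.sample(false, 0.5, 42).collect // 42 is an arbitrary seed

Repeated runs with the same seed should return the same elements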

7) union(otherDataset): Returns a new RDD that is the union of the source RDD and the argument RDD

rdd.union(rdd2).collect 

This returns the below results:

res130: Array[Int] = Array(1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6)
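
Note that union does not remove duplicates. If distinct values are needed, distinct can be chained after the union:

rdd.union(rdd2).distinct.collect

This should return the six distinct values, though the ordering of the result is not guaranteed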

We continue in the next post ...