Wednesday, 17 January 2018

Spark - III

We continue to explore Spark in the third part of the series. The first part of the Spark post is here in case you wish to look at it for the environment setup. We will be using Hortonworks Sandbox HDP 2.6.3 for all the work in this post. Note that the Spark version is 2.2.x, which is different from the version used in the first post on Spark. We will continue with the different types of transformations in Spark.

8) intersection(otherDataset): Returns a dataset that is the intersection of the source dataset and the argument dataset
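
The rdd and rdd2 in the snippet below carry over from the earlier parts of the series; if you are starting from this post, a minimal setup consistent with the output shown here could be the following (the exact contents are an assumption, not the original data):

val rdd = sc.parallelize(1 to 6)
val rdd2 = sc.parallelize(1 to 8)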

rdd.intersection(rdd2).collect

This returns the following results:

res132: Array[Int] = Array(6, 3, 4, 1, 5, 2)

9) distinct([numTasks]): Returns a dataset that contains the distinct elements of the source dataset. numTasks is optional and specifies the number of tasks to use

sc.parallelize(Vector(1,1,2,2,3,3)).distinct.collect

This returns the following results:

res141: Array[Int] = Array(2, 1, 3)

10) groupBy(func): Returns an RDD of grouped elements, where each group is keyed by the result of applying the function func to each element

val Array1: Array[(String, Int)]  = Array(("King",24000), ("Kochhar",17000),("De Haan", 17000))
val rdd3 = sc.parallelize(Array1)
rdd3.groupBy(x => x._1.charAt(0)).collect


This returns the following results:

Array1: Array[(String, Int)] = Array((King,24000), (Kochhar,17000), (De Haan,17000))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[160] at parallelize at <console>:29
res163: Array[(Char, Iterable[(String, Int)])] = Array((D,CompactBuffer((De Haan,17000))), (K,CompactBuffer((King,24000), (Kochhar,17000))))


Here the grouping is by the first character of the first element (the name) in each key-value pair.

11) groupByKey([numTasks]): Returns an RDD of (K, Iterable<V>) pairs when called on an RDD of (K, V) pairs

val Array2: Array[(String, Int)]  = Array(("King",24000), ("King",17000),("De Haan", 17000))
val rdd3 = sc.parallelize(Array2)
rdd3.groupByKey.collect


This returns the following results:

Array2: Array[(String, Int)] = Array((King,24000), (King,17000), (De Haan,17000))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[163] at parallelize at <console>:29
res164: Array[(String, Iterable[Int])] = Array((King,CompactBuffer(24000, 17000)), (De Haan,CompactBuffer(17000)))


12) reduceByKey(func, [numTasks]): Returns a dataset of (K, V) pairs when called on a dataset of (K, V) pairs, where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V

val Array2: Array[(String, Int)]  = Array(("King",24000), ("King",17000),("De Haan", 17000))
val rdd3 = sc.parallelize(Array2)
rdd3.reduceByKey((x,y) => (x+y)).collect
rdd3.reduceByKey(_ + _).collect


This returns the following results:

Array2: Array[(String, Int)] = Array((King,24000), (King,17000), (De Haan,17000))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[175] at parallelize at <console>:29
res172: Array[(String, Int)] = Array((King,41000), (De Haan,17000))
res173: Array[(String, Int)] = Array((King,41000), (De Haan,17000))


13) aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]): Returns a dataset of (K, U) pairs when called on a dataset of (K, V) pairs, where the values for each key are aggregated using the given combine functions and a neutral "zero" value

val Array2: Array[(String, Int)]  = Array(("King",24000), ("King",17000),("De Haan", 17000))
val rdd3 = sc.parallelize(Array2)
rdd3.aggregateByKey(0)((accumulator, v) => accumulator + v, (v1, v2) => v1 + v2).collect


This returns the following results:

Array2: Array[(String, Int)] = Array((King,24000), (King,17000), (De Haan,17000))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[182] at parallelize at <console>:29
res177: Array[(String, Int)] = Array((King,41000), (De Haan,17000))
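
With a plain sum, aggregateByKey behaves just like reduceByKey above. Its extra flexibility shows when the zero value and the two functions do something other than summing; as a small sketch on the same rdd3, a per-key maximum salary could be computed as:

rdd3.aggregateByKey(Int.MinValue)((acc, v) => math.max(acc, v), (m1, m2) => math.max(m1, m2)).collect

For the data above this should return Array((King,24000), (De Haan,17000)).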


14) sortByKey([ascending], [numTasks]): Returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified by the boolean ascending argument, when called on a dataset of (K, V) pairs where K implements Ordered

val Array2: Array[(String, Int)]  = Array(("King",24000), ("King",17000),("De Haan", 17000)) 
val rdd3 = sc.parallelize(Array2)
rdd3.sortByKey().collect


This returns the following results:

Array2: Array[(String, Int)] = Array((King,24000), (King,17000), (De Haan,17000))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[187] at parallelize at <console>:29
res180: Array[(String, Int)] = Array((De Haan,17000), (King,24000), (King,17000))
 
Default sorting is in ascending order
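
To sort in descending order, pass false as the ascending argument; on the same rdd3 this should place the King entries before De Haan:

rdd3.sortByKey(false).collect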
 
15) join(otherDataset, [numTasks]): Returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key, when called on datasets of type (K, V) and (K, W)
 
val rdd4 = sc.makeRDD(Array(("foo",1),("bar",2)))
rdd4.collect
val rdd5 = sc.makeRDD(Array(("foo","foo"),("bar","bar")))
rdd5.collect
rdd4.join(rdd5).collect


This returns the following results:

rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[191] at makeRDD at <console>:27
res182: Array[(String, Int)] = Array((foo,1), (bar,2))
rdd5: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[192] at makeRDD at <console>:27
res183: Array[(String, String)] = Array((foo,foo), (bar,bar))
res184: Array[(String, (Int, String))] = Array((foo,(1,foo)), (bar,(2,bar)))
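
Note that join is an inner join, so keys present in only one of the two datasets are dropped. As a small sketch (using a hypothetical rdd4b with an extra key), a ("baz",3) pair would not appear in the result because rdd5 has no "baz" key:

val rdd4b = sc.makeRDD(Array(("foo",1),("bar",2),("baz",3)))
rdd4b.join(rdd5).collect

This should return only the foo and bar pairs.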
 
16) cogroup(otherDataset, [numTasks]): Returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples when called on datasets of type (K, V) and (K, W)
 
rdd4.cogroup(rdd5).collect
 
This returns the following results:

res186: Array[(String, (Iterable[Int], Iterable[String]))] = Array((foo,(CompactBuffer(1),CompactBuffer(foo))), (bar,(CompactBuffer(2),CompactBuffer(bar))))

17) cartesian(otherDataset): Returns a dataset of (T, U) pairs (all pairs of elements) when called on datasets of types T and U

rdd4.cartesian(rdd4).collect

This returns the following results:

res188: Array[((String, Int), (String, Int))] = Array(((foo,1),(foo,1)), ((foo,1),(bar,2)), ((bar,2),(foo,1)), ((bar,2),(bar,2)))

18) pipe(command, [envVars]): Returns an RDD created by piping each partition of the RDD through the given shell command; each element is written to the process's stdin as a line, and each line of its stdout becomes a string element of the resulting RDD
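
As a small sketch (the output here is not from the sandbox run), piping an RDD with two partitions through wc -l counts the lines fed to the command from each partition, using a hypothetical rdd8:

val rdd8 = sc.parallelize(1 to 6, 2)
rdd8.pipe("wc -l").collect

With two partitions of three elements each, this should return something like Array(3, 3) (as strings).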

19) coalesce(numPartitions): Returns an RDD with the number of partitions decreased to numPartitions, avoiding a full shuffle

val rdd2 = sc.parallelize(1 to 6,3)
rdd2.collect
rdd2.getNumPartitions
rdd2.coalesce(2).getNumPartitions

This returns the following results:

rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[205] at parallelize at <console>:27
res195: Array[Int] = Array(1, 2, 3, 4, 5, 6)
res196: Int = 3
res197: Int = 2
 
20) repartition(numPartitions): Reshuffles the data in the RDD randomly to create either more or fewer partitions and balance it across them

val rdd6 = sc.parallelize(1 to 12,3)
rdd6.getNumPartitions
rdd6.saveAsTextFile("/user/spark_test/test/")

This returns the following results:

rdd6: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[219] at parallelize at <console>:27
res206: Int = 3
 
Looking at the files using Ambari in the /user/spark_test/test/ directory, we see three data files with the following entries:
 
1
2
3
4

5
6
7
8

9
10
11
12


Now, let us change the partitions to 4 using repartition:
 
rdd6.repartition(4).getNumPartitions
rdd6.repartition(4).saveAsTextFile("/user/spark_test/test1/")

This returns the following results:
 
res208: Int = 4
 
Looking at the files using Ambari in the /user/spark_test/test1/ directory, we see four data files with the following entries:
 
2
6
10

3
7
11

4
8
12

1
5
9

 
21) repartitionAndSortWithinPartitions(partitioner): Repartitions the RDD according to the given partitioner and, within each resulting partition, sorts records by their keys

import org.apache.spark.HashPartitioner
val Array2: Array[(String, Int)]  = Array(("d",4),("a",1),("c",3),("f",6),("e",5),("b",2))
val rdd7 = sc.makeRDD(Array2)
val repartitioned = rdd7.repartitionAndSortWithinPartitions(new HashPartitioner(2))
repartitioned.saveAsTextFile("/user/spark_test/test/")

This returns the following results (note that saveAsTextFile fails if the target directory already exists, so the earlier /user/spark_test/test/ output has to be deleted before this step):
 
import org.apache.spark.HashPartitioner
Array2: Array[(String, Int)] = Array((d,4), (a,1), (c,3), (f,6), (e,5), (b,2))
rdd7: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[249] at makeRDD at <console>:36
repartitioned: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[250] at repartitionAndSortWithinPartitions at <console>:38
 
We see two files in that directory with the following contents:

(b,2)
(d,4)
(f,6)
 
(a,1)
(c,3)
(e,5)
 
This concludes the topic of transformations in Spark.