Wednesday, 17 January 2018

Spark - IV

We continue to explore Spark in the fourth part of the series. The first part of the Spark post is here in case you wish to look at it for the environment setup. We will be using Hortonworks Sandbox HDP 2.6.3 for all the work in this post. Note that the Spark version is 2.2.x, which is different from the version used in the first post on Spark. In this post, we will look at the different types of actions in Spark.

Before we look at actions in Spark, let us first create an RDD and print it out as shown below:

val rdd = sc.makeRDD(Vector(1,2,3,4,5,6))
rdd.collect


This returns the following results:

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[254] at makeRDD at <console>:34
res217: Array[Int] = Array(1, 2, 3, 4, 5, 6)

Actions in Spark are described below:

1)  reduce(func): Returns the result of aggregating the elements of the calling RDD using a function func, which takes two arguments and returns one value. func should be both commutative and associative so that it can be computed correctly in parallel.

val result = rdd.reduce((x,y) => x+y)

This returns the following results:

result: Int = 21

The same command in a different syntax:

val result = rdd.reduce(_+_)

This returns the following results:

result: Int = 21
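
To see why func needs to be commutative and associative, here is a minimal sketch that imitates a partitioned reduce using plain Scala collections. The helper reduceInChunks and the chunk sizes are assumptions made for illustration only. This is not how Spark schedules the work internally, but the idea is the same: each chunk is reduced on its own and the partial results are then combined.

```scala
// Plain Scala stand-in for a partitioned reduce (no Spark needed).
val data = Vector(1, 2, 3, 4, 5, 6)

// Hypothetical helper: split data into chunks of chunkSize, reduce each
// chunk separately, then reduce the partial results with the same function.
def reduceInChunks(chunkSize: Int)(f: (Int, Int) => Int): Int =
  data.grouped(chunkSize).map(_.reduce(f)).reduce(f)

// Addition is commutative and associative: chunking does not matter.
val sumOneChunk    = reduceInChunks(6)(_ + _)   // 21
val sumThreeChunks = reduceInChunks(2)(_ + _)   // 21

// Subtraction is neither: the answer depends on how the data is split.
val diffOneChunk    = reduceInChunks(6)(_ - _)  // -19
val diffThreeChunks = reduceInChunks(2)(_ - _)  // 1
```

With a function like subtraction, the result of rdd.reduce would depend on how the RDD happens to be partitioned, which is why such functions should be avoided.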

2)  collect: Returns all the elements of the RDD as an array at the driver program. We have already seen collect when we created rdd above.

3)  count: Returns the number of elements in the RDD

rdd.count

This returns the following results:

res221: Long = 6

4) first: Returns the first element of the RDD

rdd.first

This returns the following results:

res222: Int = 1

5) take(n): Returns the first n elements of the RDD as an array

rdd.take(3)

This returns the following results:

res223: Array[Int] = Array(1, 2, 3)

6) takeSample(withReplacement, num, [seed]): Returns an array with a random sample of num elements of the RDD, with or without replacement, optionally using a pre-specified seed for the random number generator

With replacement:

rdd.takeSample(true, 3)

This returns results such as the following (the sample is random, so the values vary between runs):

res224: Array[Int] = Array(2, 5, 5)

Without replacement:

rdd.takeSample(false, 3)

This returns results such as the following:

res225: Array[Int] = Array(3, 5, 6)
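
The optional seed makes a sample repeatable. The sketch below assumes it is run in spark-shell, where sc is already defined. The RDD name sampleRdd and the seed value 42L are arbitrary choices for illustration.

```scala
// Assumes spark-shell, where the SparkContext `sc` is predefined.
val sampleRdd = sc.makeRDD(Vector(1, 2, 3, 4, 5, 6))

// Same seed on the same RDD: the two samples should be identical.
val sampleA = sampleRdd.takeSample(false, 3, 42L)
val sampleB = sampleRdd.takeSample(false, 3, 42L)
println(sampleA.sameElements(sampleB))

// Without replacement, an element can appear at most once in the sample.
println(sampleA.distinct.length == sampleA.length)
```

Fixing the seed is mostly useful in tests and demos, where the same sample is wanted on every run.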

7)  takeOrdered(n, [ordering]): Returns the first n elements of the RDD in sorted order, using either their natural order or a custom comparator supplied through the optional ordering parameter

val rdd1 = sc.makeRDD(Vector("Hello, ","This ","is ","Spark ","language."))
rdd1.takeOrdered(4)


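This returns the following results (in the natural ordering of strings, uppercase letters sort before lowercase ones, which is why "is " comes last):
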
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[259] at makeRDD at <console>:34
res227: Array[String] = Array("Hello, ", "Spark ", "This ", "is ")
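
The optional ordering can be passed as a second argument list. The following is a small sketch, assuming spark-shell with sc predefined. The names numbers, words, largest and ignoreCase are made up for this example.

```scala
// Assumes spark-shell, where the SparkContext `sc` is predefined.
val numbers = sc.makeRDD(Vector(1, 2, 3, 4, 5, 6))
val words   = sc.makeRDD(Vector("Hello, ", "This ", "is ", "Spark ", "language."))

// Reverse the natural ordering to get the three largest numbers.
val largest = numbers.takeOrdered(3)(Ordering[Int].reverse)
// Array(6, 5, 4)

// Compare strings by their lower-cased form so that case is ignored.
val ignoreCase = words.takeOrdered(4)(Ordering.by[String, String](_.toLowerCase))
// Array("Hello, ", "is ", "language.", "Spark ")
```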

8)  saveAsTextFile(path): Outputs the elements of the RDD as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark calls toString on each element to convert it to a line of text in the file.

rdd1.saveAsTextFile("/user/spark_test/test/")

This writes one part file per partition to the target directory:

Content of the part-00000 file at /user/spark_test/test/:

Hello,
This


Content of the part-00001 file at /user/spark_test/test/:

is
Spark
language. 


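9) saveAsSequenceFile(path): Outputs the elements of the RDD as a Hadoop SequenceFile in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. This action is available on RDDs of key-value pairs whose keys and values implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable, such as Int, Double and String.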

10)  saveAsObjectFile(path): Outputs the elements of the RDD in a simple format using Java serialization in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. The saved data can be loaded back using SparkContext.objectFile().

rdd1.saveAsObjectFile("/user/spark_test/test1/")
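
As a rough sketch of how the two binary formats round-trip, the snippet below saves an RDD with each action and reads it back. It assumes spark-shell with sc predefined. The paths seq_demo and obj_demo are hypothetical and are not part of the original setup.

```scala
// Assumes spark-shell (`sc` predefined). Both paths are hypothetical and
// must not exist yet, because Spark refuses to overwrite an output directory.
val seqPath = "/user/spark_test/seq_demo/"
val objPath = "/user/spark_test/obj_demo/"

// saveAsSequenceFile needs key-value pairs; String and Int are converted
// to Hadoop Writable types implicitly.
val pairs = sc.makeRDD(Seq(("foo", 1), ("bar", 2)))
pairs.saveAsSequenceFile(seqPath)
val pairsBack = sc.sequenceFile[String, Int](seqPath).collect()

// saveAsObjectFile works for any RDD whose elements are Java-serializable.
val numbers = sc.makeRDD(Vector(1, 2, 3, 4, 5, 6))
numbers.saveAsObjectFile(objPath)
val numbersBack = sc.objectFile[Int](objPath).collect()
```

Running the snippet a second time fails unless the two directories are removed first.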

11) countByKey: Returns a map of (K, Long) pairs with the count of each key. It is only available on RDDs of type (K, V).

val rdd2 = sc.makeRDD(Array(("foo",1), ("bar",2),("foo","foo"), ("bar","bar"),("foo","foo"), ("bar","bar"),("foo","foo"), ("bar","bar"),("foo",3)))
rdd2.countByKey 


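This returns the following results. The values mix Int and String, so the element type is inferred as (String, Any); countByKey only looks at the keys: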

rdd2: org.apache.spark.rdd.RDD[(String, Any)] = ParallelCollectionRDD[269] at makeRDD at <console>:34
res236: scala.collection.Map[String,Long] = Map(foo -> 5, bar -> 4)
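
Since countByKey returns a local map to the driver, it suits cases where the number of distinct keys is small. A possible alternative for larger key sets is to keep the counts distributed with reduceByKey. This is sketched below, assuming spark-shell with sc predefined. The names pairs, localCounts and distributedCounts are made up for this example.

```scala
// Assumes spark-shell, where the SparkContext `sc` is predefined.
val pairs = sc.makeRDD(Seq(("foo", 1), ("bar", 2), ("foo", 3)))

// countByKey brings the whole result to the driver as a local Map
// containing foo -> 2 and bar -> 1.
val localCounts = pairs.countByKey()

// The same counts kept as a distributed RDD[(String, Long)].
val distributedCounts = pairs.mapValues(_ => 1L).reduceByKey(_ + _)
println(distributedCounts.collectAsMap() == localCounts)
```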
 
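12) foreach(func): Runs a function func on each element of the RDD. This is usually done for side effects such as updating an accumulator or interacting with external storage systems.

rdd.collect.foreach(println)

Note that the line above first collects the elements to the driver and then calls foreach on the resulting array, so the output appears in the console in order. Calling rdd.foreach(println) directly runs println inside the tasks, so on a cluster the output goes to the executors' stdout rather than the driver console, and the order of the elements is not guaranteed.

This returns the following results: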

1
2
3
4
5
6
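
For a case where foreach is called on the RDD itself, here is a minimal sketch that sums the elements through an accumulator. It assumes spark-shell with sc predefined. The names numbers and total are arbitrary choices for this example.

```scala
// Assumes spark-shell, where the SparkContext `sc` is predefined.
val numbers = sc.makeRDD(Vector(1, 2, 3, 4, 5, 6))

// A named accumulator; the name "total" is an arbitrary choice.
val total = sc.longAccumulator("total")

// foreach runs on the executors and returns nothing; the accumulator
// carries the result back to the driver.
numbers.foreach(x => total.add(x))

println(total.sum)  // 21
```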

This concludes the topic of actions in Spark.