Thursday 4 January 2018

Spark SQL - I

Apache Spark is an open source processing engine that combines speed with ease of use and is well suited for analyzing large datasets.

The Apache Spark Ecosystem consists of the following:

1) Spark Core: Spark Core is the basic execution engine on top of which all other Spark functionality is built. It relies on in-memory computing to deliver speed, provides a generalized execution model that supports a wide variety of applications, and offers Java, Scala, and Python APIs for ease of development.

2) Spark Streaming: Spark Streaming runs on top of Spark Core and can process and analyze both batch and streaming data in near real-time. It integrates with data sources such as HDFS, Flume, Kafka, and Twitter.

3) Spark MLlib: Running on top of Spark, MLlib is a scalable machine learning library that delivers high-quality machine learning algorithms, which typically require multiple iterations, at impressive speed. The library is usable from Java, Scala, and Python as part of Spark applications.

4) Spark GraphX: GraphX is a graph computation engine built on top of Spark that lets users interactively build, transform, and analyze graph-structured data. It ships with its own library of common graph algorithms.

5) Spark SQL: Spark SQL is a module built on top of Spark that provides an interactive SQL interface for analysts, data scientists, and business intelligence professionals. It also introduces a distributed data abstraction called DataFrames. Spark SQL has shown up to 100x faster performance than Hadoop Hive when running Hive queries.

The focus of this series of posts will be Spark SQL. For all work in this post, we will use the Cloudera sandbox, Cloudera QuickStart VM 5.12. Before we begin writing any queries in Spark SQL, some preparatory work needs to be done. Refer to this post for the steps to upload the JSON file that we will be using here. A sample of the data is shown below for reference:

[Screenshot: sample records from the employees JSON file]

We will place the JSON file in the same location as in the post mentioned earlier. Then, we can proceed to the terminal window for more exciting actions. In the terminal window, enter spark-shell to start the interactive Spark shell as shown below:

[Screenshot: spark-shell startup output]

The output shows that the Spark version is 1.6.0. It is important to note the version, as only the APIs available in that version will be valid. Since Spark is evolving rapidly, APIs change between releases, and APIs introduced in later versions of Spark will not work in Spark 1.6.0.
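
If in doubt, the running version can also be checked from within the shell itself; sc.version returns it as a string:

sc.version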

From the above output, we can see that a SparkContext and an SQLContext are already available; these are what we will use. We can check them as shown below:

scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@2a5b5673

scala> sqlContext
res1: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@5602e0d5
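
In a standalone Spark application these two objects are not pre-created, so they must be built by hand. A minimal sketch for Spark 1.6 is shown below (the object name SparkSQLExample is just an illustrative choice; note from the output above that our shell actually provides a HiveContext, a superset of SQLContext):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SparkSQLExample {
  def main(args: Array[String]): Unit = {
    // In spark-shell, sc and sqlContext are created for us; in an application we build them.
    val conf = new SparkConf().setAppName("SparkSQLExample")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // ... DataFrame code goes here ...
  }
}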


We can then read the JSON file that we uploaded with the command below:

val df = sqlContext.read.json("/user/cloudera/employees/employees.json")

The results are shown below: 

scala> val df = sqlContext.read.json("/user/cloudera/employees/employees.json")
df: org.apache.spark.sql.DataFrame = [COMMISSION_PCT: double, DEPARTMENT_ID: bigint, EMAIL: string, EMPLOYEE_ID: bigint, FIRST_NAME: string, HIRE_DATE: string, JOB_ID: string, LAST_NAME: string, MANAGER_ID: bigint, PHONE_NUMBER: string, SALARY: bigint]
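
As a quick sanity check after loading, we can count the number of rows read from the file; this should match the sum of the per-JOB_ID counts shown later in this post (107):

// Total number of records in the DataFrame
df.count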


Note that the column names are picked up automatically and the data types are inferred. We can see the schema using the command below:

df.printSchema

The results are shown below:
  
scala> df.printSchema
root
 |-- COMMISSION_PCT: double (nullable = true)
 |-- DEPARTMENT_ID: long (nullable = true)
 |-- EMAIL: string (nullable = true)
 |-- EMPLOYEE_ID: long (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- HIRE_DATE: string (nullable = true)
 |-- JOB_ID: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- MANAGER_ID: long (nullable = true)
 |-- PHONE_NUMBER: string (nullable = true)
 |-- SALARY: long (nullable = true)
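
Schema inference is convenient, but it costs an extra pass over the JSON file and, as seen above, leaves HIRE_DATE as a plain string. If the structure is known up front, a schema can be supplied explicitly instead; a minimal sketch is shown below (employeeSchema and dfTyped are just illustrative names):

import org.apache.spark.sql.types._

// Explicit schema matching the inferred one above
val employeeSchema = StructType(Seq(
  StructField("COMMISSION_PCT", DoubleType, nullable = true),
  StructField("DEPARTMENT_ID", LongType, nullable = true),
  StructField("EMAIL", StringType, nullable = true),
  StructField("EMPLOYEE_ID", LongType, nullable = true),
  StructField("FIRST_NAME", StringType, nullable = true),
  StructField("HIRE_DATE", StringType, nullable = true),
  StructField("JOB_ID", StringType, nullable = true),
  StructField("LAST_NAME", StringType, nullable = true),
  StructField("MANAGER_ID", LongType, nullable = true),
  StructField("PHONE_NUMBER", StringType, nullable = true),
  StructField("SALARY", LongType, nullable = true)
))

// Read the same file without triggering schema inference
val dfTyped = sqlContext.read.schema(employeeSchema).json("/user/cloudera/employees/employees.json")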


To see the data, we use show as shown below:

df.show

The results are shown below:

[Screenshot: output of df.show displaying the first 20 rows]

Note that only the first 20 records are shown. In the next example, we select a single column and display only the first 3 records:

scala> df.select("FIRST_NAME").show(3)
+----------+
|FIRST_NAME|
+----------+
|    Steven|
|     Neena|
|       Lex|
+----------+
only showing top 3 rows


We can select multiple columns as shown below:

scala> df.select("FIRST_NAME", "LAST_NAME").show(3)
+----------+---------+
|FIRST_NAME|LAST_NAME|
+----------+---------+
|    Steven|     King|
|     Neena|  Kochhar|
|       Lex|  De Haan|
+----------+---------+
only showing top 3 rows


We can concatenate string columns with a separator using concat_ws, as shown below:

scala> df.select(concat_ws(" ",$"FIRST_NAME", $"LAST_NAME")).show(3)
+---------------------------------+
|concat_ws( ,FIRST_NAME,LAST_NAME)|
+---------------------------------+
|                      Steven King|
|                    Neena Kochhar|
|                      Lex De Haan|
+---------------------------------+
only showing top 3 rows
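
The generated column name above is awkward to refer to in later operations; it can be renamed with alias, as in the sketch below (FULL_NAME is just an illustrative name):

df.select(concat_ws(" ", $"FIRST_NAME", $"LAST_NAME").alias("FULL_NAME")).show(3)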


We can add calculations as shown below:

scala> df.select(concat($"FIRST_NAME", lit(" "), $"LAST_NAME"),($"SALARY")+1000).show(3)
+------------------------------+---------------+
|concat(FIRST_NAME, ,LAST_NAME)|(SALARY + 1000)|
+------------------------------+---------------+
|                   Steven King|          25000|
|                 Neena Kochhar|          18000|
|                   Lex De Haan|          18000|
+------------------------------+---------------+
only showing top 3 rows
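
If the calculated value should become a regular column of the DataFrame rather than appearing only in this one result, withColumn can be used; a minimal sketch is shown below (PROPOSED_SALARY is just an illustrative name):

// Adds a derived column; the original columns are left untouched
df.withColumn("PROPOSED_SALARY", $"SALARY" + 1000).select("FIRST_NAME", "SALARY", "PROPOSED_SALARY").show(3)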
 

Next, we add a filter and select only a few columns. Note that the generated column name for the first column has been replaced by an alias:

scala> df.filter(df("SALARY") > 15000).select(concat(df("FIRST_NAME"), lit(" "),df("LAST_NAME")).alias("NAME"),df("SALARY")).show(3)
+-------------+------+
|         NAME|SALARY|
+-------------+------+
|  Steven King| 24000|
|Neena Kochhar| 17000|
|  Lex De Haan| 17000|
+-------------+------+
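
The same result can also be expressed in plain SQL by first registering the DataFrame as a temporary table; a minimal sketch is shown below (the table name employees is our choice):

df.registerTempTable("employees")

sqlContext.sql("""
  SELECT CONCAT(FIRST_NAME, ' ', LAST_NAME) AS NAME, SALARY
  FROM employees
  WHERE SALARY > 15000
""").show(3)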


The next query shows unique JOB_IDs:

scala> df.select("JOB_ID").distinct.show
+----------+                                                                   
|    JOB_ID|
+----------+
|     AD_VP|
|    MK_REP|
|   AD_ASST|
|   IT_PROG|
|  PU_CLERK|
|    ST_MAN|
|    MK_MAN|
|   AD_PRES|
|    AC_MGR|
|  ST_CLERK|
|    HR_REP|
|    SA_REP|
|    PR_REP|
|FI_ACCOUNT|
|  SH_CLERK|
|AC_ACCOUNT|
|    PU_MAN|
|    FI_MGR|
|    SA_MAN|
+----------+


We can order the results using orderBy, as shown below:

scala> df.select("JOB_ID").distinct.orderBy("JOB_ID").show
+----------+                                                                   
|    JOB_ID|
+----------+
|AC_ACCOUNT|
|    AC_MGR|
|   AD_ASST|
|   AD_PRES|
|     AD_VP|
|FI_ACCOUNT|
|    FI_MGR|
|    HR_REP|
|   IT_PROG|
|    MK_MAN|
|    MK_REP|
|    PR_REP|
|  PU_CLERK|
|    PU_MAN|
|    SA_MAN|
|    SA_REP|
|  SH_CLERK|
|  ST_CLERK|
|    ST_MAN|
+----------+ 


We can get a count of the distinct JOB_ID values with the query below:

scala> df.select("JOB_ID").distinct.count
res36: Long = 19  
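
The same figure can also be obtained with the countDistinct aggregate function; a minimal sketch:

import org.apache.spark.sql.functions.countDistinct

df.agg(countDistinct($"JOB_ID")).show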


The next query calculates the average salary of each JOB_ID category:

scala> df.groupBy("JOB_ID").avg("SALARY").show
+----------+-----------+
|    JOB_ID|avg(SALARY)|
+----------+-----------+
|     AD_VP|    17000.0|
|    MK_REP|     6000.0|
|   AD_ASST|     4400.0|
|   IT_PROG|     5760.0|
|  PU_CLERK|     2780.0|
|    ST_MAN|     7280.0|
|    MK_MAN|    13000.0|
|   AD_PRES|    24000.0|
|    AC_MGR|    12008.0|
|  ST_CLERK|     2785.0|
|    HR_REP|     6500.0|
|    SA_REP|     8350.0|
|    PR_REP|    10000.0|
|FI_ACCOUNT|     7920.0|
|  SH_CLERK|     3215.0|
|AC_ACCOUNT|     8300.0|
|    PU_MAN|    11000.0|
|    FI_MGR|    12008.0|
|    SA_MAN|    12200.0|
+----------+-----------+


The next query computes two aggregates for each JOB_ID, the average salary and the number of employees:

scala> df.groupBy($"JOB_ID").agg(Map("SALARY" -> "avg","JOB_ID" -> "count")).show
+----------+-----------+-------------+                                         
|    JOB_ID|avg(SALARY)|count(JOB_ID)|
+----------+-----------+-------------+
|     AD_VP|    17000.0|            2|
|    MK_REP|     6000.0|            1|
|   AD_ASST|     4400.0|            1|
|   IT_PROG|     5760.0|            5|
|  PU_CLERK|     2780.0|            5|
|    ST_MAN|     7280.0|            5|
|    MK_MAN|    13000.0|            1|
|   AD_PRES|    24000.0|            1|
|    AC_MGR|    12008.0|            1|
|  ST_CLERK|     2785.0|           20|
|    HR_REP|     6500.0|            1|
|    SA_REP|     8350.0|           30|
|    PR_REP|    10000.0|            1|
|FI_ACCOUNT|     7920.0|            5|
|  SH_CLERK|     3215.0|           20|
|AC_ACCOUNT|     8300.0|            1|
|    PU_MAN|    11000.0|            1|
|    FI_MGR|    12008.0|            1|
|    SA_MAN|    12200.0|            5|
+----------+-----------+-------------+
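
The Map-based form above does not allow the result columns to be renamed or rounded. The same aggregation can be written with column functions, which support aliases; a minimal sketch is shown below (the alias names are our choice):

import org.apache.spark.sql.functions.{avg, count, round}

// Average salary and headcount per JOB_ID, with readable column names
val summary = df.groupBy($"JOB_ID").agg(round(avg($"SALARY"), 2).alias("AVG_SALARY"), count($"JOB_ID").alias("EMPLOYEES"))
summary.orderBy($"AVG_SALARY".desc).show(5)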


selectExpr accepts SQL expressions, which we use here to cast a column, as shown below:

scala> df.selectExpr("cast(HIRE_DATE as DATE)").sort($"HIRE_DATE".desc).limit(10).show
+----------+
| HIRE_DATE|
+----------+
|2008-04-21|
|2008-04-21|
|2008-03-24|
|2008-03-08|
|2008-02-23|
|2008-02-06|
|2008-02-03|
|2008-01-29|
|2008-01-24|
|2008-01-13|
+----------+


In the last query of this post, we cast HIRE_DATE to the DATE type, sort in descending order, and display the 10 most recent hire dates. In this first post on Spark SQL, we have covered some basic SQL operations in Spark SQL.