Thursday 4 January 2018

Spark SQL - II

We continue to explore Spark SQL in this second post. For all work in this post, we will use the Cloudera sandbox, Cloudera QuickStart VM 5.12. Please note that this VM comes with Spark 1.6.0 pre-installed, so all the code in this post is written for Spark 1.6.0.

Let us now register a dataframe as a table and then perform query operations. Recall that in the earlier post we created a dataframe using the command below:

scala> val df = sqlContext.read.json("/user/cloudera/employees/employees.json")
df: org.apache.spark.sql.DataFrame = [COMMISSION_PCT: double, DEPARTMENT_ID: bigint, EMAIL: string, EMPLOYEE_ID: bigint, FIRST_NAME: string, HIRE_DATE: string, JOB_ID: string, LAST_NAME: string, MANAGER_ID: bigint, PHONE_NUMBER: string, SALARY: bigint]


Let us now register the dataframe as a temporary table called employees:

scala> df.registerTempTable("employees")

Once the dataframe is registered as a table, we can issue queries as shown below:

scala> val employees = sqlContext.sql("SELECT * FROM employees").show(3)


Please note that, like most SQL dialects, Spark SQL is case insensitive, so a query like the one below will return the same result as above:

val employees = sqlContext.sql("select * from Employees").show(3)
 

Use the describe command to see the schema:

scala> val employees = sqlContext.sql("describe employees").show
+--------------+---------+-------+
|      col_name|data_type|comment|
+--------------+---------+-------+
|COMMISSION_PCT|   double|       |
| DEPARTMENT_ID|   bigint|       |
|         EMAIL|   string|       |
|   EMPLOYEE_ID|   bigint|       |
|    FIRST_NAME|   string|       |
|     HIRE_DATE|   string|       |
|        JOB_ID|   string|       |
|     LAST_NAME|   string|       |
|    MANAGER_ID|   bigint|       |
|  PHONE_NUMBER|   string|       |
|        SALARY|   bigint|       |
+--------------+---------+-------+

employees: Unit = ()
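
If you prefer the DataFrame API, the same schema can be inspected directly on the dataframe; a minimal sketch, assuming the df created earlier:

// print the schema that Spark inferred from the JSON file
df.printSchema()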


To select a single column and limit the records to 3, use the command below:

scala> val employees = sqlContext.sql("select first_name from  employees limit 3").show
+----------+
|first_name|
+----------+
|    Steven|
|     Neena|
|       Lex|
+----------+

employees: Unit = ()
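
The same result can be obtained with the DataFrame API; a minimal sketch, assuming the df from above (column names are given exactly as they appear in the schema):

// select one column and show only the first 3 rows
df.select("FIRST_NAME").show(3)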


The next query introduces an alias and the concatenation of columns:

scala> val employees = sqlContext.sql("select (concat_ws(' ',first_name,last_name)) as name, salary from  employees limit 3").show
+-------------+------+
|         name|salary|
+-------------+------+
|  Steven King| 24000|
|Neena Kochhar| 17000|
|  Lex De Haan| 17000|
+-------------+------+

employees: Unit = ()
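
For comparison, the equivalent DataFrame API call uses the concat_ws function from org.apache.spark.sql.functions; a minimal sketch, assuming the df from above:

import org.apache.spark.sql.functions._   // concat_ws, avg, count, etc.
import sqlContext.implicits._             // enables the $"column" syntax

// concatenate first and last name with a space and alias the result as "name"
df.select(concat_ws(" ", $"FIRST_NAME", $"LAST_NAME").alias("name"), $"SALARY").show(3)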


We can add a filter on salary as shown below:

scala> val employees = sqlContext.sql("select (concat_ws(' ',first_name,last_name)) as  name, salary from employees where salary > 15000").show
+-------------+------+
|         name|salary|
+-------------+------+
|  Steven King| 24000|
|Neena Kochhar| 17000|
|  Lex De Haan| 17000|
+-------------+------+

employees: Unit = ()
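
The same filter can be written with the DataFrame API; a minimal sketch, assuming the imports from the previous sketch:

// keep only the rows whose salary exceeds 15000
df.select(concat_ws(" ", $"FIRST_NAME", $"LAST_NAME").alias("name"), $"SALARY")
  .filter($"SALARY" > 15000)
  .show()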


The use of distinct is shown below:

scala> val employees = sqlContext.sql("select distinct job_id from employees").show
+----------+                                                                   
|    job_id|
+----------+
|     AD_VP|
|    MK_REP|
|   AD_ASST|
|   IT_PROG|
|  PU_CLERK|
|    ST_MAN|
|    MK_MAN|
|   AD_PRES|
|    AC_MGR|
|  ST_CLERK|
|    HR_REP|
|    SA_REP|
|    PR_REP|
|FI_ACCOUNT|
|  SH_CLERK|
|AC_ACCOUNT|
|    PU_MAN|
|    FI_MGR|
|    SA_MAN|
+----------+

employees: Unit = ()
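
A DataFrame API equivalent, assuming the df from above:

// distinct values of JOB_ID
df.select("JOB_ID").distinct().show()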


We can use order by on job_id to sort it in alphabetical order as shown below:

scala> val employees = sqlContext.sql("select distinct job_id from employees order by job_id").show
+----------+                                                                   
|    job_id|
+----------+
|AC_ACCOUNT|
|    AC_MGR|
|   AD_ASST|
|   AD_PRES|
|     AD_VP|
|FI_ACCOUNT|
|    FI_MGR|
|    HR_REP|
|   IT_PROG|
|    MK_MAN|
|    MK_REP|
|    PR_REP|
|  PU_CLERK|
|    PU_MAN|
|    SA_MAN|
|    SA_REP|
|  SH_CLERK|
|  ST_CLERK|
|    ST_MAN|
+----------+

employees: Unit = ()
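
Adding orderBy gives the same sorted list via the DataFrame API:

// distinct JOB_ID values, sorted alphabetically
df.select("JOB_ID").distinct().orderBy("JOB_ID").show()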


We can get the average salary for each job_id as shown below:

scala> val employees = sqlContext.sql("select job_id, avg(salary) from employees group by job_id order by job_id").show
+----------+-------+                                                           
|    job_id|    _c1|
+----------+-------+
|AC_ACCOUNT| 8300.0|
|    AC_MGR|12008.0|
|   AD_ASST| 4400.0|
|   AD_PRES|24000.0|
|     AD_VP|17000.0|
|FI_ACCOUNT| 7920.0|
|    FI_MGR|12008.0|
|    HR_REP| 6500.0|
|   IT_PROG| 5760.0|
|    MK_MAN|13000.0|
|    MK_REP| 6000.0|
|    PR_REP|10000.0|
|  PU_CLERK| 2780.0|
|    PU_MAN|11000.0|
|    SA_MAN|12200.0|
|    SA_REP| 8350.0|
|  SH_CLERK| 3215.0|
|  ST_CLERK| 2785.0|
|    ST_MAN| 7280.0|
+----------+-------+

employees: Unit = ()
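
The equivalent aggregation with the DataFrame API uses avg from org.apache.spark.sql.functions (imported above); the alias also gives the measure a readable name instead of the generated _c1. A minimal sketch:

// average salary per JOB_ID, sorted by JOB_ID
df.groupBy("JOB_ID")
  .agg(avg("SALARY").alias("avg_salary"))
  .orderBy("JOB_ID")
  .show()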


Two measures are calculated in the query below:

scala> val employees = sqlContext.sql("select job_id, avg(salary),count(*) from employees group by job_id order by job_id").show
+----------+-------+---+                                                       
|    job_id|    _c1|_c2|
+----------+-------+---+
|AC_ACCOUNT| 8300.0|  1|
|    AC_MGR|12008.0|  1|
|   AD_ASST| 4400.0|  1|
|   AD_PRES|24000.0|  1|
|     AD_VP|17000.0|  2|
|FI_ACCOUNT| 7920.0|  5|
|    FI_MGR|12008.0|  1|
|    HR_REP| 6500.0|  1|
|   IT_PROG| 5760.0|  5|
|    MK_MAN|13000.0|  1|
|    MK_REP| 6000.0|  1|
|    PR_REP|10000.0|  1|
|  PU_CLERK| 2780.0|  5|
|    PU_MAN|11000.0|  1|
|    SA_MAN|12200.0|  5|
|    SA_REP| 8350.0| 30|
|  SH_CLERK| 3215.0| 20|
|  ST_CLERK| 2785.0| 20|
|    ST_MAN| 7280.0|  5|
+----------+-------+---+

employees: Unit = ()
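
Both measures can be computed in a single agg call with the DataFrame API; a minimal sketch, assuming the imports from above:

// average salary and head count per JOB_ID
df.groupBy("JOB_ID")
  .agg(avg("SALARY").alias("avg_salary"), count("EMPLOYEE_ID").alias("employee_count"))
  .orderBy("JOB_ID")
  .show()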


In the next query, we cast HIRE_DATE to the DATE type and sort the records in descending order, showing the latest 10 hire dates.

scala> val employees = sqlContext.sql("select cast(hire_date as date)  from employees order by hire_date desc limit 10").show
+----------+
| hire_date|
+----------+
|2008-04-21|
|2008-04-21|
|2008-03-24|
|2008-03-08|
|2008-02-23|
|2008-02-06|
|2008-02-03|
|2008-01-29|
|2008-01-24|
|2008-01-13|
+----------+

employees: Unit = ()
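
The same cast and sort can be expressed with the DataFrame API; a minimal sketch, assuming the imports from above:

// cast HIRE_DATE to a date, sort descending and show the 10 most recent hire dates
df.select($"HIRE_DATE".cast("date").alias("hire_date"))
  .orderBy($"hire_date".desc)
  .show(10)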


As in other SQL dialects, we can define a user-defined function (UDF) and call it in queries. Let us define a simple UDF that concatenates two strings with a space between them, as shown below:

scala>  sqlContext.udf.register("simpleUDF", (x: String, y:String) => x + " " + y)
res9: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function2>,StringType,List(StringType, StringType))


We can then call it in SQL as shown below:

scala> val employees = sqlContext.sql("select simpleUDF(first_name,last_name) as name from employees limit 3").show
+-------------+
|         name|
+-------------+
|  Steven King|
|Neena Kochhar|
|  Lex De Haan|
+-------------+

employees: Unit = ()
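
The same function can also be used from the DataFrame API by wrapping it with the udf helper (the val name simpleConcat below is just an illustration); a minimal sketch, assuming the implicits import from earlier:

import org.apache.spark.sql.functions.udf

// wrap the Scala function so it can be applied to DataFrame columns
val simpleConcat = udf((x: String, y: String) => x + " " + y)

df.select(simpleConcat($"FIRST_NAME", $"LAST_NAME").alias("name")).show(3)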


This concludes the second post on Spark SQL.