Friday 5 January 2018

Spark SQL - III

In this last segment on Spark SQL, we explore the integration of Spark SQL with Hive. For all work in this post, we will use the Cloudera sandbox, Cloudera QuickStart VM 5.12. Please note that this VM comes with Spark 1.6.0 pre-installed, so all code in this post is compatible with Spark 1.6.0.

One preparatory step is needed to complete the integration of Spark SQL with Hive: the file hive-site.xml located at /etc/hive/conf/ has to be copied to /etc/spark/conf. Once this is done, we can invoke the spark-shell and create a HiveContext, which the Spark SQL environment uses to run Hive SQL and access Hive tables. The command for creating a HiveContext from the existing Spark Context (sc) is shown below:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

Running this gives the result below:

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@2a0c48ca
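
Besides running SQL strings, the same HiveContext exposes the DataFrame API, so Hive tables can be loaded directly. Below is a minimal sketch, assuming the employees table created in the earlier Hive posts exists in the default database:

// Load a Hive table as a DataFrame and inspect its schema
val empDF = hiveContext.table("employees")
empDF.printSchema()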


Now, we will run a series of queries as shown below. The first one lists all databases in Hive:

val hive_databases=hiveContext.sql("show databases").show

Running this gives the result below:

scala> val hive_databases=hiveContext.sql("show databases").show
+-------+
| result|
+-------+
|default|
+-------+


default is the pre-existing default database in Hive. To list all tables, use the command shown below:

val hive_tables = hiveContext.sql("show tables").show

Running this gives the result below:

scala> val hive_tables = hiveContext.sql("show tables").show
+-----------------+-----------+
|        tableName|isTemporary|
+-----------------+-----------+
|      departments|      false|
|        employees|      false|
|staging_employees|      false|
|             text|      false|
+-----------------+-----------+

hive_tables: Unit = ()

Note that the above tables were created in earlier posts on Hive. The first post in that series is here.

To show only the tableName column, use the command below:

val hive_tables = hiveContext.sql("show tables").select("tableName").show()

Running this gives the result below:

scala> val hive_tables = hiveContext.sql("show tables").select("tableName").show()
+-----------------+
|        tableName|
+-----------------+
|      departments|
|        employees|
|staging_employees|
|             text|
+-----------------+


hive_tables: Unit = ()
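
Since the result of show tables is itself a DataFrame, other DataFrame operations can be chained onto it in the same way. A small sketch, using the column names seen in the output above:

// Keep only permanent (non-temporary) tables, then show their names
hiveContext.sql("show tables")
           .filter("isTemporary = false")
           .select("tableName")
           .show()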


Let us now run some queries on our favorite table, employees, and print out the results:

val hive_employees = hiveContext.sql("select * from employees limit 5").foreach(println)

Running this gives the result below:

scala>  val hive_employees = hiveContext.sql("select * from employees limit 5").foreach(println)
[100,"Steven","King","SKING","515.123.4567",2003-06-17 00:00:00.0,"AD_PRES",24000,null,null,90]
[101,"Neena","Kochhar","NKOCHHAR","515.123.4568",2005-09-21 00:00:00.0,"AD_VP",17000,null,100,90]
[102,"Lex","De Haan","LDEHAAN","515.123.4569",2001-01-13 00:00:00.0,"AD_VP",17000,null,100,90]
[103,"Alexander","Hunold","AHUNOLD","590.423.4567",2006-01-03 00:00:00.0,"IT_PROG",9000,null,102,60]
[104,"Bruce","Ernst","BERNST","590.423.4568",2007-05-21 00:00:00.0,"IT_PROG",6000,null,103,60]
hive_employees: Unit = ()


Instead of printing the rows, we can collect them and use the returned Array for further analysis:

val hive_employees = hiveContext.sql("select * from employees limit 5").collect

Running this gives the result below:

scala>  val hive_employees = hiveContext.sql("select * from employees limit 5").collect
hive_employees: Array[org.apache.spark.sql.Row] = Array([100,"Steven","King","SKING","515.123.4567",2003-06-17 00:00:00.0,"AD_PRES",24000,null,null,90], [101,"Neena","Kochhar","NKOCHHAR","515.123.4568",2005-09-21 00:00:00.0,"AD_VP",17000,null,100,90], [102,"Lex","De Haan","LDEHAAN","515.123.4569",2001-01-13 00:00:00.0,"AD_VP",17000,null,100,90], [103,"Alexander","Hunold","AHUNOLD","590.423.4567",2006-01-03 00:00:00.0,"IT_PROG",9000,null,102,60], [104,"Bruce","Ernst","BERNST","590.423.4568",2007-05-21 00:00:00.0,"IT_PROG",6000,null,103,60])


To extract the first row, use hive_employees(0), and to extract the second element in the first row, use hive_employees(0)(1), as shown below:

scala> hive_employees(0)
res20: org.apache.spark.sql.Row = [100,"Steven","King","SKING","515.123.4567",2003-06-17 00:00:00.0,"AD_PRES",24000,null,null,90]

scala> hive_employees(0)(1)
res21: Any = "Steven"
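
Because collect returns Array[org.apache.spark.sql.Row], the typed Row getters can be used instead of the generic Any returned by apply. A minimal sketch, assuming the column order shown above and that employee_id is stored as an INT in Hive:

// Typed access into a Row, plus a simple transformation over the collected array
val firstId   = hive_employees(0).getInt(0)     // employee_id of the first row (assumes INT)
val firstName = hive_employees(0).getString(1)  // first_name of the first row
val names     = hive_employees.map(row => row.getString(1))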


The next query uses the concat function as shown below:

val employees = hiveContext.sql("select concat(first_name,' ',last_name) as name, salary from  employees limit 3").show
Running this gives the result below:

scala> val employees = hiveContext.sql("select concat(first_name,' ',last_name) as name, salary from  employees limit 3").show
+-----------------+------+
|             name|salary|
+-----------------+------+
|  "Steven" "King"| 24000|
|"Neena" "Kochhar"| 17000|
|  "Lex" "De Haan"| 17000|
+-----------------+------+

employees: Unit = ()
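
The same projection can also be written with the DataFrame API, using concat and lit from org.apache.spark.sql.functions. A minimal sketch, assuming the employees table is visible through the HiveContext:

import org.apache.spark.sql.functions.{concat, lit, col}

// Build the full name with DataFrame functions instead of a SQL string
hiveContext.table("employees")
           .select(concat(col("first_name"), lit(" "), col("last_name")).as("name"), col("salary"))
           .limit(3)
           .show()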


We can add a filter on salary as shown below:

scala> val employees = hiveContext.sql("select (concat(first_name,' ',last_name)) as  name, salary from employees where salary > 15000").show
+-----------------+------+
|             name|salary|
+-----------------+------+
|  "Steven" "King"| 24000|
|"Neena" "Kochhar"| 17000|
|  "Lex" "De Haan"| 17000|
+-----------------+------+

employees: Unit = ()
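
The equivalent filter in the DataFrame API is a simple comparison on the salary column; a sketch under the same assumptions as before:

import org.apache.spark.sql.functions.{concat, lit, col}

// Same projection, restricted to salaries above 15000
hiveContext.table("employees")
           .select(concat(col("first_name"), lit(" "), col("last_name")).as("name"), col("salary"))
           .filter(col("salary") > 15000)
           .show()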
 

The query to fetch distinct JOB_ID values is shown below:

 scala> val job_id = hiveContext.sql("select distinct job_id from employees").show
+------------+                                                                
|      job_id|
+------------+
|   "AD_PRES"|
|     "AD_VP"|
|    "SA_REP"|
|    "ST_MAN"|
|    "PU_MAN"|
|   "IT_PROG"|
|    "PR_REP"|
|  "ST_CLERK"|
|   "AD_ASST"|
|    "AC_MGR"|
|    "MK_MAN"|
|    "HR_REP"|
|    "FI_MGR"|
|"FI_ACCOUNT"|
|  "SH_CLERK"|
|"AC_ACCOUNT"|
|    "MK_REP"|
|    "SA_MAN"|
|  "PU_CLERK"|
+------------+

job_id: Unit = ()


We can use order by on job_id to sort the values in alphabetical order as shown below:

scala> val job_id = hiveContext.sql("select distinct job_id from employees order by job_id").show
+------------+                                                                
|      job_id|
+------------+
|"AC_ACCOUNT"|
|    "AC_MGR"|
|   "AD_ASST"|
|   "AD_PRES"|
|     "AD_VP"|
|"FI_ACCOUNT"|
|    "FI_MGR"|
|    "HR_REP"|
|   "IT_PROG"|
|    "MK_MAN"|
|    "MK_REP"|
|    "PR_REP"|
|  "PU_CLERK"|
|    "PU_MAN"|
|    "SA_MAN"|
|    "SA_REP"|
|  "SH_CLERK"|
|  "ST_CLERK"|
|    "ST_MAN"|
+------------+

job_id: Unit = ()
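
With the DataFrame API, the same result is obtained from distinct and orderBy; a minimal sketch:

// Distinct job_id values, sorted alphabetically
hiveContext.table("employees")
           .select("job_id")
           .distinct()
           .orderBy("job_id")
           .show()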


We can get the average salary for each job_id as shown below:

scala> val job_id = hiveContext.sql("select job_id, avg(salary) from employees group by job_id order by job_id").show
+------------+-------+                                                         
|      job_id|    _c1|
+------------+-------+
|"AC_ACCOUNT"| 8300.0|
|    "AC_MGR"|12008.0|
|   "AD_ASST"| 4400.0|
|   "AD_PRES"|24000.0|
|     "AD_VP"|17000.0|
|"FI_ACCOUNT"| 7920.0|
|    "FI_MGR"|12008.0|
|    "HR_REP"| 6500.0|
|   "IT_PROG"| 5760.0|
|    "MK_MAN"|13000.0|
|    "MK_REP"| 6000.0|
|    "PR_REP"|10000.0|
|  "PU_CLERK"| 2780.0|
|    "PU_MAN"|11000.0|
|    "SA_MAN"|12200.0|
|    "SA_REP"| 8350.0|
|  "SH_CLERK"| 3215.0|
|  "ST_CLERK"| 2785.0|
|    "ST_MAN"| 7280.0|
+------------+-------+

job_id: Unit = ()


Two measures, the average salary and the number of employees per job_id, are calculated in the query below:

scala> val job_id = hiveContext.sql("select job_id, avg(salary),count(*) from employees group by job_id order by job_id").show
+------------+-------+---+                                                     
|      job_id|    _c1|_c2|
+------------+-------+---+
|"AC_ACCOUNT"| 8300.0|  1|
|    "AC_MGR"|12008.0|  1|
|   "AD_ASST"| 4400.0|  1|
|   "AD_PRES"|24000.0|  1|
|     "AD_VP"|17000.0|  2|
|"FI_ACCOUNT"| 7920.0|  5|
|    "FI_MGR"|12008.0|  1|
|    "HR_REP"| 6500.0|  1|
|   "IT_PROG"| 5760.0|  5|
|    "MK_MAN"|13000.0|  1|
|    "MK_REP"| 6000.0|  1|
|    "PR_REP"|10000.0|  1|
|  "PU_CLERK"| 2780.0|  5|
|    "PU_MAN"|11000.0|  1|
|    "SA_MAN"|12200.0|  5|
|    "SA_REP"| 8350.0| 30|
|  "SH_CLERK"| 3215.0| 20|
|  "ST_CLERK"| 2785.0| 20|
|    "ST_MAN"| 7280.0|  5|
+------------+-------+---+

job_id: Unit = ()
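
Both measures can also be computed through the DataFrame API with groupBy and agg; a sketch using avg and count from org.apache.spark.sql.functions:

import org.apache.spark.sql.functions.{avg, count, lit}

// Average salary and employee count per job_id, sorted by job_id
hiveContext.table("employees")
           .groupBy("job_id")
           .agg(avg("salary").as("avg_salary"), count(lit(1)).as("employee_count"))
           .orderBy("job_id")
           .show()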


In the last query of this post, we cast HIRE_DATE to the DATE type and show the 10 most recent hire dates in descending order.


scala> val employees = hiveContext.sql("select cast(hire_date as date)  from employees order by hire_date desc limit 10").show
+----------+
| hire_date|
+----------+
|2008-04-21|
|2008-04-21|
|2008-03-24|
|2008-03-08|
|2008-02-23|
|2008-02-06|
|2008-02-03|
|2008-01-29|
|2008-01-24|
|2008-01-13|
+----------+

employees: Unit = ()
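
The cast and ordering can likewise be expressed with the DataFrame API; a minimal sketch:

import org.apache.spark.sql.functions.col

// Ten most recent hire dates, cast from TIMESTAMP to DATE
hiveContext.table("employees")
           .select(col("hire_date").cast("date").as("hire_date"))
           .orderBy(col("hire_date").desc)
           .limit(10)
           .show()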


This concludes the last segment on Spark SQL.