In a series of posts, we will be looking at analytic functions in Impala. For all queries in this post, we will use the Cloudera sandbox, Cloudera QuickStart VM 5.12.
Impala is an open-source massively parallel processing (MPP) SQL engine written in C++ and Java for the Hadoop data processing environment. Its key features are low latency and high concurrency for Business Intelligence (BI) and analytic read-mostly queries on Hadoop, which batch frameworks such as Apache Hive do not usually deliver.
Impala offers the possibility of building an analytic DBMS on top of a batch processing system such as Hadoop that performs as well as or better than most current commercial solutions, while retaining all the flexibility and cost-effectiveness of Hadoop.
It uses standard Hadoop components such as HDFS, HBase, the Metastore and YARN, and it can read most of the widely used file formats, such as Parquet, Avro and RCFile. To reduce the latency incurred by operations such as MapReduce or by reading data remotely, Impala implements a distributed architecture based on daemon processes that are responsible for all aspects of query execution and that run on the same machines as the rest of the Hadoop infrastructure.
Impala is shipped by Cloudera, MapR, Oracle, and Amazon.
As a first step, let us import the employee data into the file system as shown in the steps below.
Click on the HDFS icon as shown below:
Click on the user folder and then on the cloudera folder, and right-click as shown below:
Click on Open in File Browser. Enter employees in the window that pops up as shown below:
Click on the newly created folder as shown below to navigate to the employees folder:
In the employees folder, click on Upload -> File as shown below:
Click on the Select Files button to add the file as shown below:
Once the file is added, we can see it under the employees folder as shown below:
Let us navigate to the Impala editor as shown below:
In the Impala Query Editor, enter the query that creates the external table as shown below:
CREATE EXTERNAL TABLE IF NOT EXISTS staging_employees (EMPLOYEE_ID SMALLINT, FIRST_NAME STRING, LAST_NAME STRING, EMAIL STRING, PHONE_NUMBER STRING, HIRE_DATE STRING, JOB_ID STRING, SALARY SMALLINT, COMMISSION_PCT DOUBLE, MANAGER_ID SMALLINT, DEPARTMENT_ID TINYINT) COMMENT 'Staging table for employee data' ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/user/cloudera/employees' TBLPROPERTIES ("skip.header.line.count"="1");
Click on the Execute button as shown above. We get the messages shown below:
We can check the data in the table with the query below:
select count(*) from staging_employees;
The results returned are shown below:
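Beyond the row count, it can help to look at what Impala actually registered for the staging table. The following is a minimal sketch that uses only the table and columns defined above. It assumes that a header line which was not skipped would fail to parse as a SMALLINT and so show up with a NULL EMPLOYEE_ID.

```sql
-- Confirm the column names and types registered for the staging table.
DESCRIBE staging_employees;

-- Peek at a few raw rows; HIRE_DATE is still a STRING at this stage.
SELECT employee_id, first_name, last_name, hire_date, salary
FROM staging_employees
ORDER BY employee_id
LIMIT 5;

-- Assumption: an unskipped header line cannot be parsed as SMALLINT,
-- so it would appear here as a row with a NULL employee_id.
SELECT COUNT(*) AS rows_without_id
FROM staging_employees
WHERE employee_id IS NULL;
```

If the last query returns a non-zero count, the header line or some malformed rows were probably loaded along with the data.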
Note that the staging table declares HIRE_DATE as STRING. Let us try to run the query below to create a final employees table with the correct data type:
CREATE TABLE IF NOT EXISTS employees (EMPLOYEE_ID SMALLINT, FIRST_NAME STRING, LAST_NAME STRING, EMAIL STRING, PHONE_NUMBER STRING, HIRE_DATE DATE, JOB_ID STRING, SALARY SMALLINT, COMMISSION_PCT DOUBLE, MANAGER_ID SMALLINT,DEPARTMENT_ID TINYINT) COMMENT 'Table holding employee details';
We get the error below:
Unfortunately, the DATE data type is not supported by the Impala version in this sandbox. The only date-related data type is TIMESTAMP, so we rewrite the query as follows:
CREATE TABLE IF NOT EXISTS employees (EMPLOYEE_ID SMALLINT, FIRST_NAME STRING, LAST_NAME STRING, EMAIL STRING, PHONE_NUMBER STRING, HIRE_DATE TIMESTAMP, JOB_ID STRING, SALARY SMALLINT, COMMISSION_PCT DOUBLE, MANAGER_ID SMALLINT,DEPARTMENT_ID TINYINT) COMMENT 'Table holding employee details';
The results returned are shown below:
Let us insert records into the newly created table using the query below:
INSERT INTO employees SELECT EMPLOYEE_ID, FIRST_NAME, LAST_NAME, EMAIL, PHONE_NUMBER, TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(HIRE_DATE,'dd-MMM-yy'))), JOB_ID, SALARY, COMMISSION_PCT, MANAGER_ID, DEPARTMENT_ID FROM staging_employees;
The results returned are shown below:
A quick validation like the one below suggests that the data looks okay.
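One way to perform such a check is sketched below. It is not necessarily the validation used above, and it relies only on the two tables created so far. The column aliases are illustrative.

```sql
-- Every row should have a parsed HIRE_DATE; a gap between the two counts
-- means some strings did not match the 'dd-MMM-yy' pattern.
SELECT COUNT(*)         AS total_rows,
       COUNT(hire_date) AS rows_with_hire_date,
       MIN(hire_date)   AS earliest_hire,
       MAX(hire_date)   AS latest_hire
FROM employees;

-- Compare the raw string with the converted value for a few employees.
SELECT s.employee_id,
       s.hire_date AS raw_hire_date,
       e.hire_date AS parsed_hire_date
FROM staging_employees s
JOIN employees e ON s.employee_id = e.employee_id
ORDER BY s.employee_id
LIMIT 5;
```

Because the source format carries two-digit years, the earliest and latest values are worth a glance to confirm that the dates landed in the expected century.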
Let us now create the departments table as follows. Click on SQL in the explorer on the left as shown below:
Then click on Impala and then on default as shown below:
Then click on + to create a table as shown above:
Click on the button to the right of the path field to bring up the Choose a file window as shown below:
Click on the Upload file button to add the file. Once added, the file appears as shown below:
Then click on departments.csv to select the file as shown below:
Then click on the Next button.
Then click on the Submit button. This concludes the table creation process as shown below:
Now that we have data in both the employees and departments tables, we can go ahead with analytic functions in Impala. We will draw largely on the queries that we wrote earlier for Oracle.
The first one will be the group by clause:
select job_id, sum(salary) from employees where department_id < 40 group by (job_id);
The results are shown below, and they are the same as those we got earlier.
Let us try to run the query below, which uses the ROLLUP keyword:
select job_id, sum(salary) from employees where department_id < 40 group by rollup(job_id);
We get the error below:
We will write a workaround as shown below:
select job_id, sum(salary) from employees where department_id < 40 group by (job_id)
union
select null, sum(salary) from employees where department_id < 40;
The results are shown below, and they are the same as those we got earlier.
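One caveat with this workaround is that UNION removes duplicate rows, which ROLLUP would not do, and nothing distinguishes the total row from a group whose job_id is genuinely NULL. A minimal sketch of a variation, assuming the same employees table, uses UNION ALL and a marker column. The names total_salary, is_total and rollup_rows are illustrative and do not appear in the original queries.

```sql
SELECT job_id, total_salary, is_total
FROM (
  -- One row per job, flagged as a regular group.
  SELECT job_id, SUM(salary) AS total_salary, 0 AS is_total
  FROM employees
  WHERE department_id < 40
  GROUP BY job_id
  UNION ALL
  -- The grand total that ROLLUP would have produced, flagged as such.
  SELECT NULL, SUM(salary), 1
  FROM employees
  WHERE department_id < 40
) rollup_rows
ORDER BY is_total, job_id;
```

Wrapping the union in a subquery applies the ORDER BY to the whole result, so the total row sorts last.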
For CUBE too, we have to implement a workaround.
A GROUPING SETS query on department_id and job_id will not run either; it throws an error, as in the case of ROLLUP.
The query that mimics the GROUPING SETS query is:
select null, job_id, sum(salary) from employees where department_id < 40 group by job_id
union
select department_id, null, sum(salary) from employees where department_id < 40 group by department_id;
The results are shown below, and they are the same as those we got earlier.
Likewise, the equivalent of the query below
select department_id, job_id, sum(salary) from employees where department_id < 40 group by grouping sets (department_id,job_id,());
in Impala will be:
select null, job_id, sum(salary) from employees where department_id < 40 group by job_id
union
select department_id, null, sum(salary) from employees where department_id < 40 group by department_id
union
select null, null, sum(salary) from employees where department_id < 40;
The results are shown below, and they are the same as those we got earlier.
Let us run the query below, which uses the WITH ROLLUP syntax as in Hive:
select department_id, job_id, sum(salary) from employees where department_id < 40 group by department_id, job_id with rollup;
We get the error below:
The correct results are returned if we run the equivalent query below:
select department_id, job_id, sum(salary) from employees where department_id < 40 group by department_id,job_id
union
select department_id, null, sum(salary) from employees where department_id < 40 group by department_id
union
select null, null, sum(salary) from employees where department_id < 40;
The results are shown below:
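Since the subtotal and total rows are now computed by separate SELECT statements, a simple consistency check is to confirm that the department subtotals add up to the grand total. The following is a minimal sketch against the same employees table. The aliases dept_total, dept_totals, sum_of_department_subtotals and grand_total are illustrative.

```sql
-- Sum of the per-department subtotals produced by the second branch.
SELECT SUM(dept_total) AS sum_of_department_subtotals
FROM (
  SELECT department_id, SUM(salary) AS dept_total
  FROM employees
  WHERE department_id < 40
  GROUP BY department_id
) dept_totals;

-- Grand total produced by the third branch; the two figures should agree.
SELECT SUM(salary) AS grand_total
FROM employees
WHERE department_id < 40;
```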
A query using CUBE is shown below:
select department_id, job_id, sum(salary) from employees where department_id < 40 group by department_id, job_id with cube;
The above query results in the error shown below:
The correct results are returned if we run the query below:
select department_id, job_id, sum(salary) from employees where department_id < 40 group by department_id,job_id
union
select null, job_id, sum(salary) from employees where department_id < 40 group by job_id
union
select department_id, null, sum(salary) from employees where department_id < 40 group by department_id
union
select null, null, sum(salary) from employees where department_id < 40;
The results are shown below:
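The four branches repeat the same filter, which is easy to get out of step when the condition changes. As a sketch of one alternative, the filter can be stated once in a WITH clause and the branches combined with UNION ALL, since CUBE itself does not remove duplicates. The names filtered and total_salary are illustrative.

```sql
WITH filtered AS (
  -- State the department filter once for all grouping levels.
  SELECT department_id, job_id, salary
  FROM employees
  WHERE department_id < 40
)
-- Detail level: one row per (department, job) pair.
SELECT department_id, job_id, SUM(salary) AS total_salary
FROM filtered
GROUP BY department_id, job_id
UNION ALL
-- Subtotals per job across departments.
SELECT NULL, job_id, SUM(salary)
FROM filtered
GROUP BY job_id
UNION ALL
-- Subtotals per department across jobs.
SELECT department_id, NULL, SUM(salary)
FROM filtered
GROUP BY department_id
UNION ALL
-- Grand total.
SELECT NULL, NULL, SUM(salary)
FROM filtered;
```

Dropping the per-job branch from this query gives the ROLLUP equivalent shown earlier.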
We will continue with this topic in the next part of this series.