Monday 7 May 2018

Apache Nifi - VI

This post could also be called "SQL in Nifi". SQL is prevalent in most Big Data tools, and Apache Nifi is no different. The SQL support in Apache Nifi is based on another project, Apache Calcite, the details of which can be found here. All of the SQL grammar and the supported SQL keywords are documented here. In this post, we will build a very simple flow that uses SQL to separate records. For all the work in this post, we will be using the latest version of Apache Nifi, 1.6.0.

The flow is shown below:
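
In text form, the flow is roughly the following: a GetFile processor picks up the input file and feeds a QueryRecord processor, which fans out over four connections:

    GetFile --> QueryRecord --> (SAL_GTE_15000) --> PutFile
                            --> (SAL_LT_15000)  --> PutFile
                            --> (original)      --> PutFile
                            --> (failure)       --> looped back to QueryRecord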

Note that there is a QueryRecord processor that forms the crux of this flow. The SQL statements are written as user-defined properties on this processor: one defines a property and enters the SQL statement in its Value field. Each property name then serves as a routing relationship out of the processor. This will become obvious shortly when we look at the properties of this processor. The properties of GetFile are shown below:
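
In essence, GetFile only needs to point at the folder into which employees.csv will later be dropped; a minimal sketch, with a hypothetical path:

    Input Directory  : /path/to/source     (hypothetical; any readable local folder)
    Keep Source File : false               (the default; the file is removed once picked up)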

On the QueryRecord processor, two properties have to be set: Record Reader and Record Writer. The Record Reader, set as a Controller Service, will be used to read the incoming flowfile, and the Record Writer, also a Controller Service, will be used to write the outgoing flowfile. The properties of the QueryRecord processor are shown below:
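
The two user-defined properties carry the SQL, and their names become the SAL_GTE_15000 and SAL_LT_15000 relationships seen in the flow. A sketch of how the properties might look; the query text and the salary column name are assumptions based on the 15000 threshold used in this flow:

    Record Reader : CSVReader
    Record Writer : CSVRecordSetWriter
    SAL_GTE_15000 : SELECT * FROM FLOWFILE WHERE salary >= 15000
    SAL_LT_15000  : SELECT * FROM FLOWFILE WHERE salary < 15000

QueryRecord always exposes the incoming flowfile as a table named FLOWFILE. If the reader ends up treating every field as a string, the comparisons may need a cast, for example WHERE CAST(salary AS INTEGER) >= 15000.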

Since we will be using the familiar employees.csv file from earlier posts, the Record Reader will be a CSVReader. The Record Writer will be a CSVRecordSetWriter, which writes the outgoing flowfiles in CSV format. Two more properties are added that specify the SQL used to query the flowfile as it traverses this processor; the flowfile is queried in real time as though it were a table containing the input records, and the two SQL statements separate the records based on the salary field. There are four connections out of this processor: two based on the SQL statements, one carrying the original flowfile and, lastly, the failure connection, which is routed back to the processor itself as shown in the flow above. The properties of the CSVReader controller service are shown below:
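
A sketch of a typical CSVReader configuration for a headered file such as employees.csv; deriving the schema from the header line is an assumption here, and the actual flow may use a different schema strategy:

    Schema Access Strategy     : Use String Fields From Header
    Treat First Line as Header : true
    Value Separator            : ,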

The properties of the CSVRecordSetWriter controller service are shown below:
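
A matching CSVRecordSetWriter can simply inherit the schema of the records handed to it by the reader; a sketch, with the exact choices again being assumptions:

    Schema Access Strategy : Inherit Record Schema
    Schema Write Strategy  : Do Not Write Schema
    Include Header Line    : true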

The properties of the PutFile processor tied to the SAL_GTE_15000 connection are shown below:
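
The three PutFile processors presumably differ only in their target directories; a sketch for this one, with a hypothetical path:

    Directory                    : /path/to/output/sal_gte_15000    (hypothetical)
    Conflict Resolution Strategy : replace
    Create Missing Directories   : true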

The properties of the PutFile processor tied to the original connection are shown below:

The properties of the PutFile processor tied to the SAL_LT_15000 connection are shown below:

After all the properties are correctly set, enable the Controller Services, drop employees.csv into the source folder and start the flow. The results are shown below:

Checking the Data Provenance on the QueryRecord processor, we see three entries, as shown below:

FORK corresponds to the original connection, while the two ROUTE entries are for the two SQL statements in the properties. The output corresponding to the first ROUTE is shown below:

Note that every salary value in this output is less than 15000. The output corresponding to the second ROUTE is shown below:

This output has only three records, but the salary values are all above 15000. Lastly, the original file is seen in the output of the FORK event:

The output from the FORK event has records of all salaries. In this flow, QueryRecord acts more like a RouteOnContent processor; but then, we have not fully unleashed the power of SQL in Nifi in this example.
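
For instance, the same processor could just as easily aggregate the records rather than merely route them; a query along these lines (the column name is, again, an assumption) would emit a single summary record per flowfile:

    SELECT COUNT(*) AS employee_count, AVG(CAST(salary AS DOUBLE)) AS average_salary FROM FLOWFILE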