Monday 7 May 2018

Apache Nifi - VI

The blog post could also be called "SQL in Nifi". SQL is prevalent in most Big Data tools and Apache Nifi is no different. The SQL support in Apache Nifi is based on another project called Calcite, the details of which can be found here. All the SQL grammar and the supported SQL keywords are here. In this post, we will build a very simple flow that uses SQL to separate the records. For all the work in this post, we will be using the latest version of Apache Nifi, 1.6.0.

The flow is shown below:

Note that there is a QueryRecord processor that forms the crux of this flow. The SQL statements are written as user-defined properties in this processor: one can define a property and, in the Value field, enter the SQL statement. Each property name serves as the name of a route out of the processor. This will become clear in a short while when we look at the properties of this processor. The properties of GetFile are shown below:

On the QueryRecord processor, there are two properties that have to be set: RecordReader and RecordWriter. The RecordReader, set as a Controller Service, will be used to read the incoming flowfile. The RecordWriter, set as a Controller Service, will be used to write the outgoing flowfile. The properties of the QueryRecord processor are shown below:


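The two user-defined properties seen in the QueryRecord configuration pair a route name with a SQL statement. As a rough sketch of what they contain (the exact column name, assumed here to be salary, depends on the header of employees.csv; QueryRecord exposes the content of the incoming flowfile as a table named FLOWFILE):

SAL_GTE_15000 : SELECT * FROM FLOWFILE WHERE salary >= 15000
SAL_LT_15000  : SELECT * FROM FLOWFILE WHERE salary < 15000

Each property name becomes a relationship on the processor, which is why the flow above has outgoing connections named SAL_GTE_15000 and SAL_LT_15000.
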
Since we will be using the familiar employees.csv file used often in earlier posts, the RecordReader will be a CSVReader. The RecordWriter will be a CSVRecordSetWriter that writes the flowfile in CSV format. Two properties are added that specify the SQL statements used to query the flowfile in real time as it traverses this processor. The SQL statements separate the records in the flowfile based on the salary field; the flowfile is queried as though it were a table containing the input records. There are four connections out of this processor: two based on the SQL statements, one for the original flowfile and, lastly, the failure connection. The failure connection is routed back to the processor itself, as shown in the flow above. The properties of the CSVReader controller service are shown below:

The properties of the CSVRecordSetWriter controller service are shown below:

The properties of the PutFile processor tied to the SAL_GTE_15000 connection are shown below:

The properties of the PutFile processor tied to the original connection are shown below:

The properties of the PutFile processor tied to the SAL_LT_15000 connection are shown below:

After all the properties are correctly set, enable the Controller Services, drop employees.csv into the source folder and start the flow. The results are shown below:

Checking the Data Provenance on the QueryRecord processor, we see three entries as shown below:

FORK corresponds to the original connection, while the two ROUTE entries are for the two SQL statements in the properties. The output corresponding to the first ROUTE is shown below:

Note that the salary values are all less than 15000. The output corresponding to the second ROUTE is shown below:

The output has only three records, and the salary values are all 15000 or above. Lastly, the original file is seen in the output of the FORK entry:

The output from the FORK entry contains all the records, irrespective of salary. In this flow, QueryRecord acts more like a RouteOnContent processor. But then, we have not fully unleashed the power of SQL in Nifi in this example.
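
For instance, nothing stops the SQL here from doing more than routing. A purely hypothetical example (the department column below is made up and is not claimed to be a field of employees.csv) that aggregates instead of filtering:

SELECT department, COUNT(*) AS num_employees, AVG(salary) AS avg_salary
FROM FLOWFILE
GROUP BY department

Such a query would make QueryRecord emit a new, summarized record set rather than a routed copy of the input.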


Saturday 5 May 2018

Apache Nifi - V

In this post, we continue to explore the transformation of file formats in Apache Nifi. In the last post, we saw the transformation from the CSV file format to the JSON file format. In this post, we will just do the reverse. For all the work in this post, we will be using the latest version of Apache Nifi, 1.6.0.

The steps are the same as in the previous flow, with only the Record Reader and Record Writer properties of the ConvertRecord processor reversed. The input for this flow will be the output of the flow in the earlier post. Note that even though the name of the output file from the earlier post remains the same as the input filename, the format of the output file is JSON. The input file, if opened in Excel, will have content as shown below:


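In terms of a single record, this flow roughly does the following (a purely illustrative example with made-up field names and values, not the actual contents of the file): a JSON element such as

{ "id" : 101, "name" : "John", "salary" : 12000 }

is read by the JsonTreeReader and written back out by the CSVRecordSetWriter as a CSV line like

101,John,12000
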
The flow is shown below:

The warnings on the ConvertRecord processor are similar to the ones we saw in the last post. They are shown below:

The properties of GetFile are shown below:

The properties of ConvertRecord are shown below:

The properties of JsonTreeReader are shown below:

The properties of CSVRecordSetWriter are shown below:

The properties of PutFile are shown below:

Enable the JsonTreeReader and CSVRecordSetWriter Controller Services and run the flow. The Data Provenance of the input in the ConvertRecord processor is shown below:

The Data Provenance of the output in the ConvertRecord processor is shown below:

We can see that the format has changed from JSON to CSV. This concludes this post.

Thursday 3 May 2018

Apache Nifi - IV

This post continues from the last one that we wrote on Apache Nifi, where we ran a simple flow comprising just three steps. For all the work in this post, we will be using the latest version of Apache Nifi, 1.6.0.

In this post, we continue to explore Apache Nifi with a focus on transforming file formats. While in the earlier post we merely migrated files from one folder to another, in the first flow that we take a look at here, we will add one additional step that performs the format transformation from a CSV file to a JSON file. More details on the JSON format can be seen here. The flow is shown below:


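Before going into the processor properties, here is a rough, purely illustrative sketch of what the conversion step in this flow does to a single record (the field names and values are made up and are not the actual contents of the input file): a CSV record such as

id,name,salary
101,John,12000

comes out of the conversion as JSON along the lines of

[ { "id" : 101, "name" : "John", "salary" : 12000 } ]
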
Note that the GetFile and PutFile processors are the same as in the last post. The additional processor is called ConvertRecord. The properties of GetFile are shown below:

All default values are retained and only the Input Directory is set. For the ConvertRecord processor, the properties are shown below:

Only two property values are set: CSVReader and JsonRecordSetWriter. But we still have warnings, as shown below:

Before we enable the Controller Services, we need to set a few properties on the CSVReader and JsonRecordSetWriter Controller Services. Clicking on the arrow next to CSVReader as shown below takes us to the Controller Services tab under NiFi Flow Configuration:

Note that both services are disabled. Click on the Configure wheel shown above. On the properties shown, change only the Treat First Line as Header property to true as shown below:

Similarly, for the properties of the JsonRecordSetWriter Controller Service, set the Schema Write Strategy property to Do Not Write Schema as shown below:

After these properties are set, enable the Controller Services on the NiFi Flow Configuration by clicking on the enable icon as shown below:

Once both the Controller Services are enabled, the flow is ready to be run as shown below:

We are using a .csv file to run the flow. We can see below the status after a successful run:

Right-clicking on the ConvertRecord processor to check the Data Provenance allows us to see the transformation of format from CSV to JSON.

Click on the View button on the left to see the input. The input is shown below:

Click on the View button on the left to see the output of this processor. The output is shown below:

Click on formatted in the View as: dropdown to view the data in JSON format as shown below:

We can see that the data has been transformed from CSV format to JSON format. With this, we conclude this post.