Monday, 25 June 2018

MongoDB - II

In the last post we took a look at arrays in MongoDB. In this post, we continue with arrays, covering how to query them and then how to project them. For all the work in this post, we will be using the same version as in the last two posts: MongoDB v3.6.5. We will refer to the same merchandise database and products collection that we created in the last post. Running find() in the Mongo shell returns the result below:

>  db.products.find()
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S", "M", "L" ] }
{ "_id" : 200, "item" : "T-shirt", "sizes" : [ "X", "XL", "XXL" ] }
{ "_id" : 300, "item" : "Bermuda Shorts", "sizes" : [ "M" ] }
{ "_id" : 400, "item" : "Hat", "sizes" : "M" }
{ "_id" : 500, "item" : "Wrist band" }
{ "_id" : 600, "item" : "Sweat band", "sizes" : null }
{ "_id" : 700, "item" : "Cap", "sizes" : [ ] }

Let us find only records that have an array for sizes field:

> db.products.find( { sizes: { $type: "array" } })
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S", "M", "L" ] }
{ "_id" : 200, "item" : "T-shirt", "sizes" : [ "X", "XL", "XXL" ] }
{ "_id" : 300, "item" : "Bermuda Shorts", "sizes" : [ "M" ] }
{ "_id" : 700, "item" : "Cap", "sizes" : [ ] }

The corresponding aggregation pipeline is:

db.products.aggregate( [ { $match : { sizes: { $type: "array" } } } ] )

Note that this also returns documents whose array has no elements, such as the document with _id 700. To exclude such documents, we can use $elemMatch as shown below:

> db.products.find( {sizes: {$elemMatch: {$exists: true} } })
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S", "M", "L" ] }
{ "_id" : 200, "item" : "T-shirt", "sizes" : [ "X", "XL", "XXL" ] }
{ "_id" : 300, "item" : "Bermuda Shorts", "sizes" : [ "M" ] }

The corresponding aggregation pipeline is:

db.products.aggregate( [ { $match : {sizes: {$elemMatch: {$exists: true} } } } ] )
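The difference between the two filters above can be pictured with a small in-memory sketch. This is plain JavaScript rather than a MongoDB query, and the helper names (isArrayField, hasAnyElement, idsOf) are made up for illustration; it only models the behaviour seen in the output above.

```javascript
// In-memory copy of the products collection shown above.
const products = [
  { _id: 100, item: "Pullover", sizes: ["S", "M", "L"] },
  { _id: 200, item: "T-shirt", sizes: ["X", "XL", "XXL"] },
  { _id: 300, item: "Bermuda Shorts", sizes: ["M"] },
  { _id: 400, item: "Hat", sizes: "M" },
  { _id: 500, item: "Wrist band" },
  { _id: 600, item: "Sweat band", sizes: null },
  { _id: 700, item: "Cap", sizes: [] },
];

// Models { sizes: { $type: "array" } }: the field must hold an array, empty or not.
const isArrayField = (doc) => Array.isArray(doc.sizes);

// Models { sizes: { $elemMatch: { $exists: true } } }: an array with at least one element.
const hasAnyElement = (doc) => Array.isArray(doc.sizes) && doc.sizes.length > 0;

const idsOf = (docs) => docs.map((doc) => doc._id);

console.log(idsOf(products.filter(isArrayField)));  // [ 100, 200, 300, 700 ]
console.log(idsOf(products.filter(hasAnyElement))); // [ 100, 200, 300 ]
```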

Let us now try to find documents having "S" in sizes array:

> db.products.find( { sizes: "S" } )
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S", "M", "L" ] }

The corresponding aggregation pipeline is:

db.products.aggregate( [ { $match : { sizes: "S" } } ] )

To return documents having either a "S" or "X":

> db.products.find( { $or: [ { sizes: "S" }, { sizes: "X" } ] } )
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S", "M", "L" ] }
{ "_id" : 200, "item" : "T-shirt", "sizes" : [ "X", "XL", "XXL" ] }

The corresponding aggregation pipeline is:

db.products.aggregate( [ { $match : { $or: [ { sizes: "S" }, { sizes: "X" } ] } } ] )

To return documents having an exact match of contents in the same order in arrays:

> db.products.find( { sizes: ["M"] } )
{ "_id" : 300, "item" : "Bermuda Shorts", "sizes" : [ "M" ] }
> db.products.find( { sizes: ["S","M","L"] } )
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S", "M", "L" ] }

The corresponding aggregation pipelines:

> db.products.aggregate( [ { $match : { sizes: ["M"] } } ] )
{ "_id" : 300, "item" : "Bermuda Shorts", "sizes" : [ "M" ] }
> db.products.aggregate( [ { $match : { sizes: ["S","M","L"] } } ] )
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S", "M", "L" ] }

To return documents having arrays with content in any order:

> db.products.find( { sizes: { $all: ["M"] } } )
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S", "M", "L" ] }
{ "_id" : 300, "item" : "Bermuda Shorts", "sizes" : [ "M" ] }
{ "_id" : 400, "item" : "Hat", "sizes" : "M" }
> db.products.find( { sizes: { $all: ["S", "L"] } } )
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S", "M", "L" ] }

Note that even though sizes in the document with _id 400 is not an array, it is still returned. To return only documents with arrays, we can add a check, as we shall see later. The corresponding aggregation pipelines:

db.products.aggregate( [ { $match : { sizes: { $all: ["M"] } } } ] )

db.products.aggregate( [ { $match : { sizes: { $all: ["S", "L"] } } } ] )
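As a rough illustration of exact matching versus $all, the following plain JavaScript sketch models the two behaviours in memory. The function names matchesExactly and matchesAll are hypothetical, and the sketch covers only the flat string arrays used in this post.

```javascript
// Only the fields needed for the comparison are kept here.
const products = [
  { _id: 100, sizes: ["S", "M", "L"] },
  { _id: 200, sizes: ["X", "XL", "XXL"] },
  { _id: 300, sizes: ["M"] },
  { _id: 400, sizes: "M" },
  { _id: 500 },
  { _id: 600, sizes: null },
  { _id: 700, sizes: [] },
];

// Models { sizes: [ ... ] }: same elements, same order, same length.
function matchesExactly(doc, wanted) {
  return Array.isArray(doc.sizes) &&
    doc.sizes.length === wanted.length &&
    doc.sizes.every((value, index) => value === wanted[index]);
}

// Models { sizes: { $all: [ ... ] } }: every wanted value is present, in any order.
// A scalar field is compared as if it were a one-element array.
function matchesAll(doc, wanted) {
  const values = Array.isArray(doc.sizes) ? doc.sizes : [doc.sizes];
  return wanted.every((value) => values.includes(value));
}

const ids = (predicate) => products.filter(predicate).map((doc) => doc._id);

console.log(ids((doc) => matchesExactly(doc, ["M"])));           // [ 300 ]
console.log(ids((doc) => matchesExactly(doc, ["L", "M", "S"]))); // []
console.log(ids((doc) => matchesAll(doc, ["M"])));               // [ 100, 300, 400 ]
console.log(ids((doc) => matchesAll(doc, ["L", "S"])));          // [ 100 ]
```

The second call returns nothing because the exact match is order sensitive, while the last call shows that $all ignores order.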

To apply multiple conditions to the contents of arrays (each condition may be satisfied by a different element):

> db.products.find( { sizes: { $gt: "A", $lt: "O" } } )
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S", "M", "L" ] }
{ "_id" : 300, "item" : "Bermuda Shorts", "sizes" : [ "M" ] }
{ "_id" : 400, "item" : "Hat", "sizes" : "M" }

The corresponding aggregation pipeline is:

db.products.aggregate( [ { $match : { sizes: { $gt: "A", $lt: "O" } } } ] )

If a single element of the array must satisfy all the conditions, use $elemMatch:

> db.products.find( { sizes: { $elemMatch: { $gt: "L", $lt: "S" } } } )
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S", "M", "L" ] }
{ "_id" : 300, "item" : "Bermuda Shorts", "sizes" : [ "M" ] }

The corresponding aggregation pipeline is:

db.products.aggregate( [ { $match : { sizes: { $elemMatch: { $gt: "L", $lt: "S" } } } } ] )
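The distinction between plain range conditions and $elemMatch is easy to miss, so here is a minimal in-memory sketch of both in plain JavaScript. The helper names and the extra document with _id 800 are assumptions added for illustration; they are not part of the products collection.

```javascript
const products = [
  { _id: 100, sizes: ["S", "M", "L"] },
  { _id: 200, sizes: ["X", "XL", "XXL"] },
  { _id: 300, sizes: ["M"] },
  { _id: 400, sizes: "M" },
  { _id: 500 },
  { _id: 600, sizes: null },
  { _id: 700, sizes: [] },
];

// Hypothetical extra document, not part of the collection above.
const mixed = { _id: 800, sizes: ["A", "Z"] };

// String values held by the field, whether it is an array or a scalar.
const stringsOf = (doc) =>
  (Array.isArray(doc.sizes) ? doc.sizes : [doc.sizes]).filter((v) => typeof v === "string");

// Models { sizes: { $gt: low, $lt: high } }: each bound may be met by a different element.
const anyElements = (doc, low, high) => {
  const values = stringsOf(doc);
  return values.some((v) => v > low) && values.some((v) => v < high);
};

// Models { sizes: { $elemMatch: { $gt: low, $lt: high } } }:
// the field must be an array and one element must meet both bounds.
const oneElement = (doc, low, high) =>
  Array.isArray(doc.sizes) && stringsOf(doc).some((v) => v > low && v < high);

const ids = (predicate) => products.filter(predicate).map((doc) => doc._id);

console.log(ids((doc) => anyElements(doc, "A", "O"))); // [ 100, 300, 400 ]
console.log(ids((doc) => oneElement(doc, "L", "S")));  // [ 100, 300 ]
console.log(anyElements(mixed, "L", "S"), oneElement(mixed, "L", "S")); // true false
```

In the hypothetical document, "Z" satisfies the lower bound and "A" satisfies the upper bound, so the plain conditions match even though no single element lies between "L" and "S".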

If we wish to query on an index position, we can use dot notation to indicate the position:

> db.products.find( { "sizes.0": { $gt: "R" } } )
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S", "M", "L" ] }
{ "_id" : 200, "item" : "T-shirt", "sizes" : [ "X", "XL", "XXL" ] }
> db.products.find( { "sizes.0": { $gt: "S" } } )
{ "_id" : 200, "item" : "T-shirt", "sizes" : [ "X", "XL", "XXL" ] }

The corresponding aggregation pipeline:

db.products.aggregate( [ { $match : { "sizes.0": { $gt: "R" } }  } ] )

db.products.aggregate( [ { $match : { "sizes.0": { $gt: "S" } }  } ] )

We can query an array based on the size of array as shown below:

> db.products.find( { "sizes": { $size: 0 } } )
{ "_id" : 700, "item" : "Cap", "sizes" : [ ] }
>
> db.products.find( { "sizes": { $size: 1 } } )
{ "_id" : 300, "item" : "Bermuda Shorts", "sizes" : [ "M" ] }
>
> db.products.find( { "sizes": { $size: 3 } } )
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S", "M", "L" ] }
{ "_id" : 200, "item" : "T-shirt", "sizes" : [ "X", "XL", "XXL" ] }

The corresponding aggregation pipeline:

db.products.aggregate( [ { $match : { "sizes": { $size: 0 } }  } ] )

db.products.aggregate( [ { $match : { "sizes": { $size: 1 } }  } ] )

db.products.aggregate( [ { $match : { "sizes": { $size: 3 } }  } ] )

Lastly, on the querying side, we take a look at regex that can be used for querying on the contents of arrays:

> db.products.find( { "sizes":  { $regex: /^S/ } } )
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S", "M", "L" ] }
>
> db.products.find( { "sizes":  { $regex: /^M/ } } )
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S", "M", "L" ] }
{ "_id" : 300, "item" : "Bermuda Shorts", "sizes" : [ "M" ] }
{ "_id" : 400, "item" : "Hat", "sizes" : "M" }
>
> db.products.find( { "sizes":  { $regex: /L$/ } } )
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S", "M", "L" ] }
{ "_id" : 200, "item" : "T-shirt", "sizes" : [ "X", "XL", "XXL" ] }
>
> db.products.find( { "sizes":  { $regex: /M/ } } )
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S", "M", "L" ] }
{ "_id" : 300, "item" : "Bermuda Shorts", "sizes" : [ "M" ] }
{ "_id" : 400, "item" : "Hat", "sizes" : "M" }
>
> db.products.find( { "sizes":  { $regex: /X.L/ } } )
{ "_id" : 200, "item" : "T-shirt", "sizes" : [ "X", "XL", "XXL" ] }
>
> db.products.find( { "sizes":  { $regex: /X..L/ } } )
>

Note that the last query did not return any result, as no element in any array contains an X followed by exactly two characters and then an L. Since the pattern is not anchored, it can match anywhere inside an element. The corresponding aggregation pipelines:

db.products.aggregate( [ { $match : { sizes: { $regex: /^S/ } } } ] )

db.products.aggregate( [ { $match : { sizes: { $regex: /^M/ } } } ] )

db.products.aggregate( [ { $match : { sizes: { $regex: /L$/ } } } ] )

db.products.aggregate( [ { $match : { sizes: { $regex: /M/ } } } ] )

db.products.aggregate( [ { $match : { sizes: { $regex: /X.L/ } } } ] )

db.products.aggregate( [ { $match : { sizes: { $regex: /X..L/ } } } ] )
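The regex results above can be reproduced with JavaScript's own regular expressions. The sketch below is an in-memory model in plain JavaScript, with the hypothetical helper matchesRegex; the string "2XXXL-tall" is also made up, purely to show that the pattern is unanchored.

```javascript
const products = [
  { _id: 100, sizes: ["S", "M", "L"] },
  { _id: 200, sizes: ["X", "XL", "XXL"] },
  { _id: 300, sizes: ["M"] },
  { _id: 400, sizes: "M" },
  { _id: 500 },
  { _id: 600, sizes: null },
  { _id: 700, sizes: [] },
];

// Models { sizes: { $regex: pattern } }: true when any string element
// (or the scalar string itself) matches the pattern.
const matchesRegex = (doc, pattern) =>
  (Array.isArray(doc.sizes) ? doc.sizes : [doc.sizes])
    .some((value) => typeof value === "string" && pattern.test(value));

const ids = (pattern) =>
  products.filter((doc) => matchesRegex(doc, pattern)).map((doc) => doc._id);

console.log(ids(/^S/));   // [ 100 ]
console.log(ids(/^M/));   // [ 100, 300, 400 ]
console.log(ids(/L$/));   // [ 100, 200 ]
console.log(ids(/X.L/));  // [ 200 ]
console.log(ids(/X..L/)); // []

// The pattern is unanchored, so it matches anywhere inside a string.
console.log(/X..L/.test("2XXXL-tall")); // true
```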

Now, we take a look at projection of arrays. The positional $ operator returns the first array element that matches the query condition and is written as array_name.$, as shown below:

db.products.find( {},{ "sizes.$": 1 } )

But this results in an error:

> db.products.find( {},{ "sizes.$": 1 } )
Error: error: {
        "ok" : 0,
        "errmsg" : "Positional projection 'sizes.$' does not match the query document.",
        "code" : 2,
        "codeName" : "BadValue"
}

This is only to be expected: the positional $ projection requires the array field to appear in the query document, and our query document is empty. So, we add a condition on sizes that selects only the documents with non-empty arrays and then apply this operator as shown:

> db.products.find( {sizes: {$elemMatch: {$exists: true} } })
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S", "M", "L" ] }
{ "_id" : 200, "item" : "T-shirt", "sizes" : [ "X", "XL", "XXL" ] }
{ "_id" : 300, "item" : "Bermuda Shorts", "sizes" : [ "M" ] }
> db.products.find( {sizes: {$elemMatch: {$exists: true} } },{ "sizes.$": 1 } )
{ "_id" : 100, "sizes" : [ "S" ] }
{ "_id" : 200, "sizes" : [ "X" ] }
{ "_id" : 300, "sizes" : [ "M" ] }

Note that the $ operator works together with the query in the first argument of find(): it returns the first element that satisfies the query condition, as shown below:

> db.products.find( { $and: [ { sizes: { $type: "array" } }, { "sizes": { $gt: "O" } } ] } )
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S", "M", "L" ] }
{ "_id" : 200, "item" : "T-shirt", "sizes" : [ "X", "XL", "XXL" ] }
> db.products.find( { $and: [ { sizes: { $type: "array" } }, { "sizes": { $gt: "O" } } ] },{ "sizes.$": 1 } )
{ "_id" : 100, "sizes" : [ "S" ] }
{ "_id" : 200, "sizes" : [ "X" ] }

The $elemMatch operator in a projection also returns only the first matching element of the array, like the $ operator, but has the advantage of taking its own condition as shown below:

> db.products.find( {sizes: {$elemMatch: {$exists: true} } })
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S", "M", "L" ] }
{ "_id" : 200, "item" : "T-shirt", "sizes" : [ "X", "XL", "XXL" ] }
{ "_id" : 300, "item" : "Bermuda Shorts", "sizes" : [ "M" ] }
> db.products.find( {sizes: {$elemMatch: {$exists: true} } },{ sizes: { $elemMatch: { $gt: "O" } } } )
{ "_id" : 100, "sizes" : [ "S" ] }
{ "_id" : 200, "sizes" : [ "X" ] }
{ "_id" : 300 }

Note that $elemMatch is used in both the query and projection arguments of find(). Also, when no element of the array satisfies the projection condition, $elemMatch in the projection returns only the _id. The last operator is $slice, which returns elements by position. In the form used here, $slice takes two arguments: the first is the number of elements to skip and the second is the number of elements to return:

> db.products.find( { sizes: { $type: "array" } })
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S", "M", "L" ] }
{ "_id" : 200, "item" : "T-shirt", "sizes" : [ "X", "XL", "XXL" ] }
{ "_id" : 300, "item" : "Bermuda Shorts", "sizes" : [ "M" ] }
{ "_id" : 700, "item" : "Cap", "sizes" : [ ] }
>
> db.products.find( { sizes: { $type: "array" } },{ item : 1 ,  sizes: { $slice: [ 0, 1 ] } } )
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S" ] }
{ "_id" : 200, "item" : "T-shirt", "sizes" : [ "X" ] }
{ "_id" : 300, "item" : "Bermuda Shorts", "sizes" : [ "M" ] }
{ "_id" : 700, "item" : "Cap", "sizes" : [ ] }
>
> db.products.find( { sizes: { $type: "array" } },{ item : 1 ,  sizes: { $slice: [ 0, 2 ] } } )
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S", "M" ] }
{ "_id" : 200, "item" : "T-shirt", "sizes" : [ "X", "XL" ] }
{ "_id" : 300, "item" : "Bermuda Shorts", "sizes" : [ "M" ] }
{ "_id" : 700, "item" : "Cap", "sizes" : [ ] }
>
> db.products.find( { sizes: { $type: "array" } },{ item : 1 ,  sizes: { $slice: [ 1, 2 ] } } )
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "M", "L" ] }
{ "_id" : 200, "item" : "T-shirt", "sizes" : [ "XL", "XXL" ] }
{ "_id" : 300, "item" : "Bermuda Shorts", "sizes" : [ ] }
{ "_id" : 700, "item" : "Cap", "sizes" : [ ] }

The corresponding aggregate pipeline:

db.products.aggregate( [ { $match : { sizes: { $type: "array" } } } ] )

db.products.aggregate( [ { $match : { sizes: { $type: "array" } } }, { $project : { item : 1 , sizes : { "$slice": [ "$sizes", 0, 1 ] } } }  ] )

db.products.aggregate( [ { $match : { sizes: { $type: "array" } } }, { $project : { item : 1 , sizes : { "$slice": [ "$sizes", 0, 2 ] } } }  ] )

db.products.aggregate( [ { $match : { sizes: { $type: "array" } } }, { $project : { item : 1 , sizes : { "$slice": [ "$sizes", 1, 2 ] } } }  ] )
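For the two-argument form with a non-negative skip, $slice behaves much like Array.prototype.slice in JavaScript. The sketch below is an in-memory model only; sliceSizes and sizesAfter are hypothetical helper names, and negative skips are not covered.

```javascript
// The four documents that pass the { $type: "array" } filter above.
const products = [
  { _id: 100, item: "Pullover", sizes: ["S", "M", "L"] },
  { _id: 200, item: "T-shirt", sizes: ["X", "XL", "XXL"] },
  { _id: 300, item: "Bermuda Shorts", sizes: ["M"] },
  { _id: 700, item: "Cap", sizes: [] },
];

// Models $slice: [ skip, limit ] for a non-negative skip:
// drop the first `skip` elements, then keep at most `limit` elements.
const sliceSizes = (doc, skip, limit) => ({
  ...doc,
  sizes: doc.sizes.slice(skip, skip + limit),
});

const sizesAfter = (skip, limit) =>
  products.map((doc) => sliceSizes(doc, skip, limit).sizes);

console.log(sizesAfter(0, 1)); // [ [ 'S' ], [ 'X' ], [ 'M' ], [] ]
console.log(sizesAfter(0, 2)); // [ [ 'S', 'M' ], [ 'X', 'XL' ], [ 'M' ], [] ]
console.log(sizesAfter(1, 2)); // [ [ 'M', 'L' ], [ 'XL', 'XXL' ], [], [] ]
```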

This concludes the projection of arrays.

Saturday, 23 June 2018

MongoDB - I

We introduced MongoDB in the last two posts here and here. In this post, we explore some use cases of the Aggregation Framework with reference to arrays. For all the work in this post, we will be using the same version as in the last two posts: MongoDB v3.6.5.

Once we are logged into the Mongo shell, run the command below to switch to the merchandise database:

> use merchandise
switched to db merchandise

Let us add a collection called products to merchandise database and insert a few records as shown below:

db.products.insert(
   [
    { "_id" : 100, "item" : "Pullover", "sizes": [ "S", "M", "L"] },
    { "_id" : 200, "item" : "T-shirt", "sizes" : ["X", "XL", "XXL"] },
    { "_id" : 300, "item" : "Bermuda Shorts", "sizes": ["M"] },
    { "_id" : 400, "item" : "Hat", "sizes": "M" },
    { "_id" : 500, "item" : "Wrist band" },
    { "_id" : 600, "item" : "Sweat band", "sizes" : null },
    { "_id" : 700, "item" : "Cap", "sizes" : [ ] },
   ],
   { ordered: false }
)

The output is shown below:

> db.products.insert(
...    [
...     { "_id" : 100, "item" : "Pullover", "sizes": [ "S", "M", "L"] },
...     { "_id" : 200, "item" : "T-shirt", "sizes" : ["X", "XL", "XXL"] },
...     { "_id" : 300, "item" : "Bermuda Shorts", "sizes": ["M"] },
...     { "_id" : 400, "item" : "Hat", "sizes": "M" },
...     { "_id" : 500, "item" : "Wrist band" },
...     { "_id" : 600, "item" : "Sweat band", "sizes" : null },
...     { "_id" : 700, "item" : "Cap", "sizes" : [ ] },
...    ],
...    { ordered: false }
... )
BulkWriteResult({
        "writeErrors" : [ ],
        "writeConcernErrors" : [ ],
        "nInserted" : 7,
        "nUpserted" : 0,
        "nMatched" : 0,
        "nModified" : 0,
        "nRemoved" : 0,
        "upserted" : [ ]
})

We can validate the insertion by running below command:

> db.products.find()
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S", "M", "L" ] }
{ "_id" : 200, "item" : "T-shirt", "sizes" : [ "X", "XL", "XXL" ] }
{ "_id" : 300, "item" : "Bermuda Shorts", "sizes" : [ "M" ] }
{ "_id" : 400, "item" : "Hat", "sizes" : "M" }
{ "_id" : 500, "item" : "Wrist band" }
{ "_id" : 600, "item" : "Sweat band", "sizes" : null }
{ "_id" : 700, "item" : "Cap", "sizes" : [ ] }

Using $unwind, we can split the array into separate records as shown below:

> db.products.aggregate( [ { $unwind : "$sizes" } ] )
{ "_id" : 100, "item" : "Pullover", "sizes" : "S" }
{ "_id" : 100, "item" : "Pullover", "sizes" : "M" }
{ "_id" : 100, "item" : "Pullover", "sizes" : "L" }
{ "_id" : 200, "item" : "T-shirt", "sizes" : "X" }
{ "_id" : 200, "item" : "T-shirt", "sizes" : "XL" }
{ "_id" : 200, "item" : "T-shirt", "sizes" : "XXL" }
{ "_id" : 300, "item" : "Bermuda Shorts", "sizes" : "M" }
{ "_id" : 400, "item" : "Hat", "sizes" : "M" }

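Note that only documents with at least one size are returned: $unwind drops documents where sizes is missing, null, or an empty array, and treats the scalar value in the document with _id 400 as a single-element array. Now let us group using $group as shown below: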

> db.products.aggregate(
...    [
...       { $unwind : "$sizes" },
...       {
...         $group : {_id :  "$_id",
...            sizes: { $push: "$sizes" }
...         }
...       }
...    ]
... )
{ "_id" : 400, "sizes" : [ "M" ] }
{ "_id" : 300, "sizes" : [ "M" ] }
{ "_id" : 200, "sizes" : [ "X", "XL", "XXL" ] }
{ "_id" : 100, "sizes" : [ "S", "M", "L" ] }

The records are returned with sizes grouped by _id. But note that the item field is missing. Let us try to add the item field as shown below:

> db.products.aggregate(
...    [
...       { $unwind : "$sizes" },
...       {
...         $group : {_id :  "$_id", item: "$item",
...            sizes: { $push: "$sizes" }
...         }
...       }
...    ]
... )
assert: command failed: {
        "ok" : 0,
        "errmsg" : "The field 'item' must be an accumulator object",
        "code" : 40234,
        "codeName" : "Location40234"
} : aggregate failed
_getErrorWithCode@src/mongo/shell/utils.js:25:13
doassert@src/mongo/shell/assert.js:16:14
assert.commandWorked@src/mongo/shell/assert.js:403:5
DB.prototype._runAggregate@src/mongo/shell/db.js:260:9
DBCollection.prototype.aggregate@src/mongo/shell/collection.js:1212:12
@(shell):1:1

2018-06-23T18:11:39.668+0530 E QUERY    [thread1] Error: command failed: {
        "ok" : 0,
        "errmsg" : "The field 'item' must be an accumulator object",
        "code" : 40234,
        "codeName" : "Location40234"
} : aggregate failed :
_getErrorWithCode@src/mongo/shell/utils.js:25:13
doassert@src/mongo/shell/assert.js:16:14
assert.commandWorked@src/mongo/shell/assert.js:403:5
DB.prototype._runAggregate@src/mongo/shell/db.js:260:9
DBCollection.prototype.aggregate@src/mongo/shell/collection.js:1212:12
@(shell):1:1

When we try to add item, we get an error. Unlike SQL, where we can do a GROUP BY on multiple columns directly, $group in MongoDB takes a single grouping key, _id, and every other field in the stage must be an accumulator expression. One workaround to try is to project the item field as shown below:

db.products.aggregate(
   [
      { $unwind : "$sizes" },
      {
        $group : {_id :  "$_id",
           sizes: { $push: "$sizes" }
        }
      },
      {$project: {"_id": 1, "item":1,"sizes":1}}
   ]
)

The output of the above query is shown below:

> db.products.aggregate(
...    [
...       { $unwind : "$sizes" },
...       {
...         $group : {_id :  "$_id",
...            sizes: { $push: "$sizes" }
...         }
...       },
...       {$project: {"_id": 1, "item":1,"sizes":1}}
...    ]
... )
{ "_id" : 400, "sizes" : [ "M" ] }
{ "_id" : 300, "sizes" : [ "M" ] }
{ "_id" : 200, "sizes" : [ "X", "XL", "XXL" ] }
{ "_id" : 100, "sizes" : [ "S", "M", "L" ] }

Note that the results are the same as we got earlier with just the group and without the project stage. The item field is lost because $group outputs only _id and the accumulator fields, so there is no item left for $project to include. The workaround is to fold all the fields we want to keep into the _id key, group by it, and finally project the folded fields back to separate fields to retain the original schema as shown below:

> db.products.aggregate(
...    [
...       { $unwind : "$sizes" },
...       {
...         $group : {_id :  { _id: "$_id", item: "$item" },
...            sizes: { $push: "$sizes" }
...         }
...       },
...       {$project: {"_id": "$_id._id", "item":"$_id.item","sizes":1}}
...    ]
... )
{ "sizes" : [ "M" ], "_id" : 400, "item" : "Hat" }
{ "sizes" : [ "M" ], "_id" : 300, "item" : "Bermuda Shorts" }
{ "sizes" : [ "X", "XL", "XXL" ], "_id" : 200, "item" : "T-shirt" }
{ "sizes" : [ "S", "M", "L" ], "_id" : 100, "item" : "Pullover" }

This is very similar to doing a GROUP BY on multiple columns in relational databases. Note that the item field is now present.

This kind of unwind and later regrouping to retain the original schema can be useful in a pipeline involving multiple stages.
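To make the unwind and regroup round trip concrete, here is a minimal in-memory sketch in plain JavaScript. It is a model of the stages rather than MongoDB code: the function names unwind and regroup are made up, and the output order is simply insertion order, whereas $group makes no promise about order.

```javascript
const products = [
  { _id: 100, item: "Pullover", sizes: ["S", "M", "L"] },
  { _id: 200, item: "T-shirt", sizes: ["X", "XL", "XXL"] },
  { _id: 300, item: "Bermuda Shorts", sizes: ["M"] },
  { _id: 400, item: "Hat", sizes: "M" },
  { _id: 500, item: "Wrist band" },
  { _id: 600, item: "Sweat band", sizes: null },
  { _id: 700, item: "Cap", sizes: [] },
];

// Models $unwind: one output document per array element. Documents whose field
// is missing, null, or an empty array are dropped; a non-array value is treated
// as a single element.
function unwind(docs, field) {
  const out = [];
  for (const doc of docs) {
    const value = doc[field];
    if (value === undefined || value === null) continue;
    const elements = Array.isArray(value) ? value : [value];
    for (const element of elements) out.push({ ...doc, [field]: element });
  }
  return out;
}

// Models $group on the compound key { _id, item } with a $push accumulator,
// followed by the $project that lifts the key fields back to the top level.
function regroup(unwound) {
  const groups = new Map();
  for (const doc of unwound) {
    const key = JSON.stringify([doc._id, doc.item]);
    if (!groups.has(key)) groups.set(key, { _id: doc._id, item: doc.item, sizes: [] });
    groups.get(key).sizes.push(doc.sizes);
  }
  return [...groups.values()];
}

const unwound = unwind(products, "sizes");
const regrouped = regroup(unwound);

console.log(unwound.length); // 8
console.log(regrouped.map((doc) => doc._id)); // [ 100, 200, 300, 400 ]
```

Note that the regrouped document with _id 400 now holds an array, [ "M" ], where the original held the scalar "M", which matches the pipeline output above.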

We can do a count on the size of arrays as shown below:

> db.products.aggregate(
...    [
...       { $unwind : "$sizes" },
...       {
...         $group : {_id :  { _id: "$_id", item: "$item" },
...            sizes: { $push: "$sizes" }
...         }
...       },
...       {$project: {"_id": "$_id._id", "item":"$_id.item","sizes":"$sizes",CountSizes: { $size: "$sizes" }}}
...    ]
... )
{ "_id" : 400, "item" : "Hat", "sizes" : [ "M" ], "CountSizes" : 1 }
{ "_id" : 300, "item" : "Bermuda Shorts", "sizes" : [ "M" ], "CountSizes" : 1 }
{ "_id" : 200, "item" : "T-shirt", "sizes" : [ "X", "XL", "XXL" ], "CountSizes" : 3 }
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S", "M", "L" ], "CountSizes" : 3 }

Adding a match can limit the results as shown below:

> db.products.aggregate(
...    [
...       { $unwind : "$sizes" },
...       {
...         $group : {_id :  { _id: "$_id", item: "$item" },
...            sizes: { $push: "$sizes" }
...         }
...       },
...       {$project: {"_id": "$_id._id", "item":"$_id.item","sizes":"$sizes",CountSizes: { $size: "$sizes" }}},
...       {$match: { CountSizes: { $gte: 2 } }}
...    ]
... )
{ "_id" : 200, "item" : "T-shirt", "sizes" : [ "X", "XL", "XXL" ], "CountSizes" : 3 }
{ "_id" : 100, "item" : "Pullover", "sizes" : [ "S", "M", "L" ], "CountSizes" : 3 }

Tuesday, 5 June 2018

Apache Nifi - VIII

In the last post, we saw how we can fetch data from MongoDB using Apache Nifi. We extend the discussion further by introducing another processor called RunMongoAggregation. The GetMongo processor offers us a small window to query the MongoDB database, but the RunMongoAggregation processor ups the game by offering exciting possibilities. In the world of MongoDB, the Aggregation Framework is a data processing tool for performing analysis in real time and generating pre-aggregated reports. The fact that we have a separate processor, RunMongoAggregation, dedicated to running aggregations against MongoDB only goes to show the significance of this processing framework even in Apache Nifi. More details about this framework are here. Perhaps we will explore the capabilities of the Aggregation Framework when we deal with MongoDB in some later articles. But, for now, let us focus on how we can leverage the Aggregation Framework in MongoDB using Nifi to process data.

The environments and sample data details are in the previous post. We begin by showing a very simple aggregation pipeline in a flow consisting of just two steps as shown below:

The properties of RunMongoAggregation are shown below:


As you can see, we have entered the pipeline in the Query field. Note that we have coded the same conditions that we used in the earlier post in the form of an aggregation pipeline. So, we should get the same results as in the previous post. The properties of PutFile remain the same but are shown below for the sake of completeness:

Let us run this flow:

Let us examine the data provenance on RunMongoAggregation processor:

We can see the three records. Further confirmation is seen by checking the file at the target directory:

This is in line with the results in the last post.

In the next example, we attempt a self-join using $lookup stage and fetch the manager first name and manager last name. The flow remains the same but we merely switch the value in the Query field as shown below:


The properties for PutFile remain the same as shown earlier. Let us run this flow:

Let us now examine the results in the file at target directory:

Note that there are 5 records as per the aggregation pipeline. Also, note that the manager first name and manager last name appear along with the employee record. In particular, note that Steven King has no manager and that all the other four employees have Steven as their manager.
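The self-join performed by a $lookup stage can be pictured as a left outer join of the collection with itself. The sketch below models that idea in plain JavaScript on in-memory data. Everything specific in it is an assumption for illustration: the field names (EMPLOYEE_ID, MANAGER_ID, FIRST_NAME, LAST_NAME), the identifier values, the two placeholder employees, and the helper withManager. It is not the pipeline used in the flow.

```javascript
// Hypothetical employee documents; the field names, identifiers, and all
// names other than Steven King are assumptions made for illustration.
const employees = [
  { EMPLOYEE_ID: 100, FIRST_NAME: "Steven", LAST_NAME: "King", MANAGER_ID: null },
  { EMPLOYEE_ID: 101, FIRST_NAME: "Employee", LAST_NAME: "One", MANAGER_ID: 100 },
  { EMPLOYEE_ID: 102, FIRST_NAME: "Employee", LAST_NAME: "Two", MANAGER_ID: 100 },
];

// Left outer self-join: every employee is kept, and the manager's name is
// attached when a document with a matching EMPLOYEE_ID exists.
function withManager(docs) {
  const byId = new Map(docs.map((doc) => [doc.EMPLOYEE_ID, doc]));
  return docs.map((doc) => {
    const manager = byId.get(doc.MANAGER_ID);
    return {
      ...doc,
      MANAGER_FIRST_NAME: manager ? manager.FIRST_NAME : null,
      MANAGER_LAST_NAME: manager ? manager.LAST_NAME : null,
    };
  });
}

const joined = withManager(employees);
console.log(joined.length);                // 3
console.log(joined[0].MANAGER_FIRST_NAME); // null
console.log(joined[1].MANAGER_LAST_NAME);  // King
```

As in the flow's output, the employee without a manager is kept in the result rather than dropped, which is what distinguishes a left outer join from an inner join.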

Monday, 4 June 2018

Apache Nifi - VII

In this post, we will see how Apache Nifi integrates with MongoDB. MongoDB is a document database that supports querying and indexing along with scalability and flexibility. More details about MongoDB are here. We will use the latest versions of Nifi and MongoDB for all the work in this post: Apache Nifi 1.6 and MongoDB 3.6.5. MongoDB Community Edition is installed and the employees data has already been loaded into the database using the mongoimport command. This utility is reminiscent of SQL*Loader in Oracle Database. The employees data has been used repeatedly in earlier posts. Since this is only sample data, any other data may also be used.

Using Compass, we can view the employees collection under the users database, shown below for reference:

It can be seen from Compass that there are 107 documents in the employees collection. We will use Nifi to interact with MongoDB and, as a first activity, fetch data from MongoDB. Note that, like mongoimport, there is a corresponding mongoexport that can be used to export data out of MongoDB. But we are interested in fetching data from MongoDB using Nifi, as the fetched data can later be processed using the rich features of Nifi.

The first flow that we will see is shown below:

The flow consists of just two processors: GetMongo and PutFile. The properties of GetMongo are shown below:

We set the properties for Mongo URI, Database, Collection and Results per FlowFile. Notice that the Mongo URI (Uniform Resource Identifier) points to the running instance of MongoDB that is listening for client connections on TCP port 27017. The Results per FlowFile property (shown below) offers the flexibility to limit the data extracted per FlowFile from MongoDB.

On the PutFile processor, we just set the destination directory as shown below:

Running this simple flow, we can see the Data Provenance on the GetMongo processor as shown below:

Clicking on View above shows the data that has been extracted from MongoDB as shown below:

We can also see the file under the destination directory:

In the second attempt, the flow used is the same, but we add bells and whistles to the GetMongo processor as shown below:

The Query property filters the records to only those having DEPARTMENT_ID equal to 90. This filter is also written to a FlowFile attribute called query_detail. Only four fields are projected and the _id field is explicitly excluded. The results are sorted by salary in descending order and only the top 3 records are picked. The properties of PutFile remain the same. The query_detail attribute can be seen in the Attributes tab under Data Provenance as shown below:

On the Content tab, we can view the result that is shown below:

We can also see the file in destination folder as shown below:

Note that there are only three records, as we had configured. We can finally confirm this by applying in Compass the same query settings that we set in Nifi; the results of the two match, as shown below:

With this last step, we conclude this post.