Sunday 21 January 2018

Apache Flume - I

In an earlier post, we looked at Sqoop as a means to get data into HDFS. In this post, we will look at another Apache project, Flume, which is useful for migrating logs into HDFS. More details on Apache Flume can be found in the official Apache Flume documentation. For all the work on Flume in this post, we will use the Cloudera sandbox, Cloudera QuickStart VM 5.12.

Apache Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. While Sqoop migrates data from a relational database (RDBMS) to HDFS, Flume is suited to streaming data sources that generate data continuously, such as logs, and moves that data into HDFS.

Apache Flume terminology consists of:

1) Events: the basic unit of data flow in Apache Flume. An event has a byte payload and an optional set of string attributes (headers).

2) Sources: consume events delivered to them by an external source such as a web server. The external source sends events to Flume in a format that the target Flume source recognizes. When a Flume source receives an event, it stores it in one or more channels.

3) Channels: passive stores that keep events until they are consumed by a Flume sink.

4) Sinks: remove events from the channel and either put them into an external repository such as HDFS (via the Flume HDFS sink) or forward them to the Flume source of the next Flume agent (the next hop) in the flow.

The source and sink within a given agent (the JVM process that hosts them) run asynchronously, with events staged in the channel in between.
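To make the source, channel and sink roles concrete, here is a minimal Python sketch that models them with a bounded in-memory queue. It only illustrates the concepts above and is not Flume's API: the `Event` class, the `source`/`sink` functions and the capacity value are hypothetical names chosen for this example, and a plain list stands in for HDFS.

```python
import queue
import threading
from dataclasses import dataclass, field


@dataclass
class Event:
    """A unit of data flow: a byte payload plus optional string headers."""
    body: bytes
    headers: dict = field(default_factory=dict)


def source(lines, channel):
    """Hypothetical source: turns each input line into an Event and stages it in the channel."""
    for line in lines:
        channel.put(Event(body=line.encode("utf-8")))  # blocks while the channel is full
    channel.put(None)  # sentinel: tells the sink there is no more data


def sink(channel, store):
    """Hypothetical sink: removes events from the channel and writes them to a store."""
    while True:
        event = channel.get()
        if event is None:
            break
        store.append(event.body.decode("utf-8"))


def run_pipeline(lines, capacity=10):
    channel = queue.Queue(maxsize=capacity)  # passive store between source and sink
    store = []  # stands in for HDFS
    producer = threading.Thread(target=source, args=(lines, channel))
    consumer = threading.Thread(target=sink, args=(channel, store))
    # Source and sink run asynchronously; only the channel connects them.
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
    return store


if __name__ == "__main__":
    print(run_pipeline(["first log line", "second log line"]))
```

Even with a channel capacity of 1, every event arrives in order, because the source simply waits whenever the channel is full.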

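The data flow in Apache Flume is shown below:

[Figure: Apache Flume data flow (external source -> Flume source -> channel -> Flume sink -> HDFS)]
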

Apache Flume has the following prerequisites:

1) Java Runtime Environment
2) Memory - Sufficient memory for configurations used by sources, channels or sinks
3) Disk Space - Sufficient disk space for configurations used by channels or sinks
4) Directory Permissions - Read/Write permissions for directories used by the agent

Let us quickly check two of these prerequisites, Java and directory permissions, in our virtual machine. First, check the Java version:

java -version

This returns the following output in the terminal window of the virtual machine:

[cloudera@quickstart ~]$ java -version
java version "1.7.0_67"
Java(TM) SE Runtime Environment (build 1.7.0_67-b01)
Java HotSpot(TM) 64-Bit Server VM (build 24.65-b04, mixed mode)
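If you would rather script this check, the Python sketch below runs `java -version` and extracts the version string. The helper names are hypothetical (not part of Flume), the code assumes Python 3.7 or later, and it relies on the fact that `java -version` prints its banner to standard error rather than standard output.

```python
import re
import shutil
import subprocess


def parse_java_version(output):
    """Extract the quoted version string from `java -version` output, e.g. '1.7.0_67'."""
    match = re.search(r'version "([^"]+)"', output)
    return match.group(1) if match else None


def installed_java_version():
    """Return the installed Java version, or None if `java` is not on the PATH."""
    if shutil.which("java") is None:
        return None
    # `java -version` writes its banner to stderr, not stdout.
    result = subprocess.run(["java", "-version"], capture_output=True, text=True)
    return parse_java_version(result.stderr)


if __name__ == "__main__":
    print(installed_java_version())
```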


For directory permissions, create the following directory structure under the /user/cloudera/ directory in HDFS:

/user/cloudera/flume/events/

Then give Group and Other users write access to the events folder by right-clicking it and setting its permissions:

[Screenshot: setting the permissions on the events folder]

Creating a directory in File Browser has been dealt with in earlier posts.
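Giving Group and Other users write access amounts to setting the two write bits of the directory's permission mode (octal 022). HDFS uses a POSIX-like owner/group/other permission model, so the Python sketch below shows the arithmetic with the standard `stat` module. The helper names are hypothetical, and the starting mode 0o755 is only an assumed example, not necessarily what your directory starts with.

```python
import stat


def add_group_other_write(mode):
    """Return `mode` with write permission added for group and other users."""
    return mode | stat.S_IWGRP | stat.S_IWOTH


def describe(mode):
    """Render permission bits as an rwx string, e.g. 0o755 -> 'rwxr-xr-x'."""
    return stat.filemode(mode)[1:]  # drop the leading file-type character


if __name__ == "__main__":
    before = 0o755  # assumed starting mode
    after = add_group_other_write(before)
    print(describe(before), "->", describe(after), oct(after))
```

Running it prints `rwxr-xr-x -> rwxrwxrwx 0o777`, i.e. the owner's bits are untouched and only the group and other write bits change.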

Now we need to write a single .conf file that describes the source, channel and sink, along with their types and properties.

To see the sample template file, navigate to the Flume configuration directory as shown below:

[cloudera@quickstart ~]$ cd /etc/alternatives/flume-ng-conf
[cloudera@quickstart flume-ng-conf]$ ls -l
total 16
-rw-r--r-- 1 root root    0 Jun 29  2017 flume.conf
-rw-r--r-- 1 root root 1661 Jun 29  2017 flume-conf.properties.template
-rw-r--r-- 1 root root 1455 Jun 29  2017 flume-env.ps1.template
-rw-r--r-- 1 root root 1565 Jun 29  2017 flume-env.sh.template
-rw-r--r-- 1 root root 3118 Jun 29  2017 log4j.properties
[cloudera@quickstart flume-ng-conf]$ gedit flume-conf.properties.template


The last command will show the file contents:

*******************flume-conf.properties.template contents******************************
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'

agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = loggerSink

# For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = seq

# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.loggerSink.type = logger

#Specify the channel the sink should use
agent.sinks.loggerSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100


*********************************************************************************
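Every key in this file starts with the agent name (here `agent`), followed by either a component list (`sources`, `channels`, `sinks`) or a component name and property, for example `agent.sources.seqGenSrc.type`. Because of this prefix, one file can describe several agents, and Flume selects the agent by name when it starts. The Python sketch below groups the keys by agent to make that structure visible. It is a simplified, hypothetical helper: it only understands `key = value` lines and ignores Java-properties features such as `:` separators and line continuations.

```python
from collections import defaultdict


def parse_flume_conf(text):
    """Parse `key = value` lines into {agent: {key_without_agent_prefix: value}}."""
    agents = defaultdict(dict)
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, sep, value = line.partition("=")
        if not sep:
            continue  # not a key = value line
        agent, _, rest = key.strip().partition(".")
        agents[agent][rest] = value.strip()
    return dict(agents)


def components(conf, agent):
    """Return the declared source, channel and sink names for one agent."""
    props = conf.get(agent, {})
    return {kind: props.get(kind, "").split() for kind in ("sources", "channels", "sinks")}


TEMPLATE = """
agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = loggerSink
agent.sources.seqGenSrc.type = seq
agent.channels.memoryChannel.capacity = 100
"""

if __name__ == "__main__":
    conf = parse_flume_conf(TEMPLATE)
    print(components(conf, "agent"))
```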

Use File -> Save As to save the file as Flume_Agent1.conf on the Desktop.

While editing this file, we will use the following names for our Flume work:

1) Flume Agent: Flume_Agent1
2) Source: netcat_source
3) Sink: hdfs_sink
4) Channel: memory_channel

Let us now edit this file for our Flume work. Lines starting with the hash symbol (#) are comments. Every property name is prefixed with the agent name, Flume_Agent1, and the first three lines register the names of the agent's source, sink and channel as shown below:

Flume_Agent1.sources = netcat_source
Flume_Agent1.sinks = hdfs_sink
Flume_Agent1.channels = memory_channel


The next three lines list the properties of the source:

Flume_Agent1.sources.netcat_source.type = netcat
Flume_Agent1.sources.netcat_source.bind = localhost
Flume_Agent1.sources.netcat_source.port = 55555

Note that the netcat source type is named after netcat, a computer networking utility for reading from and writing to network connections using TCP or UDP. There is no need to install netcat, as it is already available in the virtual machine. A netcat source listens on a port and turns each line of text it receives into an event. Here, the agent's source, netcat_source, is of type netcat and listens on localhost at port 55555.
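Once the agent is running, any TCP client can feed the source. The usual choice is the netcat utility itself (`nc localhost 55555`), and the Python sketch below does the same thing programmatically. The function name is hypothetical. The sketch relies on the netcat source's default behaviour of replying `OK` to every line it accepts (the `ack-every-event` setting). If that setting were turned off, the `readline` call would wait until the timeout expired.

```python
import socket


def send_lines(lines, host="localhost", port=55555, timeout=5.0):
    """Send each line to a netcat-style source and collect one reply line per event."""
    replies = []
    with socket.create_connection((host, port), timeout=timeout) as conn:
        reader = conn.makefile("r", encoding="utf-8", newline="\n")
        for line in lines:
            # The netcat source turns each newline-terminated line into one event.
            conn.sendall((line + "\n").encode("utf-8"))
            # With ack-every-event at its default, the source answers "OK".
            replies.append(reader.readline().rstrip("\n"))
        reader.close()
    return replies


if __name__ == "__main__":
    print(send_lines(["hello flume", "second event"]))
```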

The next few lines list the properties of the sink:

Flume_Agent1.sinks.hdfs_sink.type      = hdfs
Flume_Agent1.sinks.hdfs_sink.hdfs.path = hdfs://localhost:8020/user/cloudera/flume/events
Flume_Agent1.sinks.hdfs_sink.hdfs.inUseSuffix = .log
Flume_Agent1.sinks.hdfs_sink.hdfs.rollInterval = 0
Flume_Agent1.sinks.hdfs_sink.hdfs.writeFormat = Text
Flume_Agent1.sinks.hdfs_sink.hdfs.fileType = DataStream

The sink type is hdfs and the target directory is /user/cloudera/flume/events, which we created earlier. hdfs.rollInterval is the number of seconds to wait before rolling the current file (closing it and starting a new one). Setting it to 0 disables time-based rolling. hdfs.inUseSuffix, set to .log here, is the suffix used for temporary files that Flume is actively writing to. hdfs.writeFormat, set to Text, is the format for sequence file records. hdfs.fileType is the file format of the output. Setting it to DataStream means the output is written uncompressed rather than in the default SequenceFile format.
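Setting hdfs.rollInterval to 0 only disables time-based rolling. The HDFS sink also rolls on file size (hdfs.rollSize, 1024 bytes by default) and on event count (hdfs.rollCount, 10 events by default), and a value of 0 disables each trigger. Because our configuration does not override those two properties, their defaults still apply. The Python sketch below models that decision. It is a simplified illustration of the documented trigger rules, not the sink's actual code, and the function name is hypothetical.

```python
def should_roll(elapsed_s, bytes_written, events_written,
                roll_interval=30, roll_size=1024, roll_count=10):
    """Return True if any enabled roll trigger has fired.

    Defaults mirror the documented HDFS sink defaults; 0 disables a trigger.
    """
    by_time = roll_interval > 0 and elapsed_s >= roll_interval
    by_size = roll_size > 0 and bytes_written >= roll_size
    by_count = roll_count > 0 and events_written >= roll_count
    return by_time or by_size or by_count


if __name__ == "__main__":
    # Our settings: rollInterval = 0, rollSize and rollCount left at their defaults.
    print(should_roll(3600, 100, 5, roll_interval=0))   # False: no trigger reached
    print(should_roll(3600, 100, 10, roll_interval=0))  # True: 10 events written
```

By this model, our agent would still start a new file after 10 events or 1024 bytes, whichever comes first. Avoiding that would require setting hdfs.rollSize and hdfs.rollCount explicitly.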

The next three lines describe the properties of the memory channel:

Flume_Agent1.channels.memory_channel.type = memory
Flume_Agent1.channels.memory_channel.capacity = 1000
Flume_Agent1.channels.memory_channel.transactionCapacity = 100


capacity is the maximum number of events stored in the channel and is set to 1000 here. transactionCapacity is the maximum number of events the channel will take from a source or give to a sink per transaction and is set to 100 here.
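The interplay between capacity and transactionCapacity can be modelled in a few lines. The Python sketch below is a toy stand-in for a memory channel, not Flume's implementation: the class and method names are hypothetical, and real Flume transactions also support rollback, which is omitted here. In the model, a batch larger than transactionCapacity is rejected, a put that would push the channel past capacity is refused, and a take never returns more than transactionCapacity events.

```python
from collections import deque


class ChannelFullError(Exception):
    """Raised when a put would exceed the channel's capacity."""


class ToyMemoryChannel:
    """Toy model of a memory channel's capacity limits (not Flume's API)."""

    def __init__(self, capacity=1000, transaction_capacity=100):
        if transaction_capacity > capacity:
            raise ValueError("transaction_capacity cannot exceed capacity")
        self.capacity = capacity
        self.transaction_capacity = transaction_capacity
        self._events = deque()

    def put_batch(self, events):
        """Source side: stage a batch of events in one transaction."""
        if len(events) > self.transaction_capacity:
            raise ValueError("batch larger than transaction_capacity")
        if len(self._events) + len(events) > self.capacity:
            raise ChannelFullError("channel would exceed capacity")
        self._events.extend(events)

    def take_batch(self, max_events):
        """Sink side: remove up to max_events, capped at transaction_capacity."""
        n = min(max_events, self.transaction_capacity, len(self._events))
        return [self._events.popleft() for _ in range(n)]

    def __len__(self):
        return len(self._events)


if __name__ == "__main__":
    channel = ToyMemoryChannel(capacity=1000, transaction_capacity=100)  # our settings
    channel.put_batch([f"event-{i}" for i in range(100)])
    print(len(channel.take_batch(500)))  # capped at 100 per transaction
```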

The last two lines bind the source and the sink to the memory channel:

Flume_Agent1.sources.netcat_source.channels = memory_channel
Flume_Agent1.sinks.hdfs_sink.channel = memory_channel
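Note the asymmetry in these two keys. A source takes a `channels` list, because it can write to several channels, while a sink takes a single `channel`, because it reads from exactly one. A mistyped key leaves a component unbound. The Python sketch below is a hypothetical helper that checks the bindings for one agent, given the properties as a dictionary of full key names to values.

```python
def check_bindings(props, agent):
    """Return a list of problems with source/sink-to-channel bindings for one agent.

    `props` maps full property keys (e.g. 'Flume_Agent1.sinks.hdfs_sink.channel')
    to their string values.
    """
    def names(kind):
        return props.get(f"{agent}.{kind}", "").split()

    channels = set(names("channels"))
    problems = []
    for src in names("sources"):
        # A source may be bound to several channels (space-separated list).
        bound = props.get(f"{agent}.sources.{src}.channels", "").split()
        if not bound:
            problems.append(f"source {src} is not bound to any channel")
        for ch in bound:
            if ch not in channels:
                problems.append(f"source {src} uses undeclared channel {ch}")
    for snk in names("sinks"):
        # A sink reads from exactly one channel (singular key).
        ch = props.get(f"{agent}.sinks.{snk}.channel", "").strip()
        if not ch:
            problems.append(f"sink {snk} is not bound to a channel")
        elif ch not in channels:
            problems.append(f"sink {snk} uses undeclared channel {ch}")
    return problems


if __name__ == "__main__":
    props = {
        "Flume_Agent1.sources": "netcat_source",
        "Flume_Agent1.sinks": "hdfs_sink",
        "Flume_Agent1.channels": "memory_channel",
        "Flume_Agent1.sources.netcat_source.channels": "memory_channel",
        "Flume_Agent1.sinks.hdfs_sink.channel": "memory_channel",
    }
    print(check_bindings(props, "Flume_Agent1"))  # an empty list means no problems
```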


The final edited file is shown below for reference:

***************************Flume_Agent1.conf contents*******************************
# Flume_Agent1.conf: A single-node Flume configuration

# Below lines describe the names of components on this agent
Flume_Agent1.sources = netcat_source
Flume_Agent1.sinks = hdfs_sink
Flume_Agent1.channels = memory_channel

# Below lines describe/configure the source properties
Flume_Agent1.sources.netcat_source.type = netcat
Flume_Agent1.sources.netcat_source.bind = localhost
Flume_Agent1.sources.netcat_source.port = 55555

# Below lines describe the sink properties

Flume_Agent1.sinks.hdfs_sink.type      = hdfs
Flume_Agent1.sinks.hdfs_sink.hdfs.path = hdfs://localhost:8020/user/cloudera/flume/events
Flume_Agent1.sinks.hdfs_sink.hdfs.inUseSuffix = .log
Flume_Agent1.sinks.hdfs_sink.hdfs.rollInterval = 0
Flume_Agent1.sinks.hdfs_sink.hdfs.writeFormat = Text
Flume_Agent1.sinks.hdfs_sink.hdfs.fileType = DataStream

# Below lines describe channel properties

Flume_Agent1.channels.memory_channel.type = memory
Flume_Agent1.channels.memory_channel.capacity = 1000
Flume_Agent1.channels.memory_channel.transactionCapacity = 100

# Below lines bind the source and sink to the channel
Flume_Agent1.sources.netcat_source.channels = memory_channel
Flume_Agent1.sinks.hdfs_sink.channel = memory_channel
*********************************************************************************

We will continue with Apache Flume in the next post ...