Sunday 21 January 2018

Apache Flume - II

In the earlier post, we introduced Apache Flume, which is useful for migrating logs into HDFS. In this post, we share more details on Apache Flume. More details on Apache Flume can be found here. For all the work on Flume in this post, we will use the Cloudera sandbox, Cloudera QuickStart VM 5.12.

We have already prepared the .conf file, and it is located on the Desktop. Right-click anywhere on the Desktop in the virtual machine, click Open in Terminal to start a terminal window, and run the following command:

flume-ng version

This returns the following results:

[cloudera@quickstart Desktop]$ flume-ng version
Flume 1.6.0-cdh5.12.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 4d37abdc65a53a0af1246de51bdcf1d8557669eb
Compiled by jenkins on Thu Jun 29 04:37:39 PDT 2017
From source with checksum af8d398eddf3e3b4ddbada14895722ac


To see the help for Flume, run the command below:

flume-ng help

This returns the following results:

[cloudera@quickstart Desktop]$ flume-ng help
Usage: /usr/lib/flume-ng/bin/flume-ng <command> [options]...

commands:
  help                      display this help text
  agent                     run a Flume agent
  avro-client               run an avro Flume client
  version                   show Flume version info

global options:
  --conf,-c <conf>          use configs in <conf> directory
  --classpath,-C <cp>       append to the classpath
  --dryrun,-d               do not actually start Flume, just print the command
  --plugins-path <dirs>     colon-separated list of plugins.d directories. See the
                            plugins.d section in the user guide for more details.
                            Default: $FLUME_HOME/plugins.d
  -Dproperty=value          sets a Java system property value
  -Xproperty=value          sets a Java -X option

agent options:
  --name,-n <name>          the name of this agent (required)
  --conf-file,-f <file>     specify a config file (required if -z missing)
  --zkConnString,-z <str>   specify the ZooKeeper connection to use (required if -f missing)
  --zkBasePath,-p <path>    specify the base path in ZooKeeper for agent configs
  --no-reload-conf          do not reload config file if changed
  --help,-h                 display help text

avro-client options:
  --rpcProps,-P <file>   RPC client properties file with server connection params
  --host,-H <host>       hostname to which events will be sent
  --port,-p <port>       port of the avro source
  --dirname <dir>        directory to stream to avro source
  --filename,-F <file>   text file to stream to avro source (default: std input)
  --headerFile,-R <file> File containing event headers as key/value pairs on each new line
  --help,-h              display help text

  Either --rpcProps or both --host and --port must be specified.

Note that if <conf> directory is specified, then it is always included first
in the classpath.


To invoke the Flume agent in a dry run, let us run the command below:

flume-ng agent -f Flume_Agent1.conf -n Flume_Agent1 -Dflume.root.logger=DEBUG,console -d

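-f specifies the configuration file for the agent
-n is the name of the Flume agent, which must match the agent name used in the configuration file
-d indicates that this command is a dry run: Flume is not started, and only the command that would be run is printed
-D sets a Java system property. Here, flume.root.logger is set to DEBUG,console, so DEBUG-level log output is directed to the console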
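
The same dry run can also be written with the long option names listed in the help output above (--conf-file, --name, --dryrun), which some readers find easier to follow. Below is a minimal sketch; run_if_available is a hypothetical helper of ours, not part of Flume, added only so that the snippet does nothing harmful on a machine without flume-ng:

```shell
# Hypothetical helper (not part of Flume): run a command only if it is installed
run_if_available() {
  if command -v "$1" >/dev/null 2>&1; then
    "$@"
  else
    echo "skipped (not installed): $*"
  fi
}

# Same dry run as above, using the long option names from the help text
run_if_available flume-ng agent \
  --conf-file Flume_Agent1.conf \
  --name Flume_Agent1 \
  -Dflume.root.logger=DEBUG,console \
  --dryrun
```

Dropping --dryrun from this command starts the agent for real.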

The output is not shown for the sake of brevity, but it ends with the following lines:

apache.flume.node.Application -f Flume_Agent1.conf -n Flume_Agent1
+ exit 0


Now let us run the same command but without the -d:

flume-ng agent -f Flume_Agent1.conf -n Flume_Agent1 -Dflume.root.logger=DEBUG,console

The output is not shown for the sake of brevity, but the last few lines are shown below:

18/01/21 08:28:20 INFO node.AbstractConfigurationProvider: Creating channels
18/01/21 08:28:20 INFO channel.DefaultChannelFactory: Creating instance of channel memory_channel type memory
18/01/21 08:28:20 INFO node.AbstractConfigurationProvider: Created channel memory_channel
18/01/21 08:28:20 INFO source.DefaultSourceFactory: Creating instance of source netcat_source, type netcat
18/01/21 08:28:20 INFO sink.DefaultSinkFactory: Creating instance of sink: hdfs_sink, type: hdfs
18/01/21 08:28:20 INFO node.AbstractConfigurationProvider: Channel memory_channel connected to [netcat_source, hdfs_sink]
18/01/21 08:28:20 INFO node.Application: Starting new configuration:{ sourceRunners:{netcat_source=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:netcat_source,state:IDLE} }} sinkRunners:{hdfs_sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@71060c3e counterGroup:{ name:null counters:{} } }} channels:{memory_channel=org.apache.flume.channel.MemoryChannel{name: memory_channel}} }
18/01/21 08:28:20 INFO node.Application: Starting Channel memory_channel
18/01/21 08:28:20 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memory_channel: Successfully registered new MBean.
18/01/21 08:28:20 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memory_channel started
18/01/21 08:28:20 INFO node.Application: Starting Sink hdfs_sink
18/01/21 08:28:20 INFO node.Application: Starting Source netcat_source
18/01/21 08:28:20 INFO source.NetcatSource: Source starting
18/01/21 08:28:20 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: hdfs_sink: Successfully registered new MBean.
18/01/21 08:28:20 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: hdfs_sink started
18/01/21 08:28:20 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:55555]


Note that all Flume components (the netcat source, the memory channel and the HDFS sink) have been created as per the .conf file, and the agent is listening on 127.0.0.1:55555.
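
The .conf file itself is not reproduced in this post. For readers following along without it, below is a rough reconstruction based only on the component names in the log output above and the sink settings shown later in this post. It is a sketch, not the actual file: the channel capacity values are our assumptions, and the snippet writes to a hypothetical file name, Flume_Agent1.sketch.conf, so that it does not overwrite the real Flume_Agent1.conf.

```shell
# Hypothetical output file, chosen so the real Flume_Agent1.conf is left untouched
SKETCH_CONF="${SKETCH_CONF:-Flume_Agent1.sketch.conf}"

cat > "$SKETCH_CONF" <<'EOF'
# Name the components of this agent (names taken from the log output)
Flume_Agent1.sources  = netcat_source
Flume_Agent1.channels = memory_channel
Flume_Agent1.sinks    = hdfs_sink

# Netcat source: listens on 127.0.0.1:55555
Flume_Agent1.sources.netcat_source.type     = netcat
Flume_Agent1.sources.netcat_source.bind     = 127.0.0.1
Flume_Agent1.sources.netcat_source.port     = 55555
Flume_Agent1.sources.netcat_source.channels = memory_channel

# Memory channel (capacity values are assumptions, not from the original file)
Flume_Agent1.channels.memory_channel.type                = memory
Flume_Agent1.channels.memory_channel.capacity            = 1000
Flume_Agent1.channels.memory_channel.transactionCapacity = 100

# HDFS sink (values as shown in the sink section of this post)
Flume_Agent1.sinks.hdfs_sink.type             = hdfs
Flume_Agent1.sinks.hdfs_sink.hdfs.path        = hdfs://localhost:8020/user/cloudera/flume/events
Flume_Agent1.sinks.hdfs_sink.hdfs.inUseSuffix = .log
Flume_Agent1.sinks.hdfs_sink.hdfs.writeFormat = Text
Flume_Agent1.sinks.hdfs_sink.hdfs.fileType    = DataStream
Flume_Agent1.sinks.hdfs_sink.channel          = memory_channel
EOF

echo "Wrote sketch configuration to $SKETCH_CONF"
```

Note that a source is wired with the plural property channels, because a source can feed several channels, while a sink uses the singular channel, because it drains exactly one.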

Open a second terminal window and run the command below:

echo 'Hello, Flume!' | nc localhost 55555

Netcat connects to port 55555 on localhost and sends the string to it. It then prints the response it receives from the Flume netcat source.

This returns the following results:

[cloudera@quickstart Desktop]$ echo 'Hello, Flume!' | nc localhost 55555
OK


We see an update in the first terminal window, where the lines below have been added:

18/01/21 08:49:54 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
18/01/21 08:49:54 INFO hdfs.BucketWriter: Creating hdfs://localhost:8020/user/cloudera/flume/events/FlumeData.1516553394580.log


When we check the /user/cloudera/flume/events/ directory in HDFS, we see the file that has been created:











The contents of the file are shown below:
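
For readers who prefer the terminal to a file browser, both checks can also be done with the hdfs dfs command. This is a minimal sketch, assuming the hdfs client is on the PATH (as it is inside the QuickStart VM) and that the sink writes files with the FlumeData prefix seen in the log output above:

```shell
# HDFS directory the sink writes to (from hdfs.path in the agent configuration)
EVENTS_DIR=/user/cloudera/flume/events

if command -v hdfs >/dev/null 2>&1; then
  # List the files the HDFS sink has written so far
  hdfs dfs -ls "$EVENTS_DIR"
  # Print the contents of every FlumeData file in that directory;
  # the glob is quoted so that HDFS, not the local shell, expands it
  hdfs dfs -cat "$EVENTS_DIR/FlumeData.*"
else
  echo "hdfs client not found; run this inside the QuickStart VM"
fi
```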














Let us run the command below:

echo -e '\n Apache Flume: \n\n\tHello, \n\tFlume!' | nc localhost 55555
 

This gets appended to the existing file:














Let us delete this file and now pass data from a file called employees.csv that was created in earlier posts.

head -n 5 employees.csv | nc localhost 55555

This transmits the first 5 lines of employees.csv to the port on which Flume is listening.

The contents of the file are shown below:











Now, stop the agent in the first terminal by pressing CTRL+C. Update the Flume_Agent1.conf file to set two new properties, hdfs.rollSize and hdfs.rollCount, as shown below:

# Below lines describe the sink properties
Flume_Agent1.sinks.hdfs_sink.type      = hdfs
Flume_Agent1.sinks.hdfs_sink.hdfs.path = hdfs://localhost:8020/user/cloudera/flume/events
Flume_Agent1.sinks.hdfs_sink.hdfs.inUseSuffix = .log
Flume_Agent1.sinks.hdfs_sink.hdfs.rollInterval = 0
Flume_Agent1.sinks.hdfs_sink.hdfs.rollSize = 1048576
Flume_Agent1.sinks.hdfs_sink.hdfs.rollCount = 200
Flume_Agent1.sinks.hdfs_sink.hdfs.writeFormat = Text
Flume_Agent1.sinks.hdfs_sink.hdfs.fileType = DataStream

hdfs.rollSize is the file size, in bytes, that triggers a roll. hdfs.rollCount is the number of events written to a file before it is rolled. Setting hdfs.rollInterval to 0 disables time-based rolling. We can tune these properties to reduce the number of files created in HDFS. In our case, these settings will result in only one target file for the single source file, employees_file.json. Note that employees_file.json was created in earlier posts.
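
To reason about whether a given input will end up in a single HDFS file, recall that the netcat source turns each line of text into one event. The input therefore stays in one file as long as it has at most 200 lines and is at most 1048576 bytes (1 MiB). The sketch below does this rough check locally; fits_in_one_file is a hypothetical helper of ours, and the byte count from wc includes newline characters, so it slightly overestimates what Flume counts.

```shell
ROLL_SIZE=1048576   # hdfs.rollSize from the configuration above, in bytes
ROLL_COUNT=200      # hdfs.rollCount from the configuration above, in events

# Hypothetical helper: prints "yes" if the file stays within both roll limits
fits_in_one_file() {
  input="$1"
  # Arithmetic expansion strips any padding that wc adds on some systems
  events=$(( $(wc -l < "$input") ))   # one event per line
  bytes=$(( $(wc -c < "$input") ))
  if [ "$events" -le "$ROLL_COUNT" ] && [ "$bytes" -le "$ROLL_SIZE" ]; then
    echo "yes"
  else
    echo "no"
  fi
}

# Example: check the file used in this post, if it is present
if [ -f employees_file.json ]; then
  fits_in_one_file employees_file.json
fi
```

If the check prints "no", raising hdfs.rollCount or hdfs.rollSize (or setting either to 0 to disable that trigger) is one way to keep the data in a single file.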

Now, start the Flume agent once again in the first terminal using the command below:

flume-ng agent -f Flume_Agent1.conf -n Flume_Agent1 -Dflume.root.logger=DEBUG,console

In the second terminal, type the command below:

cat ./employees_file.json | nc localhost 55555

Then, check for the single file created in /user/cloudera/flume/events:

 


 







You may stop the Flume agent by pressing CTRL+C in the first terminal window.

This concludes the discussion on Apache Flume.