Sunday 28 January 2018

Apache Kafka - III

In this third part on Apache Kafka, we continue with more commands. We will use the same environment that we set up in this post.

Let us restart Zookeeper and Kafka, as we did in the previous post, using the commands below:

Start Zookeeper:

Open first terminal:

F:\>
F:\>cd kafka_2.12-1.0.0

F:\kafka_2.12-1.0.0>bin\windows\zookeeper-server-start.bat config\zookeeper.properties

Start Kafka:

Next, open another terminal and start Kafka as shown below:

F:\>
F:\>cd kafka_2.12-1.0.0

F:\kafka_2.12-1.0.0>bin\windows\kafka-server-start.bat config\server.properties


Create Topic:

In a third terminal, let us now proceed to create a topic called topic1 with a replication factor of 1 and 3 partitions:

F:\>
F:\>cd kafka_2.12-1.0.0

F:\kafka_2.12-1.0.0>bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic topic1
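
To confirm that the topic was created with three partitions, it can optionally be described using the same command we used in the previous post (a quick check; output not shown):

bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic topic1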

Start Producer:

In a fourth terminal, let us start a producer with the command shown below:

F:\>
F:\>cd kafka_2.12-1.0.0

F:\kafka_2.12-1.0.0>bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic topic1


In a fifth terminal, set the retention time (in milliseconds) of messages on the topic as shown below:

F:\>
F:\>cd kafka_2.12-1.0.0

F:\kafka_2.12-1.0.0>bin\windows\kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name topic1 --alter --add-config retention.ms=1800000
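
To verify that the override took effect, the topic's configuration can be listed with the --describe option of the same tool (an optional check; output not shown):

bin\windows\kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name topic1 --describe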

In the fourth terminal, enter the following messages at the producer prompt:

>Message 1
>Message 2
>Message 3
>Message 4
>Message 5
>Message 6
>Message 7
>Message 8
>Message 9
>Message 10
>Message 11
>Message 12
>Message 13


Start Consumer:

In a sixth terminal, start a console consumer as follows:

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic1 --from-beginning

Output is shown below:

F:\kafka_2.12-1.0.0>bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic1 --from-beginning
Message 1
Message 4
Message 7
Message 10
Message 13
Message 2
Message 5
Message 8
Message 11
Message 3
Message 6
Message 9
Message 12


Note that the messages are not in the order in which they were sent. The console consumer reads from all three partitions of the topic, and ordering is guaranteed only within a partition, not across partitions. You can also see the messages through the command we ran in the previous post:

F:\kafka_2.12-1.0.0>bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic topic1 --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
Message 2
Message 5
Message 8
Message 11
Message 3
Message 6
Message 9
Message 12
Message 1
Message 4
Message 7
Message 10
Message 13


To see the messages in a specific partition, run the command below:

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic1 --partition 0 --from-beginning

Output is shown below:

F:\kafka_2.12-1.0.0>bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic1 --partition 0 --from-beginning
Message 2
Message 5
Message 8
Message 11


Note that the messages within a partition are ordered.
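
The remaining partitions can be inspected in the same way by changing the --partition argument:

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic1 --partition 1 --from-beginning
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic1 --partition 2 --from-beginning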

This concludes the third post on Apache Kafka.

Saturday 27 January 2018

Apache Kafka - II

In the first part on Apache Kafka, we covered the introduction and installation of Apache Kafka. In this second part, we will take a look at how data is published and subscribed to in Apache Kafka.

For working with Kafka, Zookeeper needs to be up and running. Open a terminal and run the below command:

F:\>
F:\>cd kafka_2.12-1.0.0


Start Zookeeper:

F:\kafka_2.12-1.0.0>bin\windows\zookeeper-server-start.bat config\zookeeper.properties

Note that the argument to the batch file is the file config\zookeeper.properties. Opening the file, we see that the port on which Zookeeper will run is 2181.












The last few lines of the output are shown below:

[2018-01-27 10:11:23,876] INFO Server environment:user.dir=F:\kafka_2.12-1.0.0 (org.apache.zookeeper.server.ZooKeeperServer)
[2018-01-27 10:11:23,907] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2018-01-27 10:11:23,907] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2018-01-27 10:11:23,907] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2018-01-27 10:11:23,985] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)


The last line shows that the Zookeeper server is listening on port 2181, as per the entry in the file.

Start Kafka:

Next, open another terminal and start Kafka as shown below:

F:\>
F:\>cd kafka_2.12-1.0.0

F:\kafka_2.12-1.0.0>bin\windows\kafka-server-start.bat config\server.properties

The last few lines of the output are shown below:

[2018-01-27 10:35:18,071] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2018-01-27 10:35:22,680] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2018-01-27 10:35:22,758] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2018-01-27 10:35:22,758] INFO Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint(workstation,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
[2018-01-27 10:35:22,774] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2018-01-27 10:35:22,774] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser)
[2018-01-27 10:35:22,789] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)


The last message indicates that the Kafka server is up and running.

Create Topic:

In a third terminal, let us now proceed to create a topic called topic_1 with a replication factor of 1 and 3 partitions:

F:\>
F:\>cd kafka_2.12-1.0.0

F:\kafka_2.12-1.0.0>bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic topic_1

Output is shown below:

WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "topic_1".


Start Producer:

Once the topic is created, we can start producers that publish data to the topic and consumers that subscribe to the topic to receive the messages. Open a fourth terminal and create a console producer by entering the commands below:

F:\>
F:\>cd kafka_2.12-1.0.0

F:\kafka_2.12-1.0.0>bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic topic_1

We get a prompt as shown below:

F:\kafka_2.12-1.0.0>bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic topic_1
>


Start Consumer:

In a fifth terminal, start a console consumer as follows:

F:\kafka_2.12-1.0.0>bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic topic_1

Output is shown below:

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
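
As the warning suggests, the consumer can also be started with the newer --bootstrap-server option instead of --zookeeper (this is the form we will use in the next post):

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic_1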

Now, at the prompt in the fourth terminal, enter the messages below:

Hello, Kafka!
Message 1
Message 2
Message 3

The producer terminal is shown below:







These messages should appear in the consumer terminal as shown below:









List Topics:

Use the command below in the third terminal to list all topics:

bin\windows\kafka-topics --zookeeper localhost:2181 --list

Output is shown below:

F:\kafka_2.12-1.0.0>bin\windows\kafka-topics --zookeeper localhost:2181 --list
topic_1


Describe Topic:

bin\windows\kafka-topics --describe --zookeeper localhost:2181 --topic topic_1

Output is shown below:

F:\kafka_2.12-1.0.0>bin\windows\kafka-topics --describe --zookeeper localhost:2181 --topic topic_1
Topic:topic_1   PartitionCount:3        ReplicationFactor:1     Configs:
        Topic: topic_1  Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: topic_1  Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: topic_1  Partition: 2    Leader: 0       Replicas: 0     Isr: 0


Delete Topic:

bin\windows\kafka-topics --delete --zookeeper localhost:2181 --topic topic_1

Output is shown below:

F:\kafka_2.12-1.0.0>bin\windows\kafka-topics --delete --zookeeper localhost:2181 --topic topic_1
Topic topic_1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.


When we list the topics again in the third terminal, we get the output below:

F:\kafka_2.12-1.0.0>bin\windows\kafka-topics --zookeeper localhost:2181 --list
__consumer_offsets
topic_1 - marked for deletion


In all terminals, press CTRL+C to exit.

This concludes the introductory commands on Apache Kafka.

Friday 26 January 2018

Apache Kafka - I

In the next few posts starting with this one, we will take a look at Apache Kafka. Apache Kafka is a distributed streaming platform written in Scala and Java. We have already seen Sqoop and Apache Flume as ingestion tools for migrating data into Big Data systems. Apache Kafka serves the twin purpose of:

1) migrating data between systems/applications in real time through streaming data pipelines, and
2) processing streams of data in real time through applications

Apache Kafka uses the Publish Subscribe model of messaging pattern. In the Publish Subscribe model, publishers are applications that are senders of messages and subscribers are applications that are recipients of messages. Publishers publish messages without explicitly specifying recipients or having knowledge of intended recipients. Publishers do not program the messages to be sent to any particular subscriber. Instead, the publishers publish messages to topics. Subscribers that evince an interest in a particular topic can subscribe to it to receive all the messages that are published to that topic by the publishers. Note that all subscribers subscribing to the same topic will receive the same messages related to that topic. In this sense, a loosely coupled architecture is achieved between the publishers and the subscribers: neither has any knowledge of the other, and each can operate independently of the other.

Apache Kafka terminology consists of:

1) Producers: publish data to any number of topics of their choice. The producer is responsible for choosing which record to assign to which partition within that topic. This assignment of a record to a partition can be done in a round-robin fashion to balance load or it can be done according to a custom partition function

2) Consumers: are consumer instances grouped under a consumer group name. For every record published by a producer to a topic, the record is delivered to one consumer instance within each subscribing consumer group

3) Topics: are categories or feed names to which records are published by Producers. Topics in Kafka are always multi-subscriber based in that a topic can have zero, one, or many consumers that subscribe to the data written to it. Topics are split into one or more partitions. For each topic, a minimum of one partition exists.

4) Partitions: are ordered, immutable sequences of records that are continually appended to as producers publish records. Each record within a partition is assigned a sequential id number called the offset that uniquely identifies it within the partition. The Kafka cluster retains all published records within a partition for a configurable retention period, regardless of whether they have been consumed by a consumer

5) Records: consist of an optional key, a value, and a timestamp. Producers publish records to a partition of a topic based on the record key. If the record has a key, the default partitioner uses a hash of the key to choose the partition; if the record has no key, a round-robin strategy is used (see the sketch below)
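
As an illustration of keyed versus unkeyed records, below is a minimal producer sketch in Scala using the Kafka clients API. The topic name, key, and values are placeholders, and the kafka-clients 1.0.0 jar is assumed to be on the classpath:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
// With a key, the default partitioner hashes "user1" to choose the partition
producer.send(new ProducerRecord[String, String]("some_topic", "user1", "keyed message"))
// Without a key, records are spread across partitions in a round-robin fashion
producer.send(new ProducerRecord[String, String]("some_topic", "message with no key"))
producer.close()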





















The four core Kafka APIs (refer to the figure above) are:

1) Producer API: allows an application to publish a stream of records to one or more Kafka topics
2) Consumer API: allows an application to subscribe to one or more topics and process the stream of records produced to them
3) Streams API: allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams
4) Connector API: allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems

Apache Kafka has the following prerequisites:

1) Java Runtime Environment
2) Memory - Sufficient memory for all Kafka related configurations
3) Disk Space - Sufficient disk space for all Kafka related configurations
4) Directory Permissions - Read/Write permissions for directories for Kafka related configurations
5) Zookeeper
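
A quick way to verify the first prerequisite is to check the Java version from a command prompt (the reported version will depend on what is installed on your machine):

java -version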

In the first part of the series, we will install Kafka on Windows.

Download the latest Kafka binaries from this link. As of writing this post, 1.0.0 is the latest stable release. The link to the binary is:

https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.12-1.0.0.tgz

Click on sha512 to see this:

kafka_2.12-1.0.0.tgz: 1B647B7F 392148AA 2B9D4755 0A1502E5 0BE4B005 C70C82DA
                                E03065B8 9484E664 00528BE4 0CA2D54F 35EB2C0E 70F35C88
                                A04777C2 B625DAA5 D5546CAA B4ED6818


Move the downloaded binary to a drive, the F drive in this case. Now, let us use the utility mentioned in this post to verify the integrity of the downloaded binary:

F:\>CertUtil -hashfile kafka_2.12-1.0.0.tgz SHA512
SHA512 hash of kafka_2.12-1.0.0.tgz:
1b647b7f392148aa2b9d47550a1502e50be4b005c70c82dae03065b89484e66400528be40ca2d54f35eb2c0e70f35c88a04777c2b625daa5d5546caab4ed6818
CertUtil: -hashfile command completed successfully.


The values are the same. Move the file to a folder called kafka_2.12-1.0.0 created under the F drive. We will use 7-Zip File Manager to unzip this file. Open 7-Zip File Manager and point it to this file:







Click on Extract and enter F:\kafka_2.12-1.0.0\ in Extract to. Then, click OK:
















Select the newly created file, kafka_2.12-1.0.0.tar. Click on Extract again and enter F:\ in Extract to. Then, click OK:























The final extracted files are shown below:










On the command line, navigate to the Kafka folder as follows:

F:\>
F:\>cd kafka_2.12-1.0.0\bin\windows
F:\kafka_2.12-1.0.0\bin\windows>


To see the Zookeeper batch files, enter the command below:

F:\kafka_2.12-1.0.0\bin\windows>dir zookeeper*

Output is shown below: 

 Directory of F:\kafka_2.12-1.0.0\bin\windows

10/27/2017  09:26 PM             1,192 zookeeper-server-start.bat
10/27/2017  09:26 PM               905 zookeeper-server-stop.bat
10/27/2017  09:26 PM               977 zookeeper-shell.bat
               3 File(s)          3,074 bytes


To see the Kafka batch files, enter the command below:

F:\kafka_2.12-1.0.0\bin\windows>dir kafka*

Output is shown below:

  Directory of F:\kafka_2.12-1.0.0\bin\windows

10/27/2017  09:26 PM               873 kafka-acls.bat
10/27/2017  09:26 PM               885 kafka-broker-api-versions.bat
10/27/2017  09:26 PM               876 kafka-configs.bat
10/27/2017  09:26 PM               925 kafka-console-consumer.bat
10/27/2017  09:26 PM               925 kafka-console-producer.bat
10/27/2017  09:26 PM               883 kafka-consumer-groups.bat
10/27/2017  09:26 PM               884 kafka-consumer-offset-checker.bat
10/27/2017  09:26 PM               938 kafka-consumer-perf-test.bat
10/27/2017  09:26 PM               874 kafka-mirror-maker.bat
10/27/2017  09:26 PM               900 kafka-preferred-replica-election.bat
10/27/2017  09:26 PM               940 kafka-producer-perf-test.bat
10/27/2017  09:26 PM               888 kafka-reassign-partitions.bat
10/27/2017  09:26 PM               880 kafka-replay-log-producer.bat
10/27/2017  09:26 PM               886 kafka-replica-verification.bat
10/27/2017  09:26 PM             5,276 kafka-run-class.bat
10/27/2017  09:26 PM             1,377 kafka-server-start.bat
10/27/2017  09:26 PM               997 kafka-server-stop.bat
10/27/2017  09:26 PM               882 kafka-simple-consumer-shell.bat
10/27/2017  09:26 PM               875 kafka-topics.bat
              19 File(s)         21,964 bytes


This concludes the first post on Apache Kafka ...

Sunday 21 January 2018

Apache Flume - II

In the earlier post, we introduced Apache Flume, which is useful for migrating logs into HDFS. In this post, we continue to share more details on Apache Flume. More details on Apache Flume can be found here. For all the work on Flume in this post, we will use the Cloudera sandbox, Cloudera QuickStart VM 5.12.

We have already prepared the .conf file, and it is located on the Desktop. Right-click anywhere on the Desktop in the virtual machine and click on Open in Terminal to start a terminal window, then run the following command:

flume-ng version

This returns below results:

[cloudera@quickstart Desktop]$ flume-ng version
Flume 1.6.0-cdh5.12.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 4d37abdc65a53a0af1246de51bdcf1d8557669eb
Compiled by jenkins on Thu Jun 29 04:37:39 PDT 2017
From source with checksum af8d398eddf3e3b4ddbada14895722ac


To see help for Flume, run the command below:

flume-ng help

This returns below results:

[cloudera@quickstart Desktop]$ flume-ng help
Usage: /usr/lib/flume-ng/bin/flume-ng <command> [options]...

commands:
  help                      display this help text
  agent                     run a Flume agent
  avro-client               run an avro Flume client
  version                   show Flume version info

global options:
  --conf,-c <conf>          use configs in <conf> directory
  --classpath,-C <cp>       append to the classpath
  --dryrun,-d               do not actually start Flume, just print the command
  --plugins-path <dirs>     colon-separated list of plugins.d directories. See the
                            plugins.d section in the user guide for more details.
                            Default: $FLUME_HOME/plugins.d
  -Dproperty=value          sets a Java system property value
  -Xproperty=value          sets a Java -X option

agent options:
  --name,-n <name>          the name of this agent (required)
  --conf-file,-f <file>     specify a config file (required if -z missing)
  --zkConnString,-z <str>   specify the ZooKeeper connection to use (required if -f missing)
  --zkBasePath,-p <path>    specify the base path in ZooKeeper for agent configs
  --no-reload-conf          do not reload config file if changed
  --help,-h                 display help text

avro-client options:
  --rpcProps,-P <file>   RPC client properties file with server connection params
  --host,-H <host>       hostname to which events will be sent
  --port,-p <port>       port of the avro source
  --dirname <dir>        directory to stream to avro source
  --filename,-F <file>   text file to stream to avro source (default: std input)
  --headerFile,-R <file> File containing event headers as key/value pairs on each new line
  --help,-h              display help text

  Either --rpcProps or both --host and --port must be specified.

Note that if <conf> directory is specified, then it is always included first
in the classpath.


To invoke the Flume agent in a dry run, let us run the command below:

flume-ng agent -f Flume_Agent1.conf -n Flume_Agent1 -Dflume.root.logger=DEBUG,console -d

-f specifies the configuration file name
-n is the name of the Flume agent
-d indicates that this command is a dry run
-D sets a Java system property. Here, flume.root.logger is set to DEBUG and the output is directed to the console

The output is not shown for the sake of brevity. But, it ends with the following lines:

apache.flume.node.Application -f Flume_Agent1.conf -n Flume_Agent1
+ exit 0


Now let us run the same command but without the -d:

flume-ng agent -f Flume_Agent1.conf -n Flume_Agent1 -Dflume.root.logger=DEBUG,console

The output is not shown for the sake of brevity, but the last few lines are shown below:

18/01/21 08:28:20 INFO node.AbstractConfigurationProvider: Creating channels
18/01/21 08:28:20 INFO channel.DefaultChannelFactory: Creating instance of channel memory_channel type memory
18/01/21 08:28:20 INFO node.AbstractConfigurationProvider: Created channel memory_channel
18/01/21 08:28:20 INFO source.DefaultSourceFactory: Creating instance of source netcat_source, type netcat
18/01/21 08:28:20 INFO sink.DefaultSinkFactory: Creating instance of sink: hdfs_sink, type: hdfs
18/01/21 08:28:20 INFO node.AbstractConfigurationProvider: Channel memory_channel connected to [netcat_source, hdfs_sink]
18/01/21 08:28:20 INFO node.Application: Starting new configuration:{ sourceRunners:{netcat_source=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:netcat_source,state:IDLE} }} sinkRunners:{hdfs_sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@71060c3e counterGroup:{ name:null counters:{} } }} channels:{memory_channel=org.apache.flume.channel.MemoryChannel{name: memory_channel}} }
18/01/21 08:28:20 INFO node.Application: Starting Channel memory_channel
18/01/21 08:28:20 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memory_channel: Successfully registered new MBean.
18/01/21 08:28:20 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memory_channel started
18/01/21 08:28:20 INFO node.Application: Starting Sink hdfs_sink
18/01/21 08:28:20 INFO node.Application: Starting Source netcat_source
18/01/21 08:28:20 INFO source.NetcatSource: Source starting
18/01/21 08:28:20 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: hdfs_sink: Successfully registered new MBean.
18/01/21 08:28:20 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: hdfs_sink started
18/01/21 08:28:20 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:55555]


Note that all Flume components have been created as per the .conf file and the agent is listening on 127.0.0.1:55555.

Open a second terminal window and run below command:

echo 'Hello, Flume!' | nc localhost 55555

Netcat connects to port 55555 on localhost and sends the 'Hello, Flume!' string to it. It then prints the response.

This returns below results:

[cloudera@quickstart Desktop]$ echo 'Hello, Flume!' | nc localhost 55555
OK


We see an update in the first terminal window, and the lines below have been added:

18/01/21 08:49:54 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
18/01/21 08:49:54 INFO hdfs.BucketWriter: Creating hdfs://localhost:8020/user/cloudera/flume/events/FlumeData.1516553394580.log


When we check the /user/cloudera/flume/events/ directory, we see the file that has been created:











The contents of the file are shown below:














Let us run below command:

echo -e '\n Apache Flume: \n\n\tHello, \n\tFlume!' | nc localhost 55555
 

This gets appended to the existing file:














Let us delete this file and now pass data from a file called employees.csv that was created in earlier posts.

head -n 5 employees.csv | nc localhost 55555

This will transmit the first 5 lines of employees.csv to the port on which Flume is listening.

The contents of the file are shown below:











Now, stop the agent in the first terminal by using CTRL+C. Update the Flume_Agent1.conf file to set two new properties as shown below:

# Below lines describe the sink properties
Flume_Agent1.sinks.hdfs_sink.type      = hdfs
Flume_Agent1.sinks.hdfs_sink.hdfs.path = hdfs://localhost:8020/user/cloudera/flume/events
Flume_Agent1.sinks.hdfs_sink.hdfs.inUseSuffix = .log
Flume_Agent1.sinks.hdfs_sink.hdfs.rollInterval = 0
Flume_Agent1.sinks.hdfs_sink.hdfs.rollSize = 1048576
Flume_Agent1.sinks.hdfs_sink.hdfs.rollCount = 200
Flume_Agent1.sinks.hdfs_sink.hdfs.writeFormat = Text
Flume_Agent1.sinks.hdfs_sink.hdfs.fileType = DataStream

hdfs.rollSize is the file size, in bytes, that triggers a roll. hdfs.rollCount is the number of events written to a file before it is rolled. We can manipulate these properties to reduce the number of files created in HDFS. In our case, setting these parameters will result in only one target file for the single source file, employees_file.json. Note that employees_file.json was created in earlier posts.

Now, start the Flume agent once again in the first terminal using the command below (this time without the -d flag, so that the agent actually starts):

flume-ng agent -f Flume_Agent1.conf -n Flume_Agent1 -Dflume.root.logger=DEBUG,console

In the second terminal, type the below command:

cat ./employees_file.json | nc localhost 55555

Then, check for the single file created in /user/cloudera/flume/events:
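
Instead of the File Browser, the directory can also be listed from the second terminal (an optional check; the hdfs command is available in the Cloudera QuickStart VM):

hdfs dfs -ls /user/cloudera/flume/events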

 


 







You may stop the Flume agent by using CTRL+C in the first terminal window.

This concludes the discussion on Apache Flume.

Apache Flume - I

In an earlier post, we looked at Sqoop as a means to get data into HDFS. In this post, we will look at another Apache project, Flume, that is useful for migrating logs into HDFS. More details on Apache Flume can be found here. For all the work on Flume in this post, we will use the Cloudera sandbox, Cloudera QuickStart VM 5.12.

Apache Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. While Sqoop migrates data from any RDBMS to HDFS, Flume is suited for streaming data sources that are generated continuously, like logs, and for porting this data to HDFS.

Apache Flume terminology consists of:

1) Events: are the starting point for Apache Flume. They are a unit of data flow having a byte payload and an optional set of string attributes

2) Sources: consume events delivered to them by an external source like a web server. The external source sends events to Flume in a format that is recognized by the target Flume source. When a Flume source receives an event, it stores the event in one or more channels

3) Channels: are a passive store that keep the event until it is consumed by a Flume sink

4) Sinks: remove the event from the channel and put it either into an external repository like HDFS (via the Flume HDFS sink) or forward it to the Flume source of the next Flume agent (next hop) in the flow

The source and sink within the given agent run asynchronously with the events staged in the channel.

The data flow in Apache Flume is shown below:













Apache Flume has the following prerequisites:

1) Java Runtime Environment
2) Memory - Sufficient memory for configurations used by sources, channels or sinks
3) Disk Space - Sufficient disk space for configurations used by channels or sinks
4) Directory Permissions - Read/Write permissions for directories used by agent

Let us do a quick check for two points mentioned above in our virtual machine:

java -version

This returns below results on the terminal window in the virtual machine:

[cloudera@quickstart ~]$ java -version
java version "1.7.0_67"
Java(TM) SE Runtime Environment (build 1.7.0_67-b01)
Java HotSpot(TM) 64-Bit Server VM (build 24.65-b04, mixed mode)


For directory permissions, create a directory structure under the /user/cloudera/ directory as follows:

/user/cloudera/flume/events/

and set Write access for Group and Other users by right-clicking on the events folder and setting the permissions:






































Creating a directory in File Browser has been dealt with in earlier posts.

Now, we need to write a single .conf file that will have details about source, channel, sink, and their types.

To see the sample template file, navigate as shown below:

[cloudera@quickstart ~]$ cd /etc/alternatives/flume-ng-conf
[cloudera@quickstart flume-ng-conf]$ ls -l
total 16
-rw-r--r-- 1 root root    0 Jun 29  2017 flume.conf
-rw-r--r-- 1 root root 1661 Jun 29  2017 flume-conf.properties.template
-rw-r--r-- 1 root root 1455 Jun 29  2017 flume-env.ps1.template
-rw-r--r-- 1 root root 1565 Jun 29  2017 flume-env.sh.template
-rw-r--r-- 1 root root 3118 Jun 29  2017 log4j.properties
[cloudera@quickstart flume-ng-conf]$ gedit flume-conf.properties.template


The last command will show the file contents:

*******************flume-conf.properties.template contents******************************
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'

agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = loggerSink

# For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = seq

# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.loggerSink.type = logger

#Specify the channel the sink should use
agent.sinks.loggerSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100


*********************************************************************************

Use the File -> Save As feature to save the file as Flume_Agent1.conf on the Desktop.

We will use the names below while editing this file for our Flume work:

1) Flume Agent: Flume_Agent1
2) Source: netcat_source
3) Sink: hdfs_sink
4) Channel: memory_channel

Let us edit the above file for our Flume work. The hash symbol is used for entering comments. The first three lines register the names of the Flume agent, source, sink, and channel as shown below:

Flume_Agent1.sources = netcat_source
Flume_Agent1.sinks = hdfs_sink
Flume_Agent1.channels = memory_channel


The next three lines list the properties of the source:

Flume_Agent1.sources.netcat_source.type = netcat
Flume_Agent1.sources.netcat_source.bind = localhost
Flume_Agent1.sources.netcat_source.port = 55555

Note that we are using netcat, a computer networking utility for reading from and writing to network connections using TCP or UDP. There is no need to install it as it is already available in the virtual machine. The agent listens on the source, netcat_source, which is of type netcat, on localhost at port 55555.

The next few lines list the properties of the sink:

Flume_Agent1.sinks.hdfs_sink.type      = hdfs
Flume_Agent1.sinks.hdfs_sink.hdfs.path = hdfs://localhost:8020/user/cloudera/flume/events
Flume_Agent1.sinks.hdfs_sink.hdfs.inUseSuffix = .log
Flume_Agent1.sinks.hdfs_sink.hdfs.rollInterval = 0
Flume_Agent1.sinks.hdfs_sink.hdfs.writeFormat = Text
Flume_Agent1.sinks.hdfs_sink.hdfs.fileType = DataStream

The sink type is hdfs and the target directory is /user/cloudera/flume/events that we created earlier. hdfs.rollInterval is the number of seconds to wait before rolling the current file; setting it to 0 disables rolling based on time. hdfs.inUseSuffix, set to .log, is the suffix used for temporary files that Flume actively writes into. hdfs.writeFormat, set to Text, is the format for sequence file records. hdfs.fileType is the file format of the output; setting it to DataStream means that the output file will not be in a compressed format.

The next three lines describe the properties of the memory channel:

Flume_Agent1.channels.memory_channel.type = memory
Flume_Agent1.channels.memory_channel.capacity = 1000
Flume_Agent1.channels.memory_channel.transactionCapacity = 100


capacity is the maximum number of events stored in the channel and is set to 1000 here. transactionCapacity is the maximum number of events the channel will take from a source or give to a sink per transaction and is set to 100 here.

The last two lines bind the source and the sink to the memory channel:

Flume_Agent1.sources.netcat_source.channels = memory_channel
Flume_Agent1.sinks.hdfs_sink.channel = memory_channel


The final edited file is shown below for reference:

***************************Flume_Agent1.conf contents*******************************
# Flume_Agent1.conf: A single-node Flume configuration

# Below lines describe the names of components on this agent
Flume_Agent1.sources = netcat_source
Flume_Agent1.sinks = hdfs_sink
Flume_Agent1.channels = memory_channel

# Below lines describe/configure the source properties
Flume_Agent1.sources.netcat_source.type = netcat
Flume_Agent1.sources.netcat_source.bind = localhost
Flume_Agent1.sources.netcat_source.port = 55555

# Below lines describe the sink properties

Flume_Agent1.sinks.hdfs_sink.type      = hdfs
Flume_Agent1.sinks.hdfs_sink.hdfs.path = hdfs://localhost:8020/user/cloudera/flume/events
Flume_Agent1.sinks.hdfs_sink.hdfs.inUseSuffix = .log
Flume_Agent1.sinks.hdfs_sink.hdfs.rollInterval = 0
Flume_Agent1.sinks.hdfs_sink.hdfs.writeFormat = Text
Flume_Agent1.sinks.hdfs_sink.hdfs.fileType = DataStream

# Below lines describe channel properties

Flume_Agent1.channels.memory_channel.type = memory
Flume_Agent1.channels.memory_channel.capacity = 1000
Flume_Agent1.channels.memory_channel.transactionCapacity = 100

# Below lines bind the source and sink to the channel
Flume_Agent1.sources.netcat_source.channels = memory_channel
Flume_Agent1.sinks.hdfs_sink.channel = memory_channel
*********************************************************************************

We will continue with Apache Flume in the next post ...

Wednesday 17 January 2018

Spark - IV

We continue to explore Spark in the fourth part of the series. The first post of the Spark series is here in case you wish to look at it for the environment setup. We will be using Hortonworks Sandbox HDP 2.6.3 for all the work in this post. Note that the Spark version is 2.2.x. This is different from the version in the first post on Spark. We will look at the different types of actions in Spark.

Before we look at actions in Spark, let us first create an RDD as shown below and print it out:

val rdd = sc.makeRDD(Vector(1,2,3,4,5,6))
rdd.collect


This returns below results:

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[254] at makeRDD at <console>:34
res217: Array[Int] = Array(1, 2, 3, 4, 5, 6)

Actions in Spark are described below:

1)  reduce(func): Returns the result after aggregation of the elements of the calling RDD using a function, func (which takes two arguments and returns one value). func should be both commutative and associative so that it can be computed correctly in parallel

val result = rdd.reduce((x,y) => x+y)

This returns below results:

result: Int = 21

The same command in a different syntax:

val result = rdd.reduce(_+_)

This returns below results:

result: Int = 21
2)  collect: Returns all the elements of the RDD as an array

We have already seen collect when we created the RDD above

3)  count: Returns the number of elements in the RDD

rdd.count

This returns below results:

res221: Long = 6

4) first: Returns the first element of the RDD

rdd.first

This returns below results:

res222: Int = 1

5) take(n): Returns the first n elements of the RDD

rdd.take(3)

This returns below results:

res223: Array[Int] = Array(1, 2, 3)

6) takeSample(withReplacement, num, [seed]): Returns an array with a random sample of num elements of the RDD, with or without replacement, optionally pre-specifying a random number generator seed

With replacement:

rdd.takeSample(true, 3)

This returns below results:

res224: Array[Int] = Array(2, 5, 5)

Without replacement:

rdd.takeSample(false, 3)

This returns below results:

res225: Array[Int] = Array(3, 5, 6)

7)  takeOrdered(n, [ordering]): Returns the first n elements of the RDD using an optional parameter that specifies either their natural order or a custom comparator

val rdd1 = sc.makeRDD(Vector("Hello, ","This ","is ","Spark ","language."))
rdd1.takeOrdered(4)


This returns below results:
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[259] at makeRDD at <console>:34
res227: Array[String] = Array("Hello, ", "Spark ", "This ", "is ")

8)  saveAsTextFile(path): Outputs the elements of the RDD as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system

rdd1.saveAsTextFile("/user/spark_test/test/")

This returns below results:

content of part-00000 file at /user/spark_test/test/:

Hello,
This


content of part-00001 file at /user/spark_test/test/:

is
Spark
language. 


9) saveAsSequenceFile(path): Outputs the elements of the RDD as a Hadoop Sequence file in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system
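
A minimal sketch (not run in this post) for saving a key-value RDD as a sequence file; the target path is just an example and must not already exist:

val kvRdd = sc.makeRDD(Array(("foo", 1), ("bar", 2)))
kvRdd.saveAsSequenceFile("/user/spark_test/test_seq/")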

10)  saveAsObjectFile(path): Outputs the elements of the RDD in a simple format using Java serialization in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system

rdd1.saveAsObjectFile("/user/spark_test/test1/")

11) countByKey: Returns a hashmap of (K, Int) pairs with the count of each key for RDDs of type (K, V)

val rdd2 = sc.makeRDD(Array(("foo",1), ("bar",2),("foo","foo"), ("bar","bar"),("foo","foo"), ("bar","bar"),("foo","foo"), ("bar","bar"),("foo",3)))
rdd2.countByKey 


This returns below results:

rdd2: org.apache.spark.rdd.RDD[(String, Any)] = ParallelCollectionRDD[269] at makeRDD at <console>:34
res236: scala.collection.Map[String,Long] = Map(foo -> 5, bar -> 4)
 
12) foreach(func): Runs a function func on each element of the RDD. In the example below, we first collect the RDD so that the println output appears in the driver console; calling foreach directly on the RDD would run println on the executors
 
rdd.collect.foreach(println)

This returns below results:

1
2
3
4
5
6

This concludes the topic of actions in Spark.

Spark - III

We continue to explore Spark in the third part of the series. The first post of the Spark series is here in case you wish to look at it for the environment setup. We will be using Hortonworks Sandbox HDP 2.6.3 for all the work in this post. Note that the Spark version is 2.2.x. This is different from the version in the first post on Spark. We will continue with the different types of transformations in Spark.

8) intersection(otherDataset): Returns a dataset that is the intersection of the calling dataset and the argument dataset

rdd.intersection(rdd2).collect

This returns below results:

res132: Array[Int] = Array(6, 3, 4, 1, 5, 2)

9) distinct([numTasks]): Returns a dataset that contains distinct elements. numTasks is optional and is the number of tasks

sc.parallelize(Vector(1,1,2,2,3,3)).distinct.collect

This returns below results:

res141: Array[Int] = Array(2, 1, 3)

10) groupBy: Returns an RDD after grouping by the set criteria

val Array1: Array[(String, Int)]  = Array(("King",24000), ("Kochhar",17000),("De Haan", 17000))
val rdd3 = sc.parallelize(Array1)
rdd3.groupBy(x => x._1.charAt(0)).collect


This returns below results:

Array1: Array[(String, Int)] = Array((King,24000), (Kochhar,17000), (De Haan,17000))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[160] at parallelize at <console>:29
res163: Array[(Char, Iterable[(String, Int)])] = Array((D,CompactBuffer((De Haan,17000))), (K,CompactBuffer((King,24000), (Kochhar,17000))))


Here, the grouping is by the first character of the first element in each key-value pair

11) groupByKey([numTasks]): Returns an RDD of (K, Iterable<V>) pairs on an RDD of (K, V) pairs

val Array2: Array[(String, Int)]  = Array(("King",24000), ("King",17000),("De Haan", 17000))
val rdd3 = sc.parallelize(Array2)
rdd3.groupByKey.collect


This returns below results:

Array2: Array[(String, Int)] = Array((King,24000), (King,17000), (De Haan,17000))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[163] at parallelize at <console>:29
res164: Array[(String, Iterable[Int])] = Array((King,CompactBuffer(24000, 17000)), (De Haan,CompactBuffer(17000)))


12) reduceByKey(func, [numTasks]): Returns a dataset of (K, V) pairs on a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V

val Array2: Array[(String, Int)]  = Array(("King",24000), ("King",17000),("De Haan", 17000))
val rdd3 = sc.parallelize(Array2)
rdd3.reduceByKey((x,y) => (x+y)).collect
rdd3.reduceByKey(_ + _).collect


This returns below results:

Array2: Array[(String, Int)] = Array((King,24000), (King,17000), (De Haan,17000))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[175] at parallelize at <console>:29
res172: Array[(String, Int)] = Array((King,41000), (De Haan,17000))
res173: Array[(String, Int)] = Array((King,41000), (De Haan,17000))


13) aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]): Returns a dataset of (K, U) pairs on a dataset of (K, V) pairs where the values for each key are aggregated using the given combine functions and a default "zero" value

val Array2: Array[(String, Int)]  = Array(("King",24000), ("King",17000),("De Haan", 17000))
val rdd3 = sc.parallelize(Array2)
rdd3.aggregateByKey(0)((accumulator, v) => accumulator + v, (v1, v2) => v1 + v2).collect


This returns below results:

Array2: Array[(String, Int)] = Array((King,24000), (King,17000), (De Haan,17000))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[182] at parallelize at <console>:29
res177: Array[(String, Int)] = Array((King,41000), (De Haan,17000))


14) sortByKey([ascending], [numTasks]): Returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument on a dataset of (K, V) pairs where K implements Ordered

val Array2: Array[(String, Int)]  = Array(("King",24000), ("King",17000),("De Haan", 17000)) 
val rdd3 = sc.parallelize(Array2)
rdd3.sortByKey().collect


This returns below results:

Array2: Array[(String, Int)] = Array((King,24000), (King,17000), (De Haan,17000))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[187] at parallelize at <console>:29
res180: Array[(String, Int)] = Array((De Haan,17000), (King,24000), (King,17000))
 
Default sorting is in ascending order
 
15)  join(otherDataset, [numTasks]): Returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key on datasets of type (K, V) and (K, W)
 
val rdd4 = sc.makeRDD(Array(("foo",1),("bar",2)))
rdd4.collect
val rdd5 = sc.makeRDD(Array(("foo","foo"),("bar","bar")))
rdd5.collect
rdd4.join(rdd5).collect


This returns below results:

rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[191] at makeRDD at <console>:27
res182: Array[(String, Int)] = Array((foo,1), (bar,2))
rdd5: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[192] at makeRDD at <console>:27
res183: Array[(String, String)] = Array((foo,foo), (bar,bar))
res184: Array[(String, (Int, String))] = Array((foo,(1,foo)), (bar,(2,bar)))
 
16) cogroup(otherDataset, [numTasks]): Returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples on datasets of type (K, V) and (K, W)
 
rdd4.cogroup(rdd5).collect
 
This returns below results:

res186: Array[(String, (Iterable[Int], Iterable[String]))] = Array((foo,(CompactBuffer(1),CompactBuffer(foo))), (bar,(CompactBuffer(2),CompactBuffer(bar))))

17) cartesian(otherDataset): Returns a dataset of (T, U) pairs (all pairs of elements) on datasets of types T and U

rdd4.cartesian(rdd4).collect

This returns below results:

res188: Array[((String, Int), (String, Int))] = Array(((foo,1),(foo,1)), ((foo,1),(bar,2)), ((bar,2),(foo,1)), ((bar,2),(bar,2)))

18) pipe(command, [envVars]): Returns output after piping each partition of the RDD through a shell command
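
A minimal sketch (not run in this post), assuming a Unix-like shell where the cat command is available on the executor nodes:

val rddPipe = sc.parallelize(Vector("a", "b", "c"))
rddPipe.pipe("cat").collect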

19) coalesce(numPartitions): Returns an RDD with the number of partitions reduced to numPartitions

val rdd2 = sc.parallelize(1 to 6,3)
rdd2.collect
rdd2.getNumPartitions
rdd2.coalesce(2).getNumPartitions

This returns below results:

rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[205] at parallelize at <console>:27
res195: Array[Int] = Array(1, 2, 3, 4, 5, 6)
res196: Int = 3
res197: Int = 2
 
20) repartition(numPartitions): Reshuffles the data in the RDD randomly to create either more or fewer partitions and balance it across them

val rdd6 = sc.parallelize(1 to 12,3)
rdd6.getNumPartitions
rdd6.saveAsTextFile("/user/spark_test/test/")

This returns below results:

rdd6: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[219] at parallelize at <console>:27
res206: Int = 3
 
Looking at the files using Ambari in /user/spark_test/test/ directory, we see three data files with entries as:
 
1
2
3
4

5
6
7
8

9
10
11
12


Now, let us change the partitions to 4 using repartition:
 
rdd6.repartition(4).getNumPartitions
rdd6.repartition(4).saveAsTextFile("/user/spark_test/test1/")

This returns below results:
 
res208: Int = 4
 
Looking at the files using Ambari in /user/spark_test/test1/ directory, we see four data files with entries as:
 
2
6
10

3
7
11

4
8
12

1
5
9

 
21)  repartitionAndSortWithinPartitions(partitioner): Repartitions the RDD according to the given partitioner and, within each resulting partition, sorts records by their keys

import org.apache.spark.HashPartitioner
val Array2: Array[(String, Int)]  = Array(("d",4),("a",1),("c",3),("f",6),("e",5),("b",2))
val rdd7 = sc.makeRDD(Array2)
val repartitioned = rdd7.repartitionAndSortWithinPartitions(new HashPartitioner(2))
repartitioned.saveAsTextFile("/user/spark_test/test/")

This returns below results:
 
import org.apache.spark.HashPartitioner
Array2: Array[(String, Int)] = Array((d,4), (a,1), (c,3), (f,6), (e,5), (b,2))
rdd7: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[249] at makeRDD at <console>:36
repartitioned: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[250] at repartitionAndSortWithinPartitions at <console>:38
 
We see two files in that directory with contents as:

(b,2)
(d,4)
(f,6)
 
(a,1)
(c,3)
(e,5)
 
This concludes the topic of transformations in Spark.