Saturday, 27 January 2018

Apache Kafka - II

In the first segment on Apache Kafka, we covered the introduction and installation of Apache Kafka. In this second part, we will take a look at how data is published to, and consumed from, Apache Kafka.

Kafka needs Zookeeper to be up and running. Open a terminal and change to the Kafka installation directory:

F:\>
F:\>cd kafka_2.12-1.0.0


Start Zookeeper:

F:\kafka_2.12-1.0.0>bin\windows\zookeeper-server-start.bat config\zookeeper.properties

Note that the argument to the batch file is the file config\zookeeper.properties. Opening that file, we see that the port on which Zookeeper will listen (the clientPort entry) is 2181.
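If we would rather read the port programmatically than open the file by hand, a few lines of Python are enough. This is only a minimal sketch: parse_properties is our own helper name, it handles just simple key=value lines, and the SAMPLE text is an illustrative excerpt rather than the full file.

```python
def parse_properties(text):
    """Parse simple Java-style key=value properties text into a dict."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        # Skip blank lines and comment lines.
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


# Illustrative excerpt (assumption); the real file contains more entries.
SAMPLE = """\
# the port at which the clients will connect
clientPort=2181
"""

props = parse_properties(SAMPLE)
print("Zookeeper client port:", props["clientPort"])
```

To use it against the real file, read config\zookeeper.properties with open(...).read() and pass the text to parse_properties.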












The last few lines of the output are shown below:

[2018-01-27 10:11:23,876] INFO Server environment:user.dir=F:\kafka_2.12-1.0.0 (org.apache.zookeeper.server.ZooKeeperServer)
[2018-01-27 10:11:23,907] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2018-01-27 10:11:23,907] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2018-01-27 10:11:23,907] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2018-01-27 10:11:23,985] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)


The last line shows that the Zookeeper server is bound to port 2181, as per the entry in the file.

Start Kafka:

Next, open another terminal and start Kafka as shown below:

F:\>
F:\>cd kafka_2.12-1.0.0

F:\kafka_2.12-1.0.0>bin\windows\kafka-server-start.bat config\server.properties

The last few lines of the output are shown below:

[2018-01-27 10:35:18,071] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2018-01-27 10:35:22,680] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2018-01-27 10:35:22,758] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2018-01-27 10:35:22,758] INFO Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint(workstation,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
[2018-01-27 10:35:22,774] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2018-01-27 10:35:22,774] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser)
[2018-01-27 10:35:22,789] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)


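The final message, "[KafkaServer id=0] started", indicates that the Kafka server is up and running. The broker registration line also shows that it listens on port 9092.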
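Before moving on, it can help to confirm that both servers actually accept connections. The sketch below is a plain TCP check written in Python and is not part of Kafka. The function name port_is_open is our own, and the ports 2181 and 9092 are the ones seen in the logs above.

```python
import socket


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unresolved hosts.
        return False


if __name__ == "__main__":
    for name, port in (("Zookeeper", 2181), ("Kafka", 9092)):
        state = "reachable" if port_is_open("localhost", port) else "not reachable"
        print(name, "on port", port, "is", state)
```

A successful connection only tells us that something is listening on the port. It does not prove that the listener is a healthy Zookeeper or Kafka process.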

Create Topic:

In a third terminal, let us now create a topic called topic_1 with a replication factor of 1 and 3 partitions:

F:\>
F:\>cd kafka_2.12-1.0.0

F:\kafka_2.12-1.0.0>bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic topic_1

Output is shown below:

WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "topic_1".
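The warning appears because our topic name contains an underscore. As far as we understand it, Kafka replaces periods with underscores when it builds metric names, so two topics that differ only in those characters would end up with the same metric name. The Python sketch below illustrates that idea. The helper names are our own and this is not Kafka's actual code.

```python
def metric_name(topic):
    """Approximate a metric-safe name by replacing '.' with '_' (assumed rule)."""
    return topic.replace(".", "_")


def topics_collide(topic_a, topic_b):
    """Return True if two topic names map to the same metric name."""
    return metric_name(topic_a) == metric_name(topic_b)


print(topics_collide("topic_1", "topic.1"))  # True: both become topic_1
print(topics_collide("topic_1", "topic_2"))  # False: the names stay distinct
```

Since we only use underscores in this walkthrough, the warning can be safely ignored here.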


Start Producer:

Once the topic is created, we can start producers that publish data to it and consumers that subscribe to it and receive the messages. Open a fourth terminal and start a console producer by entering the commands below:

F:\>
F:\>cd kafka_2.12-1.0.0

F:\kafka_2.12-1.0.0>bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic topic_1

We get a prompt as shown below:

F:\kafka_2.12-1.0.0>bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic topic_1
>


Start Consumer:

In a fifth terminal, start a console consumer as follows:

F:\kafka_2.12-1.0.0>bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic topic_1

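The output is shown below. The deprecation warning appears because the --zookeeper option selects the old consumer; passing --bootstrap-server localhost:9092 instead selects the new consumer: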

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

Now, at the prompt in the fourth terminal (the producer), enter the messages below:

Hello, Kafka!
Message 1
Message 2
Message 3

The producer terminal shows the four messages entered at the > prompt:

[Screenshot: producer terminal]

These messages should appear in the consumer terminal as well:

[Screenshot: consumer terminal]

List Topics:

Use the command below in the third terminal to list all topics:

bin\windows\kafka-topics --zookeeper localhost:2181 --list

Output is shown below:

F:\kafka_2.12-1.0.0>bin\windows\kafka-topics --zookeeper localhost:2181 --list
topic_1


Describe Topic:

bin\windows\kafka-topics --describe --zookeeper localhost:2181 --topic topic_1

Output is shown below:

F:\kafka_2.12-1.0.0>bin\windows\kafka-topics --describe --zookeeper localhost:2181 --topic topic_1
Topic:topic_1   PartitionCount:3        ReplicationFactor:1     Configs:
        Topic: topic_1  Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: topic_1  Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: topic_1  Partition: 2    Leader: 0       Replicas: 0     Isr: 0
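Each partition line names the Leader (the broker that serves reads and writes for that partition), the Replicas (the brokers that hold a copy) and the Isr (the replicas currently in sync with the leader). With a single broker, all of these are broker 0. If we want to inspect this output from a script, a rough Python sketch like the one below can pull out the fields. The function name and the regular expression are our own and are tailored to the output format shown above.

```python
import re

# The describe output from above, pasted as-is.
DESCRIBE_OUTPUT = """\
Topic:topic_1   PartitionCount:3        ReplicationFactor:1     Configs:
        Topic: topic_1  Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: topic_1  Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: topic_1  Partition: 2    Leader: 0       Replicas: 0     Isr: 0
"""

PARTITION_LINE = re.compile(
    r"Partition:\s*(\d+)\s+Leader:\s*(-?\d+)\s+Replicas:\s*([\d,]*)\s+Isr:\s*([\d,]*)"
)


def parse_partitions(text):
    """Return one dict per partition line found in the describe output."""
    rows = []
    for line in text.splitlines():
        match = PARTITION_LINE.search(line)
        if match:
            rows.append({
                "partition": int(match.group(1)),
                "leader": int(match.group(2)),
                "replicas": [int(b) for b in match.group(3).split(",") if b],
                "isr": [int(b) for b in match.group(4).split(",") if b],
            })
    return rows


for row in parse_partitions(DESCRIBE_OUTPUT):
    print(row)
```

The summary line is skipped because it contains PartitionCount rather than a Partition: field.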


Delete Topic:

bin\windows\kafka-topics --delete --zookeeper localhost:2181 --topic topic_1

Output is shown below:

F:\kafka_2.12-1.0.0>bin\windows\kafka-topics --delete --zookeeper localhost:2181 --topic topic_1
Topic topic_1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.


When we list the topics again in the third terminal, we get the output below:

F:\kafka_2.12-1.0.0>bin\windows\kafka-topics --zookeeper localhost:2181 --list
__consumer_offsets
topic_1 - marked for deletion
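The listing still shows topic_1, but flags it as marked for deletion. If we ever need to tell such topics apart in a script, a small Python sketch along these lines could work. The function name is our own, and it relies on the exact " - marked for deletion" suffix seen in the output above.

```python
# The list output from above, pasted as-is.
LIST_OUTPUT = """\
__consumer_offsets
topic_1 - marked for deletion
"""

SUFFIX = " - marked for deletion"


def parse_topic_list(text):
    """Map each topic name to True if it is flagged for deletion."""
    topics = {}
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        if line.endswith(SUFFIX):
            topics[line[: -len(SUFFIX)]] = True
        else:
            topics[line] = False
    return topics


print(parse_topic_list(LIST_OUTPUT))
```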


In all terminals, press Ctrl+C to exit.

This concludes the introductory commands on Apache Kafka.