Sunday, 4 February 2018

Apache Kafka - IV

In the third part of Apache Kafka, we will write a producer in Java. We will use the same environment that we set up in this post.

In addition, we are using IntelliJ IDEA, Community Edition (Version: 2017.3.4, Build: 173.4548.28, Released: January 30, 2018) as IDE for the Java program. A screenshot is shown below:














In this post, we will write a producer in Java. This will read a file and publish the contents to a topic. A consumer will subscribe to this topic and output the message to the console

We invoke zookeeper and kafka as shown below. Then, we create a topic called topic1:

Start Zookeeper:

Open first terminal:

F:\>
F:\>cd kafka_2.12-1.0.0

F:\kafka_2.12-1.0.0>bin\windows\zookeeper-server-start.bat config\zookeeper.properties

Start Kafka:

Next, open another terminal and start Kafka as shown below:

F:\>
F:\>cd kafka_2.12-1.0.0

F:\kafka_2.12-1.0.0>bin\windows\kafka-server-start.bat config\server.properties


Create Topic:

In a third terminal, let us now proceed to create a topic called topic1 having a replication factor as 1 and number of partitions as 1:

F:\>
F:\>cd kafka_2.12-1.0.0F:\kafka_2.12-1.0.0>bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic1 

Write below code in Intellij IDEA and run it:

import java.util.Properties;

import java.io.IOException;
import java.io.BufferedReader;
import java.io.FileReader;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class customProducer {
    private static final String topic= "topic1";
    public static void main(String[] args) {
        String fileName = "C:\\kafka_2.12-1.0.0\\NOTICE";

        Properties properties = new Properties();

        // kafka bootstrap server properties        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());

        // producer properties        properties.setProperty("acks", "1");
        properties.setProperty("retries", "3");
        properties.setProperty("linger.ms", "1");

        KafkaProducer<String,String> producer = 
                                 new org.apache.kafka.clients.producer.KafkaProducer<String,String>(properties);

        try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {

            String line;
            while ((line = br.readLine()) != null) {
                ProducerRecord<String,String> producerRecord =
                        new ProducerRecord<String,String>(topic, line);
                producer.send(producerRecord);
            }

        } catch (IOException e) {
            e.printStackTrace();
        }

        producer.close();
    }
}


Above code reads the NOTICE file under Kafka Home and publishes the file contents to topic called topic1. In a way, the code above simulates the FileSource Connector under Kafka Connect in the sense that it reads a file and publishes to a topic

Next, we start a consumer to receive the messages published to the topic1:

Start Consumer:

On another, start a console consumer as follows:

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic1 --from-beginning

Output is shown below:




 








The contents to NOTICE file is shown below for reference:


















The messages are identical to the contents of file above