In this third part of the Apache Kafka series, we will write a producer in Java. We will use the same environment that we set up in a previous post.
In addition, we are using IntelliJ IDEA Community Edition (Version: 2017.3.4, Build: 173.4548.28, Released: January 30, 2018) as the IDE for the Java program. A screenshot is shown below:
The producer will read a file and publish its contents to a topic. A consumer will subscribe to this topic and print the messages to the console.
We start ZooKeeper and Kafka as shown below. Then, we create a topic called topic1:
Start ZooKeeper:
Open the first terminal:
F:\>
F:\>cd kafka_2.12-1.0.0
F:\kafka_2.12-1.0.0>bin\windows\zookeeper-server-start.bat config\zookeeper.properties
Start Kafka:
Next, open another terminal and start Kafka as shown below:
F:\>
F:\>cd kafka_2.12-1.0.0
F:\kafka_2.12-1.0.0>bin\windows\kafka-server-start.bat config\server.properties
Create Topic:
In a third terminal, let us now create a topic called topic1 with a replication factor of 1 and a single partition:
F:\>
F:\>cd kafka_2.12-1.0.0
F:\kafka_2.12-1.0.0>bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic1
Write the code below in IntelliJ IDEA and run it:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class customProducer {
    private static final String topic = "topic1";
    public static void main(String[] args) {
        // path to the NOTICE file under Kafka home; adjust the drive and folder to your installation
        String fileName = "C:\\kafka_2.12-1.0.0\\NOTICE";
        Properties properties = new Properties();
        // kafka bootstrap server properties
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());
        // producer properties
        properties.setProperty("acks", "1");      // wait for the partition leader's acknowledgement only
        properties.setProperty("retries", "3");   // resend up to 3 times on transient failures
        properties.setProperty("linger.ms", "1"); // wait up to 1 ms so that records can be batched
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // publish each line of the file as one record (without a key) to the topic
        try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
            String line;
            while ((line = br.readLine()) != null) {
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, line);
                producer.send(producerRecord);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        // close() waits for records that were already sent to complete before shutting down
        producer.close();
    }
}
The code above reads the NOTICE file under Kafka home and publishes its contents, one line per message, to the topic called topic1. In a way, it simulates the FileStreamSource connector in Kafka Connect, in the sense that it reads a file and publishes to a topic.
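The read-and-publish loop can also be looked at in isolation, without a running broker. The following is only a sketch under a few assumptions: the class LinePublisher and the method publishLines are hypothetical names (they are not part of Kafka), the file is assumed to be UTF-8 text, and a plain java.util.function.Consumer stands in for the producer.send call.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.function.Consumer;

// Hypothetical helper, not part of Kafka: isolates the loop that turns file lines into messages.
class LinePublisher {

    // Reads the file line by line and hands every line to the sink.
    // Returns the number of lines handed over.
    static int publishLines(Path file, Consumer<String> sink) throws IOException {
        int count = 0;
        // Assumption: the file is UTF-8 text
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                sink.accept(line); // blank lines are passed on as empty strings
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        // Demo with a temporary file and a console sink, so no broker is needed
        Path tmp = Files.createTempFile("notice-demo", ".txt");
        Files.write(tmp, Arrays.asList("Apache Kafka", "", "Copyright notice"), StandardCharsets.UTF_8);
        int handed = publishLines(tmp, line -> System.out.println("would send: [" + line + "]"));
        System.out.println(handed + " lines handed to the sink");
        Files.delete(tmp);
    }
}
```

In the producer program, the sink would be something like line -> producer.send(new ProducerRecord<>(topic, line)), so each line of the file, including empty ones, becomes one message on the topic.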
Next, we start a consumer to receive the messages published to topic1:
Start Consumer:
In another terminal, start a console consumer as follows:
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic1 --from-beginning
Output is shown below:
The contents of the NOTICE file are shown below for reference:
The messages are identical to the contents of the file above.
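The comparison can also be done programmatically instead of by eye. The sketch below assumes that the console consumer output was saved to a text file (for example by redirecting the command's output to a hypothetical consumer-output.txt) and that any lines printed by the tool itself were removed. OutputComparer and firstDifference are illustrative names, not part of Kafka.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

// Hypothetical checker, not part of Kafka: compares the source file with saved consumer output.
class OutputComparer {

    // Returns -1 when both lists are identical, otherwise the 0-based index of the first difference.
    static int firstDifference(List<String> expected, List<String> actual) {
        int common = Math.min(expected.size(), actual.size());
        for (int i = 0; i < common; i++) {
            if (!expected.get(i).equals(actual.get(i))) {
                return i;
            }
        }
        // Same prefix: identical only if the lengths match as well
        return expected.size() == actual.size() ? -1 : common;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: java OutputComparer <source-file> <consumer-output-file>");
            return;
        }
        // Assumption: both files are UTF-8 text
        List<String> source = Files.readAllLines(Paths.get(args[0]), StandardCharsets.UTF_8);
        List<String> consumed = Files.readAllLines(Paths.get(args[1]), StandardCharsets.UTF_8);
        int diff = firstDifference(source, consumed);
        System.out.println(diff < 0 ? "identical" : "first difference at line " + (diff + 1));
    }
}
```

Because topic1 has a single partition, the messages are normally consumed in the order in which they were produced, which is why a line-by-line comparison is reasonable here. If the producer was run more than once, the consumer started with --from-beginning will show the file contents repeated.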