In Kafka version 0.8.2, there is a newer, better and faster version of the Producer API. You might recall from earlier blogs that the Producer is used to send messages to a topic. If you are new to Kafka, please read the following blogs first:
Apache Kafka Introduction
Apache Kafka JAVA tutorial #1
Some features of the new producer are:
- Messages are sent to a topic asynchronously.
- send returns immediately. The producer buffers messages and sends them to the broker in the background.
- Thanks to buffering, many messages can be sent to the broker at once without waiting for a response to each one.
- The send method returns a Future<RecordMetadata>. RecordMetadata has information about the record, such as which partition it was stored in and its offset.
- The caller may optionally provide a callback, which is called when the message is acknowledged.
- The buffer can fill up at times. Its size, in bytes, is configurable with the buffer.memory configuration property.
- If the buffer fills up, the producer can either block or throw an exception. This behavior is controlled by the block.on.buffer.full configuration property.
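To make the buffering behavior above concrete, here is a toy model in plain Java (Java 8 or later, for CompletableFuture). It is only a sketch and does not use or reproduce the Kafka client: ToyProducer, its capacity, blockOnBufferFull and maxBatch parameters, and the fake offsets are all hypothetical. capacity counts records, whereas buffer.memory is measured in bytes, and no broker is involved. send() queues a record and returns a Future straight away; a background thread drains the queue in batches and completes each future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Toy model of a buffering producer; NOT the Kafka client and no broker is involved.
class ToyProducer {
    // A queued message plus the future handed back to the caller of send().
    private static final class Pending {
        final String value;
        final CompletableFuture<Long> result = new CompletableFuture<>();
        Pending(String value) { this.value = value; }
    }

    private final BlockingQueue<Pending> buffer;  // bounded, like buffer.memory (but counts records)
    private final boolean blockOnBufferFull;      // hypothetical stand-in for block.on.buffer.full
    private final List<Integer> batchSizes = new ArrayList<>();
    private final Thread sender;
    private long nextOffset = 0;                  // only touched by the sender thread

    ToyProducer(int capacity, boolean blockOnBufferFull, int maxBatch) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
        this.blockOnBufferFull = blockOnBufferFull;
        this.sender = new Thread(() -> drain(maxBatch));
        this.sender.setDaemon(true);
    }

    void start() { sender.start(); }

    // Queues the record and returns at once; the future later holds its (fake) offset.
    Future<Long> send(String value) throws InterruptedException {
        Pending p = new Pending(value);
        if (blockOnBufferFull) {
            buffer.put(p);                                   // wait until there is room
        } else if (!buffer.offer(p)) {
            throw new IllegalStateException("buffer full");  // fail fast instead
        }
        return p.result;
    }

    // Background loop: wait for one record, then take up to maxBatch - 1 more as one batch.
    private void drain(int maxBatch) {
        try {
            while (true) {
                List<Pending> batch = new ArrayList<>();
                batch.add(buffer.take());
                buffer.drainTo(batch, maxBatch - 1);
                synchronized (this) { batchSizes.add(batch.size()); }
                for (Pending p : batch) {
                    p.result.complete(nextOffset++);  // pretend the broker acked it
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    synchronized List<Integer> batchSizes() { return new ArrayList<>(batchSizes); }
}

class ToyProducerDemo {
    public static void main(String[] args) throws Exception {
        // Fail-fast mode with the sender not started: the third send finds the buffer full.
        ToyProducer failFast = new ToyProducer(2, false, 10);
        failFast.send("a");
        failFast.send("b");
        try {
            failFast.send("c");
        } catch (IllegalStateException e) {
            System.out.println("third send rejected: " + e.getMessage());
        }

        // Queue 25 records before the sender starts, then let it drain them in batches of up to 10.
        ToyProducer producer = new ToyProducer(100, true, 10);
        List<Future<Long>> futures = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            futures.add(producer.send("message " + i));
        }
        producer.start();
        for (Future<Long> f : futures) {
            f.get();  // wait for every "ack"
        }
        System.out.println("batch sizes: " + producer.batchSizes());
    }
}
```

Running ToyProducerDemo should report that the third send was rejected and then print batch sizes of [10, 10, 5]: all 25 records were queued before the sender started, so it picks them up at most ten at a time.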
For this tutorial you will need:
(1) Apache Kafka 0.8.2
(2) JDK 7 or higher. An IDE of your choice is optional.
(3) Apache Maven
(4) Source code for this sample from https://github.com/mdkhanga/my-blog-code so you can look at working code.
In this tutorial we take the Producer we wrote in Step 5 of Kafka tutorial 1 and rewrite it using the new API. We will send messages to a topic on a Kafka cluster and consume them with the consumer we wrote in that tutorial.
Step 1: Set up a Kafka cluster and create a topic
If you are new to Kafka, you can follow the instructions in my tutorial 1 to set up a cluster and create a topic.
Step 2: Get the source code for tutorials 1, 2 and 3 from https://github.com/mdkhanga/my-blog-code
Copy KafkaProducer.java to KafkaProducer082.java. We will port KafkaProducer082 to the new producer API.
Step 3: Write the new Producer
Update the Maven dependencies in pom.xml.
For the new producer you will need:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.0</version>
</dependency>
The core Kafka dependency used by the rest of the client code must also be updated to 0.8.2:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.0</version>
</dependency>
The new producer will not work if the rest of the client uses 0.8.1 or a lower version.
Step 3.1: Imports
Remove the old imports and add these.
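import java.util.concurrent.Future;
// KafkaProducer, ProducerRecord, Callback and RecordMetadata
import org.apache.kafka.clients.producer.*;
Note the package: the new producer classes live in org.apache.kafka.clients.producer, which ships in the kafka-clients jar.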
Step 3.2: Create the producer
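Properties props = new Properties();
// Initial broker(s) to contact; the producer discovers the rest of the cluster from them
props.put("bootstrap.servers", "localhost:9092");
// Serializers are mandatory: there are no default values
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// The new producer reads "acks"; it does not use the old "request.required.acks" name
props.put("acks", "1");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);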
As with the old producer, configuration such as which broker to connect to is passed in as Properties. The key and value serializers must be provided; there are no default values.
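Because the serializers have no defaults and several settings were renamed for the new producer, a small check before constructing KafkaProducer can catch porting mistakes early. The sketch below is hypothetical: ProducerConfigCheck is not part of kafka-clients and only inspects a java.util.Properties object. The old names it flags (metadata.broker.list, request.required.acks, serializer.class, key.serializer.class) are the old producer's settings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical pre-flight check for a new-producer configuration; not part of kafka-clients.
class ProducerConfigCheck {
    // Settings this tutorial's producer must supply; the serializers have no defaults.
    static final List<String> REQUIRED =
            Arrays.asList("bootstrap.servers", "key.serializer", "value.serializer");

    // Old producer settings and the new-producer settings that replace them.
    static final Map<String, String> RENAMED = new LinkedHashMap<>();
    static {
        RENAMED.put("metadata.broker.list", "bootstrap.servers");
        RENAMED.put("request.required.acks", "acks");
        RENAMED.put("serializer.class", "value.serializer");
        RENAMED.put("key.serializer.class", "key.serializer");
    }

    // Returns human-readable problems; an empty list means the check passed.
    static List<String> problems(Properties props) {
        List<String> out = new ArrayList<>();
        for (String key : REQUIRED) {
            if (props.getProperty(key) == null) {
                out.add("missing " + key);
            }
        }
        for (Map.Entry<String, String> e : RENAMED.entrySet()) {
            if (props.getProperty(e.getKey()) != null) {
                out.add(e.getKey() + " is an old-producer setting; use " + e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("request.required.acks", "1");
        for (String p : problems(props)) {
            System.out.println(p);
        }
    }
}
```

For the properties in main it should print two problems: the missing value.serializer, and a hint to use acks instead of request.required.acks.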
Step 3.3: Send Messages
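String date = "04092014";
String topic = "mjtopic";
for (int i = 1; i <= 1000000; i++) {
    String msg = date + " This is message " + i;
    // The key (the message number) and the value are both Strings
    ProducerRecord<String, String> data = new ProducerRecord<>(topic, String.valueOf(i), msg);
    // send() returns immediately; the callback runs when the broker acknowledges the record
    Future<RecordMetadata> rs = producer.send(data, new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                // On failure recordMetadata is null, so report the error instead
                System.out.println("Send failed: " + e);
                return;
            }
            System.out.println("Received ack for partition=" + recordMetadata.partition() +
                    " offset = " + recordMetadata.offset());
        }
    });
    try {
        // get() blocks until this record is acknowledged
        RecordMetadata rm = rs.get();
        msg = msg + " partition = " + rm.partition() + " offset =" + rm.offset();
        System.out.println(msg);
    } catch (Exception e) {
        System.out.println(e);
    }
}
// Release the producer's resources; close() waits for previously sent records to complete
producer.close();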
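As mentioned earlier, send is asynchronous and batches messages before sending them to the broker. The send method immediately returns a Future<RecordMetadata>, which holds the partition the message was sent to and its offset within that partition. We also pass send a callback whose onCompletion method is called when the acknowledgement for the message is received. Note that calling rs.get() right after each send blocks until that message is acknowledged, so this loop effectively sends one message at a time; to benefit from batching, rely on the callback (or call get() later) instead.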
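The fully asynchronous pattern, sending without blocking and letting a callback record each acknowledgement, can be sketched without a broker. In this plain Java sketch (Java 8 or later), AckCallback and FakeAsyncSender are hypothetical stand-ins for Kafka's Callback and KafkaProducer, not the real API. A single background thread plays the broker, assigns increasing offsets, and rejects empty messages so that the failure path, where the offset passed to the callback is null, is exercised. The send loop never calls get(); it waits once at the end on a CountDownLatch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for Kafka's Callback: on failure offset is null and error is set.
interface AckCallback {
    void onCompletion(Long offset, Exception error);
}

// Hypothetical stand-in for an asynchronous producer; no Kafka classes are used.
class FakeAsyncSender {
    // One background thread plays the broker and sends the acknowledgements.
    private final ExecutorService broker = Executors.newSingleThreadExecutor();
    private long nextOffset = 0; // only touched on the broker thread

    // Returns immediately; the callback and the future complete on the broker thread.
    Future<Long> send(String value, AckCallback callback) {
        return broker.submit(() -> {
            if (value.isEmpty()) { // pretend empty messages are rejected
                Exception error = new IllegalArgumentException("empty message");
                callback.onCompletion(null, error);
                throw error;
            }
            long offset = nextOffset++;
            callback.onCompletion(offset, null);
            return offset;
        });
    }

    void close() { broker.shutdown(); }
}

class AsyncSendDemo {
    // Sends every message without blocking, then waits once for all acknowledgements.
    // Returns {acked, failed}.
    static int[] sendAll(List<String> messages) throws InterruptedException {
        AtomicInteger acked = new AtomicInteger();
        AtomicInteger failed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(messages.size());
        FakeAsyncSender sender = new FakeAsyncSender();
        for (String m : messages) {
            // The returned Future is ignored; the callback does the bookkeeping.
            sender.send(m, (offset, error) -> {
                if (error != null) {
                    failed.incrementAndGet(); // offset is null on this path
                } else {
                    acked.incrementAndGet();
                }
                done.countDown();
            });
        }
        boolean finished = done.await(5, TimeUnit.SECONDS);
        sender.close();
        if (!finished) {
            throw new IllegalStateException("timed out waiting for acks");
        }
        return new int[] {acked.get(), failed.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = sendAll(Arrays.asList("a", "", "b", "c"));
        System.out.println("acked=" + result[0] + " failed=" + result[1]);
    }
}
```

Here main should print acked=3 failed=1. Waiting once at the end, rather than calling get() after every send, is what leaves a real producer free to batch records.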
Step 4: Start the Consumer
mvn exec:java -Dexec.mainClass="com.mj.KafkaConsumer"
Step 5: Start the Producer
mvn exec:java -Dexec.mainClass="com.mj.KafkaProducer082"
You should start seeing messages in the consumer.
In summary, the new producer API is asynchronous, batches messages in the background for better throughput, and returns useful metadata (partition and offset) for each message sent.
Related Blogs:
Apache Kafka Introduction
Apache Kafka JAVA tutorial #1
Apache Kafka JAVA tutorial #2
Apache Kafka JAVA tutorial #3