Saturday, March 28, 2015

Apache Kafka : New producer API in 0.8.2

In Kafka version 0.8.2, there is a newer, better and faster version of the Producer API. You might recall from earlier blogs that the Producer is used to send messages to a topic. If you are new to Kafka, please read following blogs first.

Some features of the new producer are :
  • Asynchronously send messages to a topic.
  • Send returns immediately. Producer buffers messages and sends them to broker in the background.
  • Thanks to buffering, many messages sent to broker at one time without waiting for responses.
  • Send method returns a Future<RecordMetadata>. RecordMetadata has information on the record like which partition it stored in and what the offset is.
  • Caller may optionally provide a callback, which gets called when the message is acknowledged.
  • Buffer can at times fill up. Buffer size is configurable and can be configured using the total.memory.bytes configuration property.
  • If the buffer fills up, the Producer can either block or throw an exception. The behavior is controlled by the block.on.buffer.full configuration property.
In the rest of the blog we will use Producer API to rewrite the Producer we wrote in tutorial #1

For this example, you will need the following

For this tutorial you will need

(1) Apache Kafka 0.8.2
(2) JDK 7 or higher. An IDE of your choice is optional
(3) Apache Maven
(4) Source code for this sample from so you can look at working code.

In this tutorial we take the Producer we wrote in Step 5 Kafka tutorial 1 and rewrite it using the new API. We will send messages to a topic on a Kafka Cluster and consume it with the consumer we wrote in that tutorial.

Step 1: Step up a Kafka cluster and create a topic

If you are new to Kafka, you can read and follow the instructions in my tutorial 1 to setup a cluster and create a topic.

Step 2: Get the source code for tutorial 1,2,3 from

Copy to We will port KafkaProducer082 to the new producer API.

Step 3: Write the new Producer

Update the maven dependencies in pom.xml.

For the new producer you will need


The rest of the client code also needs to be updated to 0.8.2.


The new producer will not work if rest of the client uses 0.8.1 or lower versions.

Step 3.1: Imports

Remove the old imports and add these.

import org.apache.kafka.clients.producer.KafkaProducer ;
import org.apache.kafka.clients.producer.ProducerRecord;

Note the packages.

Step 3.2: Create the producer

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("request.required.acks", "1");

KafkaProducer producer = new KafkaProducer(props);

As in the past, you provide some configuration like which broker to connect to as Properties. The key and value serializers have to be provided. There are no default values.

Step 3.3: Send Messages

String date = "04092014" ;
String topic = "mjtopic" ;
for (int i = 1 ; i <= 1000000 ; i++) {
   String msg = date + " This is message " + i ;
   ProducerRecord data = new ProducerRecord(topic, 
            String.valueOf(i), msg);
    Future rs = producer.send(data, new Callback() {
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {

          System.out.println("Received ack for partition=" + recordMetadata.partition() +

           " offset = " + recordMetadata.offset()) ;

      try {
        RecordMetadata rm = rs.get();
        msg = msg + "  partition = " + rm.partition() +  " offset =" + rm.offset() ;
        System.out.println(msg) ;
      } catch(Exception e) {
        System.out.println(e) ;


As mentioned earlier. The send is async and it will batch messages before sending to the broker. The send method immediately returns a Future that has the partition and offset in the partition for message send. We provide a callback to the send method whose onCompletion method is called when an acknowledgement for the message is received.

Step 4: Start the Consumer

mvn exec:java -Dexec.mainClass="com.mj.KafkaConsumer"

Step 5: Start the Producer

mvn exec:java -Dexec.mainClass="com.mj.KafkaProducer082" 

You should start seeing messages in the consumer.

In summary, the new producer API is asynchronous, scalable and returns useful metadata on the message sent.

Friday, January 9, 2015

Apache Kafka JAVA tutorial #3: Once and only once delivery

In Apache Kafka introduction, I provided an architectural overview on the internet scale messaging broker. In JAVA tutorial 1, we learnt how to send and receive messages using the high level consumer API. In JAVA tutorial 2, We examined partition leaders and metadata using the lower level Simple consumer API.

A key requirement of many real world messaging applications is that a message should be delivered once and only once to a consumer. If you have used the traditional JMS based message brokers, this is generally supported out of the box, with no additional work from the application programmer. But Kafka has distributed architecture where the messages to a topic are partitioned for scalability and replicated for fault tolerance and hence the application programmer  has to do a little more to ensure once and only once delivery.

Some key features of the Simple Consumer API are:
  • To fetch a message, you need to know the partition and partition leader.
  • You can read messages in the partition several times.
  • You can read from the first message in the partition or from a known offset.
  • With each read, you are returned an offset where the next read can happen.
  • You can implement once and only once read, by storing the offsets with the message that was just read, thereby making the read transactional. In the event of a crash, you can recover because you know what message was last read and where the next one should be read.
  • Not covered in this tutorial, but the API lets you determine how many partitions there are for a topic and who the leader for each partition is. While fetching message, you connect to the leader. Should a leader go down, you need to fail over by determining who the new leader is, connect to it and continue consuming messages
For this tutorial you will need

(1) Apache Kafka 0.8.1
(2) Apache Zookeeper
(3) JDK 7 or higher. An IDE of your choice is optional
(4) Apache Maven
(5) Source code for this sample from if you want to look at working code

In this tutorial, we will
(1) start a Kafka broker
(2) create a topic with 1 partition
(3) Send a messages to the topic
(4) Write a consumer using Simple API to fetch messages.
(5) Crash the consumer and restart it ( several times). Each time you will see that it reads the next message after the last one that was read.

Since we are focusing of reading messages from a particular offset in a partition, we will keep other things simple by limiting ourselves to 1 broker and 1 partition.

Step 1: Start the broker

bin/ config/

For the purposes of this tutorial, one broker is sufficient as we are reading from just one partition.

Step 2: Create the topic

bin/ --create --zookeeper localhost:2181 --partitions 1 --topic atopic

Again for the purposes of this tutorial we just need 1 partition.

Step 3: Send messages to the topic

Run the producer we wrote in tutorial 1 to send say 1000 messages to this topic.

Step 4: Write a consumer using SimpleConsumer API

The complete code is in the file

Create a file to store the next read offset. 

static {
    try {
      readoffset = new RandomAccessFile("readoffset", "rw");
    } catch (Exception e) {


Create a SimpleConsumer.

SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, clientname);

If there is a offset stored in the file, we will read from the offset. Otherwise, we read from the beginining of the partition -- EarliestTime.

 long offset_in_partition = 0 ;
    try {
      offset_in_partition = readoffset.readLong();
    } catch(EOFException ef) {
      offset_in_partition =     getOffset(consumer,topic,partition,kafka.api.OffsetRequest.EarliestTime(),clientname) ;

The rest of the code is in a

while (true) {


loop. We will keep reading messages or sleep if there are none.

Within the loop, we create a request and fetch messages from the offset.

FetchRequest req = new FetchRequestBuilder()
          .addFetch(topic, partition, offset_in_partition, 100000).build();
FetchResponse fetchResponse = consumer.fetch(req);

Read messages from the response.

for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
        long currentOffset = messageAndOffset.offset();
        if (currentOffset < offset_in_partition) {
        offset_in_partition = messageAndOffset.nextOffset();
        ByteBuffer payload = messageAndOffset.message().payload();

        byte[] bytes = new byte[payload.limit()];
        System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));;
        messages++ ;

        if (messages == 10) {
          System.out.println("Pretend a crash happened") ;

For each message that we read, we check that the offset is not less than the one we want to read from. If it is, we ignore the message. For efficiency, Kafka batches messages. So you can get messages already read. For each valid message, we print it and write the next read offset to the file. If the consumer were to crash, when restarted, it would start reading from the last saved offset.

For demo purposes, the code exits after 10 messages. If you run this program several times, you will see that it starts reading exactly from where it last stopped. You can change that value and experiment.

Step 5: Run the consumer several times.

mvn exec:java -Dexec.mainClass="com.mj.KafkaOnceAndOnlyOnceRead"

210: 04092014 This is message 211
211: 04092014 This is message 212
212: 04092014 This is message 213
213: 04092014 This is message 214
214: 04092014 This is message 215
215: 04092014 This is message 216
216: 04092014 This is message 217
217: 04092014 This is message 218
218: 04092014 This is message 219
219: 04092014 This is message 220

run it again 

mvn exec:java -Dexec.mainClass="com.mj.KafkaOnceAndOnlyOnceRead"

220: 04092014 This is message 221
221: 04092014 This is message 222
222: 04092014 This is message 223
223: 04092014 This is message 224
224: 04092014 This is message 225
225: 04092014 This is message 226
226: 04092014 This is message 227
227: 04092014 This is message 228
228: 04092014 This is message 229
229: 04092014 This is message 230

In Summary, it is possible to implement one and only once delivery of messages in Kafka by storing the read offset.

Thursday, November 20, 2014

Apache Kafka Java tutorial #2

In the blog Kafka introduction, I provided an overview of the features of Apache Kafka, an internet scale messaging broker. In Kafka tutorial #1, I provide a simple java programming example for sending and receiving messages using the high level consumer API.  Kafka also provides a Simple consumer API that provides greater control to the programmer for reading messages and partitions. Simple is a misnomer and this is a complicated API. SimpleConsumer connects directly to the leader of a partition and is able to fetch messages from an offset. Knowing the leader for a partition is a preliminary step for this. And if the leader goes down, you can recover and connect to the new leader.

In the tutorial, we will use the "Simple" API to find the lead broker for a topic partition.

To recap some Kafka concepts
  • Broker in Kafka is a cluster of brokers
  • Messages are sent to and received from topics
  • Topics are partitioned across brokers
  • For each partition there is 1 leader broker and 1 or more replicas
  • Ordering of messages is maintained only within a partition
To manage read positions within a topic, it has to be done at partition level and You need to know the leader for that partition.

For this tutorial you will need

(1) Apache Kafka 0.8.1
(2) Apache Zookeeper
(3) JDK 7 or higher. An IDE of your choice is optional
(4) Apache Maven
(5) Source code for this sample from if you want to look at working code

In this tutorial, we will
(1) create a 3 node kafka cluster
(2) create a topic with 12  partitions
(3) Write code to determine the leader of the partition
(4) Run the code to determine the leaders of each partition.
(5) Kill one broker and run again to determine the new leaders

Note that Kafka-topics --describe command lets you do the same. But we are doing it programatically for the sake of learning and because it is useful is some usecases.

Step 1 : Create a cluster

Follow the instruction is tutorial 1 to create a 3 node cluster.

Step 2 : Create a topic with 12 partitions

/usr/local/kafka/bin$ --create --zookeeper host1:2181 --replication-factor 2 --partitions 12 --topic mjtopic

Step 3 : Write code to determine the leader for each partition

We use the SimpleConsumer API.

import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;

SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 
        100000, 64 * 1024,  "leaderLookup");
List topics = Collections.singletonList("mjtopic");
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

List metaData = resp.topicsMetadata();
int[] leaders = new int[12] ;

 for (TopicMetadata item : metaData) {
      for (PartitionMetadata part : item.partitionsMetadata()) {
          leaders[part.partitionId()] = part.leader().id() ;
for (int j = 0 ; j < 12 ; j++) {
      System.out.println("Leader for partition " + j + " " + leaders[j]) ;


SimpleConsumer can connect to any broker that is online. We construct a TopicMetadataRequest with the topic we are interested in and send it to broker with the consumer.send call. A TopicMetaData is returned which contains a set of PartitionMetaData ( one for each partition ). Each PartitionMetaData has the leader and replicas for that partition.

Step 4 : Run the code 

Leader for partition 0 1
Leader for partition 1 2
Leader for partition 2 3
Leader for partition 3 1
Leader for partition 4 2
Leader for partition 5 3
Leader for partition 6 1
Leader for partition 7 2
Leader for partition 8 3
Leader for partition 9 1
Leader for partition 10 2
Leader for partition 11 3

Step 5 : Kill node 3 and run the code again

Leader for partition 0 1
Leader for partition 1 2
Leader for partition 2 1
Leader for partition 3 1
Leader for partition 4 2
Leader for partition 5 1
Leader for partition 6 1
Leader for partition 7 2
Leader for partition 8 1
Leader for partition 9 1
Leader for partition 10 2
Leader for partition 11 1

You can see the broker 1 has assumed leadership for broker 3's partitions.

In summary, one of the things you can use the SimpleConsumer API is to examine topic partition metadata. We will use this code in future tutorials to determine the leader of a partition.

Wednesday, May 28, 2014

Apache Kafka JAVA tutorial #1

In Apache Kafka introduction we discussed some key features of Kafka. In this tutorial we will setup a small Kafka cluster. We will send messages to a topic using a JAVA producer. We will consume the messages using a JAVA consumer.

For this tutorial you will need

(1) Apache Kafka
(2) Apache Zookeeper
(3) JDK 7 or higher. An IDE of your choice is optional
(4) Apache Maven
(5) Source code for this sample

Zookeeper is required as the Kafka broker uses Zookeeper to store topic configuration and consumer information.

We will setup a 3 node cluster. I will assume you have 3 machines: host1,host2,host3. We will run a Kafka broker on each host. We will run Zookeeper on host1 and host2. You can do this tutorial on one machine, but you will need to change the port numbers to avoid conflict.

Step 1: Download the latest binaries

Apache Kafka can be downloaded at
Apache Zookeeper can be downloaded at

Step 2: Install Zookeeper

We will install zookeeper on host1 and host2. At least 2 is a good idea so that the cluster is still usable if one of the servers goes down.

For instructions on how to setup a zookeeper cluster see The instructions are simple and we will not repeat them here.

Step 3: Configure the Kafka Brokers

Assume you have unzipped the binaries to /usr/local/kafka.

Edit the config/ on each host as follows:

Each broker needs a unique id. On host1 set
On host2 set it to 2 and on host3 set it to 3

On each host, set the directory where the messages are stored

Tell the brokers which zookeepers to connect to

These are the only properties that need changes. All the other default properties in are good enough and you can change them as required.

Step 4: Start the brokers

On each server , from /usr/local/kafka/bin execute &

Step 4.1: Create a topic

Let us create a topic mytopic with 3 partitions and a replication factor of 2.

/usr/local/kafka/bin$ --create --zookeeper host1:2181 --replication-factor 2 --partitions 3 --topic mytopic

Step 5: Write a JAVA producer

The complete source code is at

Create the properties that need to be passed to the producer.
        Properties props = new Properties();
        props.put("", "host2:9092,host3:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props); is the list of brokers that the producer can try to connect to.
We are sending text messages. So we use the String encoder.   Setting request.required.acks to 1 ensures that a publish message request is considered completed only when an acknowledgment is received from the leader.
        Producer producer = new Producer(config);

The above line creates a producer which will send key and value both of type String.
        String topic = "mytopic"     
        for (int i = 1 ; i <= 1000 ; i++) {
            String msg = " This is message " + i ;

            KeyedMessage data = new KeyedMessage(topic, String.valueOf(i), msg);



The code above sends 1000 messages to the topic.

Step 6: Write a JAVA consumer

Our topic has 3 partitions. Messages within a partition are delivered to a consumer in order. So one consumer per partition makes sense. In our consumer,we will create 3 threads, one for each partition. We use what is called the High level consumer API

First setup the consumer configuration

        Properties props = new Properties();
        props.put("zookeeper.connect", "host:2181");
        props.put("", "mygroupid2");
        props.put("", "413");
        props.put("", "203");
        props.put("", "1000");
        ConsumerConfig cf = new ConsumerConfig(props) ;

zookeeper.connect tells the consumer which zookeeper to connect. Consumer needs to connect to zookeeper to get topic information as well as store consumer offset. is the name of the consumer group. Every message is delivered to each consumer group. tells how often the consumer should commit offsets to zookeeper.

       ConsumerConnector consumer = Consumer.createJavaConsumerConnector(cf) ;
       String topic = "mytopic"  ;
       Map topicCountMap = new HashMap();
       topicCountMap.put(topic, new Integer(3));
       Map<String,List<KafkaStream<byte[],byte[]>>> consumerMap =

       List<KafkaStream<byte[],byte[]>> streams = consumerMap.get(topic);

The code above creates a ConsumerConnector and gets a list of KafkaStreams. For this topic, we indicate 3 streams are required ( 1 for each partition) and the connector creates 3 streams.

        ExecutorService executor = Executors.newFixedThreadPool(3); ;
        int threadnum = 0 ;     
        for(KafkaStream stream  : streams) {
            executor.execute(new KafkaPartitionConsumer(threadnum,stream));
            ++threadnum ;

The code above creates a threadpool and submits a runnable KafkaPartitionConsumer that will read the stream. The code for the runnable is shown below.

        public static class KafkaPartitionConsumer implements Runnable {
            private int tnum ;
            private KafkaStream kfs ;
            public KafkaPartitionConsumer(int id, KafkaStream ks) {
                tnum = id ;
                kfs = ks ;
            public void run() {
                System.out.println("This is thread " + tnum) ;
                ConsumerIterator it = kfs.iterator();
                int i = 1 ;
                while (it.hasNext()) {
                    System.out.println(tnum + " " + i + ": " + new String(;
                    ++i ;


Each thread is reading from a stream from a particular partition. If there are no messages, the call hasNext() will block.

Step 7 : Start the consumer

I built my code using maven. When there are dependencies on jars, it is also easier to use maven to run the program, as it pulls in all the dependencies automatically into the classpath

mvn exec:java -Dexec.mainClass="com.mj.KafkaMultiThreadedConsumer"

Step 8 : Start the producer

mvn exec:java -Dexec.mainClass="com.mj.KafkaProducer"

You should see the consumer print out the messages.

You can start multiple consumers with a different groupid and they will each receive all the messages.

