Tuesday, May 19, 2015

Apache Cassandra: Compaction

In Cassandra vs HBase, I provided an overview of Cassandra. In Cassandra data model, I covered data modeling in Cassandra. In this blog, I go a little deeper into Cassandra internals and discuss compaction, a topic that is a source of grief for many users. Very often you hear that performance degrades during compaction. We will discuss what compaction is, why it is necessary and the different types of compaction.

Compaction is the process of merging multiple SSTables into fewer, larger SSTables. It removes data that has been marked for deletion and reduces fragmentation. It generally happens automatically in the background, but can be started manually as well.

Why is compaction necessary?

Cassandra is optimized for writes. A write first goes to an in-memory table called a Memtable. When the Memtable reaches a certain size, it is flushed in its entirety to disk as a new SSTable. Each SSTable has an index of sorted keys that point to the locations in the file where the columns are stored. SSTables are immutable; they are never updated.

The high write throughput is achieved by always appending and never seeking before writing. Updates to existing keys are also written to the current Memtable and eventually flushed to a new SSTable. There are no disk seeks while writing.

Obviously, over time there are going to be several SSTables on disk. Not only that, but the latest column values for a single key might be spread over several SSTables.

How does this affect reads?

Reading from one SSTable is easy: find the key in the index. Since the keys are sorted, a binary search locates it. After that, it is one disk seek to the location of the columns.

But as pointed out earlier, the updates for a single key might be spread over several SSTables. So for the latest values, Cassandra would need to read several SSTables and merge updates based on timestamps before returning columns.

Rather than do this for every read, it is worthwhile to merge SSTables in the background, so that when a read request arrives, Cassandra needs to read from fewer SSTables (one would be ideal).
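
To make the merge rule concrete, here is a minimal sketch (not Cassandra's actual code) of how the newest value of a column is chosen when the same column shows up in several SSTables: the cell with the highest write timestamp wins. The Cell class and the list of versions are made up for illustration.

import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical cell: one version of a column value, as found in one SSTable.
class Cell {
    final String column;
    final String value;
    final long writeTimestamp;   // set at write time; newest wins

    Cell(String column, String value, long writeTimestamp) {
        this.column = column;
        this.value = value;
        this.writeTimestamp = writeTimestamp;
    }
}

class ReadMerge {
    // Given the versions of one column found in different SSTables,
    // return the one with the highest write timestamp.
    static Optional<Cell> latest(List<Cell> versionsFromSSTables) {
        return versionsFromSSTables.stream()
                .max(Comparator.comparingLong(c -> c.writeTimestamp));
    }
}

Compaction performs essentially the same merge once, in the background, and writes the winning cells to a new SSTable so that reads no longer have to.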

Compaction

Compaction is the process of merging SSTables in order to
  • read the columns for a partition key from as few SSTables as possible
  • remove deleted data
  • reduce fragmentation
We did not talk about deletes earlier. When Cassandra receives a request to delete a partition key, it merely marks it for deletion but does not immediately remove the data associated with the key. The marker is called a "tombstone". During compaction, tombstones and the data they shadow are eventually removed.

Types of Compaction

Size tiered compaction:

This is based on the number of SSTables and their size. A compaction is triggered when the number of tables and their size reach a certain threshold. Tables of similar size are grouped into buckets for compaction. Smaller tables are merged into a larger table.

Some disadvantages of size tiered compaction are that read performance can vary, because the columns for a partition key can be spread over several SSTables, and that a lot of free space (up to double the current storage) is required during compaction, since the merge process writes a new copy.

Leveled compaction:

There are multiple levels of SSTables. SSTables within a level are of a fixed size and non-overlapping (within each level, a partition key will be in one SSTable only). Each higher level holds roughly ten times more data than the level below it. Data from the lower levels is merged into SSTables of the higher levels.
Leveled compaction tries to ensure that most reads are served from one SSTable. The worst-case read performance is bounded by the number of levels. This works well for read heavy workloads because Cassandra knows which SSTable within each level to check for the key. But more work needs to be done during compaction, especially for write (insert) heavy workloads. Due to the extra work needed to maintain the level invariants, there is a lot more IO.

Date tiered compaction:

Data written within a certain period of time, say 1 hour, is merged into one SSTable. This works well when you are writing time series data and querying based on timestamp. A query such as "give me the columns written in the last hour" can be serviced by reading just one SSTable. This also makes it easy to remove tombstones that are based on TTL: data with the same TTL is likely to be in the same SSTable, and the entire SSTable can be dropped.

Manual compaction:

This is compaction started manually using the nodetool compact command. A keyspace and table are specified. If you do not specify the table, the compaction runs on all tables. This is called a major compaction. It involves a lot of IO and is generally not done.
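
For example, assuming a keyspace named mykeyspace with a table named users (both names are made up for illustration), a manual compaction of just that table would be started with:

nodetool compact mykeyspace users

Leaving off the table name compacts every table in the keyspace.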

In summary, compaction is fundamental to distributed databases like Cassandra. Without the append-only architecture, write throughput would be much lower. And high write throughput is necessary for highly scalable systems. Stated another way, writes are much harder to scale and are generally the bottleneck, while reads can be scaled more easily by de-normalization, replication and caching.

Even with relational databases, applications do not go to Oracle or MySQL for every read. Typically there is a cache like Memcached or Redis that holds frequently read data. For predictable read performance, consider fronting Cassandra with a fast cache. Another strategy is to use different Cassandra clusters for different workloads, so that read requests can be sent to clusters optimized for reads.

Lastly, leveled compaction works better for read intensive loads, whereas date tiered compaction is suited for time series data written at a steady rate, and size tiered compaction is used for write intensive workloads. But there is no silver bullet. You have to try, measure and tune for optimal performance with your workload.


Related Blogs:

Cassandra vs HBase
Cassandra data model
Choosing Cassandra







Saturday, March 28, 2015

Apache Kafka : New producer API in 0.8.2

In Kafka version 0.8.2, there is a newer, better and faster version of the Producer API. You might recall from earlier blogs that the producer is used to send messages to a topic. If you are new to Kafka, please read the following blogs first.

Apache Kafka Introduction
Apache Kafka JAVA tutorial #1 

Some features of the new producer are:
  • Messages are sent to a topic asynchronously.
  • Send returns immediately. The producer buffers messages and sends them to the broker in the background.
  • Thanks to buffering, many messages can be sent to the broker at one time, without waiting for responses.
  • The send method returns a Future<RecordMetadata>. RecordMetadata has information on the record, such as which partition it was stored in and what its offset is.
  • The caller may optionally provide a callback, which gets called when the message is acknowledged.
  • The buffer can at times fill up. Its size can be set with the total.memory.bytes configuration property.
  • If the buffer fills up, the producer can either block or throw an exception. The behavior is controlled by the block.on.buffer.full configuration property (see the snippet after this list).
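
For illustration, the two buffer-related settings above are provided like any other producer configuration. This is a minimal sketch using exactly the property names quoted in the list, added to the same Properties object that is built in Step 3.2 below; the values are made up:

// Sketch: size the producer's buffer and choose the buffer-full behavior.
props.put("total.memory.bytes", "33554432");   // 32 MB of buffer space (example value)
props.put("block.on.buffer.full", "true");     // block the caller instead of throwing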
In the rest of the blog, we will use the new producer API to rewrite the producer we wrote in tutorial #1.

For this tutorial you will need

(1) Apache Kafka 0.8.2
(2) JDK 7 or higher. An IDE of your choice is optional
(3) Apache Maven
(4) Source code for this sample from https://github.com/mdkhanga/my-blog-code so you can look at working code.

In this tutorial we take the producer we wrote in Step 5 of Kafka tutorial 1 and rewrite it using the new API. We will send messages to a topic on a Kafka cluster and consume them with the consumer we wrote in that tutorial.

Step 1: Set up a Kafka cluster and create a topic

If you are new to Kafka, you can read and follow the instructions in my tutorial 1 to setup a cluster and create a topic.

Step 2: Get the source code for tutorials 1, 2 and 3 from https://github.com/mdkhanga/my-blog-code

Copy KafkaProducer.java to KafkaProducer082.java. We will port KafkaProducer082 to the new producer API.

Step 3: Write the new Producer

Update the maven dependencies in pom.xml.

For the new producer you will need

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.0</version>
</dependency>

The rest of the client code also needs to be updated to 0.8.2.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.0</version>
</dependency>

The new producer will not work if the rest of the client code uses version 0.8.1 or lower.

Step 3.1: Imports

Remove the old imports and add these.


import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

Note the packages.

Step 3.2: Create the producer

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "1");   // the new producer uses "acks" rather than the old request.required.acks

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

As in the past, you provide configuration, such as which broker to connect to, as Properties. The key and value serializers must be provided; there are no default values.

Step 3.3: Send Messages

String date = "04092014";
String topic = "mjtopic";

for (int i = 1; i <= 1000000; i++) {

    String msg = date + " This is message " + i;
    // Key and value are both strings, matching the configured serializers.
    ProducerRecord<String, String> data =
            new ProducerRecord<>(topic, String.valueOf(i), msg);

    // send() is asynchronous; the callback fires when the broker acknowledges the record.
    Future<RecordMetadata> rs = producer.send(data, new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                System.out.println("Send failed: " + e);
                return;
            }
            System.out.println("Received ack for partition=" + recordMetadata.partition() +
                    " offset = " + recordMetadata.offset());
        }
    });

    try {
        // get() blocks until the send completes, which makes this loop effectively synchronous.
        RecordMetadata rm = rs.get();
        msg = msg + "  partition = " + rm.partition() + " offset =" + rm.offset();
        System.out.println(msg);
    } catch (Exception e) {
        System.out.println(e);
    }
}

As mentioned earlier, the send is asynchronous and the producer batches messages before sending them to the broker. The send method immediately returns a Future that will eventually hold the partition and offset for the message. We also pass a callback to the send method, whose onCompletion method is called when an acknowledgement for the message is received. Note that calling get() on the Future, as the loop above does, blocks until the send completes; it is convenient for printing the metadata but gives up some of the throughput benefit of batching.
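
If you do not need the partition and offset inline, a fire-and-forget variant of the same loop relies only on the callback and keeps the send fully asynchronous. This is a minimal sketch of that alternative, reusing the date, topic and producer variables from the code above; it is not part of the original tutorial code:

for (int i = 1; i <= 1000000; i++) {
    String msg = date + " This is message " + i;
    ProducerRecord<String, String> data =
            new ProducerRecord<>(topic, String.valueOf(i), msg);

    // No Future.get() here: the producer is free to batch aggressively,
    // and errors/acks are reported through the callback.
    producer.send(data, new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                System.out.println("Send failed: " + e);
            }
        }
    });
}
producer.close();   // flush any buffered messages before exiting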

Step 4: Start the Consumer

mvn exec:java -Dexec.mainClass="com.mj.KafkaConsumer"

Step 5: Start the Producer

mvn exec:java -Dexec.mainClass="com.mj.KafkaProducer082" 


You should start seeing messages in the consumer.

In summary, the new producer API is asynchronous, scalable and returns useful metadata on the message sent.


Related Blogs:
Apache Kafka Introduction
Apache Kafka JAVA tutorial #1
Apache Kafka JAVA tutorial #2 
Apache Kafka JAVA tutorial #3


Friday, January 23, 2015

MongoDB tutorial #1 : Introduction

In the blog NoSQL, I provided an introduction to NoSQL databases. We have discussed some NoSQL databases such as HBase, Cassandra and Redis. In this blog, we discuss MongoDB, a document oriented database, in contrast to the key value stores we discussed earlier. MongoDB is currently one of the more popular NoSQL databases, primarily due to its ease of use and simpler programming model, though there have been reports that it lags other NoSQL databases in scalability or performance, and it has more moving parts. Still, its ease of use and low learning curve make it an attractive choice in many scenarios.

The key features of MongoDB are:
  • The unit of storage, like a record in relational databases or a key-value pair in key value stores, is a document, or more precisely a JSON document. For example:
    { "employee_id": "12345",
      "name": "John doe",
      "department": "database team",
      "title": "architect",
      "start_date": "1/1/2015" }
  • Documents are stored in collections.
  • Collections can be indexed on one or more fields for faster queries (see the index example after this list).
  • No schema is required for the collection.
  • MongoDB is made highly available through replication and automatic failover. Writes go to a primary server and are replicated to secondaries. If the primary goes down, one of the replicas takes over as the primary.
  • Read operations can be scaled by sending the reads to the replicas as well.
  • Write operations are scaled by sharding.
  • Sharding is automatic, but has a couple of moving parts:
    • Sharding is based on a shard key, which is an indexed field or an indexed compound field.
    • Sharding can be range based or hash based. With range based sharding, partitioning is based on key ranges, so that values close to each other are stored together. With hash based sharding, partitioning is based on a hash of the key.
    • The data set is divided into chunks. Each shard manages some of the chunks.
    • Query routers are used to send the request to the right shard.
    • Config servers hold metadata about which chunks live on which shard.
    • If a chunk grows too large, it is broken up. If some shards own more chunks than others, the cluster is automatically rebalanced by redistributing the chunks.
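
For example, the employees collection used in the steps below could be indexed on employee_id like this (a sketch that follows the collection and field names used later in this post; createIndex is available in MongoDB 2.6 and later, older releases use ensureIndex):

> db.employees.createIndex({ "employee_id": 1 })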
In the rest of the blog, let us fire up a MongoDB instance, create some data and learn how to query it.

Step 1: Download Mongo

You can download the server from www.mongodb.org/downloads.
I like to download the generic Linux version.

Untar/unzip it to a directory of your choice.

Step 2 : Start the server

Decide on a directory to store the data. Say ~/mongodata. Create the directory.

Change to the directory where you installed mongo. To start the server, type the command.

bin/mongod --dbpath ~/mongodata

Step 3: Start the mongo client

bin/mongo

Step 4: Create and insert some data into a collection

Create and use a database.
> use testDb

Create an employee document and insert it into the employees collection.
> emp1 = { "employee_id":"12345", "name":"John doe", "department": "database team", "title":"architect", "start_date":"1/1/2015" }
> db.employees.insert(emp1)

Retrieve the document.
> db.employees.find()
{ "_id" : ObjectId("54c2de34426d3d4ea1226498"), "employee_id" : "12345", "name" : "John doe", "department" : "database team", "title" : "architect", "start_date" : "1/1/2015" }

Step 5 : Insert a few more employees

> emp2 = { "employee_id":"12346", "name":"Ste Curr", "department": "database team", "title":"developer1", "start_date":"12/1/2013" }
> db.employees.insert(emp2)

> emp3 = { "employee_id":"12347", "name":"Dre Grin", "department": "QA team", "title":"developer2", "start_date":"12/1/2011" }
> db.employees.insert(emp3)

> emp4 = { "employee_id":"12348", "name":"Daev Eel", "department": "Build team", "title":"developer3", "start_date":"12/1/2010" }
> db.employees.insert(emp4)

Step 6: Queries

Query by attribute equality
> db.employees.find({"name" : "Ste Curr"} )
{ "_id" : ObjectId("54c2e0de426d3d4ea1226499"), "employee_id" : "12346", "name" : "Ste Curr", "department" : "database team", "title" : "developer1", "start_date" : "12/1/2013"  }

Query by attribute with regex condition
> db.employees.find({"department":{$regex : "data*"}})
{ "_id" : ObjectId("54c2de34426d3d4ea1226498"), "employee_id" : "12345", "name" : "John doe", "department" : "database team", "title" : "architect", "start_date" : "1/1/2015" }
{ "_id" : ObjectId("54c2e0de426d3d4ea1226499"), "employee_id" : "12346", "name" : "Ste Curr", "department" : "database team", "title" : "developer1", "start_date" : "12/1/2013" }

Query using less than / greater than conditions
> db.employees.find({"employee_id":{$gte : "12347"}})
{ "_id" : ObjectId("54c2e382426d3d4ea122649a"), "employee_id" : "12347", "name" : "Dre Grin", "department" : "QA team", "title" : "developer2", "start_date" : "12/1/2011" }
{ "_id" : ObjectId("54c2e3af426d3d4ea122649b"), "employee_id" : "12348", "name" : "Daev Eel", "department" : "Build team", "title" : "developer3", "start_date" : "12/1/2010" }

> db.employees.find({"employee_id":{$lte : "12346"}})
{ "_id" : ObjectId("54c2de34426d3d4ea1226498"), "employee_id" : "12345", "name" : "John doe", "department" : "database team", "title" : "architect", "start_date" : "1/1/2015" }
{ "_id" : ObjectId("54c2e0de426d3d4ea1226499"), "employee_id" : "12346", "name" : "Ste Curr", "department" : "database team", "title" : "developer1", "start_date" : "12/1/2013" }

Step 7: Cursors

Iterate through results.
> var techguys = db.employees.find()
> while ( techguys.hasNext() ) printjson( techguys.next() )
{
    "_id" : ObjectId("54c2de34426d3d4ea1226498"),
    "employee_id" : "12345",
    "name" : "John doe",
    "department" : "database team",
    "title" : "architect",
    "start_date" : "1/1/2015"
}
.
.
.

Step 8: Delete records

Delete one record
> db.employees.remove({"employee_id" : "12345"})
WriteResult({ "nRemoved" : 1 })

Delete all records
> db.employees.remove({})
WriteResult({ "nRemoved" : 3 })

As you can see, MongoDB is pretty easy to use. Download it and give it a try.




Friday, January 9, 2015

Apache Kafka JAVA tutorial #3: Once and only once delivery

In Apache Kafka introduction, I provided an architectural overview of the internet scale messaging broker. In JAVA tutorial 1, we learned how to send and receive messages using the high level consumer API. In JAVA tutorial 2, we examined partition leaders and metadata using the lower level SimpleConsumer API.

A key requirement of many real world messaging applications is that a message should be delivered once and only once to a consumer. If you have used traditional JMS based message brokers, this is generally supported out of the box, with no additional work from the application programmer. But Kafka has a distributed architecture where the messages in a topic are partitioned for scalability and replicated for fault tolerance, and hence the application programmer has to do a little more to ensure once and only once delivery.

Some key features of the Simple Consumer API are:
  • To fetch a message, you need to know the partition and partition leader.
  • You can read messages in the partition several times.
  • You can read from the first message in the partition or from a known offset.
  • With each read, you are returned an offset where the next read can happen.
  • You can implement once and only once reads by storing the offset along with the message that was just read, thereby making the read transactional. In the event of a crash, you can recover because you know which message was last read and where the next one should be read from.
  • Not covered in this tutorial, but the API also lets you determine how many partitions there are for a topic and who the leader for each partition is. While fetching messages, you connect to the leader. Should the leader go down, you need to fail over by determining who the new leader is, connecting to it and continuing to consume messages.
For this tutorial you will need

(1) Apache Kafka 0.8.1
(2) Apache Zookeeper
(3) JDK 7 or higher. An IDE of your choice is optional
(4) Apache Maven
(5) Source code for this sample from https://github.com/mdkhanga/my-blog-code if you want to look at working code

In this tutorial, we will
(1) start a Kafka broker
(2) create a topic with 1 partition
(3) Send messages to the topic
(4) Write a consumer using the SimpleConsumer API to fetch messages.
(5) Crash the consumer and restart it (several times). Each time, you will see that it reads the next message after the last one that was read.

Since we are focusing on reading messages from a particular offset in a partition, we will keep other things simple by limiting ourselves to 1 broker and 1 partition.

Step 1: Start the broker


bin/kafka-server-start.sh config/server1.properties

For the purposes of this tutorial, one broker is sufficient as we are reading from just one partition.

Step 2: Create the topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --topic atopic

Again for the purposes of this tutorial we just need 1 partition.

Step 3: Send messages to the topic

Run the producer we wrote in tutorial 1 to send say 1000 messages to this topic.

Step 4: Write a consumer using SimpleConsumer API

The complete code is in the file KafkaOnceAndOnlyOnceRead.java.

Create a file to store the next read offset. 

static {
    try {
      readoffset = new RandomAccessFile("readoffset", "rw");
    } catch (Exception e) {
      System.out.println(e);
    }

}

Create a SimpleConsumer.

SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, clientname);

If there is an offset stored in the file, we read from that offset. Otherwise, we read from the beginning of the partition -- EarliestTime.

long offset_in_partition = 0;
try {
    offset_in_partition = readoffset.readLong();
} catch (EOFException ef) {
    offset_in_partition = getOffset(consumer, topic, partition,
            kafka.api.OffsetRequest.EarliestTime(), clientname);
}
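
The getOffset helper is part of the sample code on github. For reference, here is a sketch of how such a helper is typically written with the SimpleConsumer offset API; the error handling is minimal and the signature simply mirrors the call above:

import java.util.Collections;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;

public static long getOffset(SimpleConsumer consumer, String topic, int partition,
                             long whichTime, String clientname) {
    TopicAndPartition tp = new TopicAndPartition(topic, partition);
    // Ask for a single offset at or before whichTime (EarliestTime or LatestTime).
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
            Collections.singletonMap(tp, new PartitionOffsetRequestInfo(whichTime, 1)),
            kafka.api.OffsetRequest.CurrentVersion(), clientname);
    OffsetResponse response = consumer.getOffsetsBefore(request);
    if (response.hasError()) {
        System.out.println("Error fetching offset: " + response.errorCode(topic, partition));
        return 0;
    }
    return response.offsets(topic, partition)[0];
}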


The rest of the code is in a

while (true) {

}

loop. We will keep reading messages or sleep if there are none.

Within the loop, we create a request and fetch messages from the offset.

FetchRequest req = new FetchRequestBuilder()
          .clientId(clientname)
          .addFetch(topic, partition, offset_in_partition, 100000).build();
FetchResponse fetchResponse = consumer.fetch(req);


Read messages from the response.

for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
        long currentOffset = messageAndOffset.offset();
        if (currentOffset < offset_in_partition) {
          continue;
        }
        offset_in_partition = messageAndOffset.nextOffset();
        ByteBuffer payload = messageAndOffset.message().payload();

        byte[] bytes = new byte[payload.limit()];
        payload.get(bytes);
        System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
        readoffset.seek(0);
        readoffset.writeLong(offset_in_partition);
        numRead++;
        messages++ ;

        if (messages == 10) {
          System.out.println("Pretend a crash happened") ;
          System.exit(0);
        }
  }


For each message that we read, we check that its offset is not less than the one we want to read from. If it is, we ignore the message (for efficiency, Kafka batches messages, so you can get messages that were already read). For each valid message, we print it and write the next read offset to the file. If the consumer were to crash, then when restarted, it would start reading from the last saved offset.

For demo purposes, the code exits after 10 messages. If you run this program several times, you will see that it starts reading exactly from where it last stopped. You can change that value and experiment.

Step 5: Run the consumer several times.

mvn exec:java -Dexec.mainClass="com.mj.KafkaOnceAndOnlyOnceRead"

210: 04092014 This is message 211
211: 04092014 This is message 212
212: 04092014 This is message 213
213: 04092014 This is message 214
214: 04092014 This is message 215
215: 04092014 This is message 216
216: 04092014 This is message 217
217: 04092014 This is message 218
218: 04092014 This is message 219
219: 04092014 This is message 220


run it again 

mvn exec:java -Dexec.mainClass="com.mj.KafkaOnceAndOnlyOnceRead"

220: 04092014 This is message 221
221: 04092014 This is message 222
222: 04092014 This is message 223
223: 04092014 This is message 224
224: 04092014 This is message 225
225: 04092014 This is message 226
226: 04092014 This is message 227
227: 04092014 This is message 228
228: 04092014 This is message 229
229: 04092014 This is message 230


In summary, it is possible to implement once and only once delivery of messages in Kafka by storing the read offset.

Related Blogs:

Apache Kafka Introduction
Apache Kafka JAVA tutorial #1
Apache Kafka JAVA tutorial #2 
Apache Kafka 0.8.2 New Producer API  

Thursday, November 20, 2014

Apache Kafka Java tutorial #2

In the blog Kafka introduction, I provided an overview of the features of Apache Kafka, an internet scale messaging broker. In Kafka tutorial #1, I provided a simple Java programming example for sending and receiving messages using the high level consumer API. Kafka also provides a SimpleConsumer API that gives the programmer greater control over reading messages and partitions. Simple is a misnomer; this is a complicated API. SimpleConsumer connects directly to the leader of a partition and is able to fetch messages from an offset. Knowing the leader for a partition is a preliminary step for this. And if the leader goes down, you can recover and connect to the new leader.

In this tutorial, we will use the "Simple" API to find the lead broker for a topic partition.

To recap some Kafka concepts
  • A Kafka deployment is a cluster of brokers
  • Messages are sent to and received from topics
  • Topics are partitioned across brokers
  • For each partition there is 1 leader broker and 1 or more replicas
  • Ordering of messages is maintained only within a partition
Read positions within a topic have to be managed at the partition level, and you need to know the leader for each partition.

For this tutorial you will need

(1) Apache Kafka 0.8.1
(2) Apache Zookeeper
(3) JDK 7 or higher. An IDE of your choice is optional
(4) Apache Maven
(5) Source code for this sample from https://github.com/mdkhanga/my-blog-code if you want to look at working code

In this tutorial, we will
(1) create a 3 node kafka cluster
(2) create a topic with 12 partitions
(3) Write code to determine the leader of the partition
(4) Run the code to determine the leaders of each partition.
(5) Kill one broker and run again to determine the new leaders

Note that the kafka-topics.sh --describe command lets you do the same, as shown below. But we are doing it programmatically for the sake of learning and because it is useful in some use cases.
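
For comparison, the command line check for the topic we will create in Step 2 looks like this (same zookeeper address and topic name):

/usr/local/kafka/bin$ kafka-topics.sh --describe --zookeeper host1:2181 --topic mjtopic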

Step 1 : Create a cluster

Follow the instructions in tutorial 1 to create a 3 node cluster.

Step 2 : Create a topic with 12 partitions

/usr/local/kafka/bin$ kafka-topics.sh --create --zookeeper host1:2181 --replication-factor 2 --partitions 12 --topic mjtopic

Step 3 : Write code to determine the leader for each partition

We use the SimpleConsumer API.
PartitionLeader.java

import java.util.Collections;
import java.util.List;

import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;


SimpleConsumer consumer = new SimpleConsumer("localhost", 9092,
        100000, 64 * 1024, "leaderLookup");
List<String> topics = Collections.singletonList("mjtopic");
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

List<TopicMetadata> metaData = resp.topicsMetadata();
int[] leaders = new int[12];

for (TopicMetadata item : metaData) {
    for (PartitionMetadata part : item.partitionsMetadata()) {
        leaders[part.partitionId()] = part.leader().id();
    }
}
for (int j = 0; j < 12; j++) {
    System.out.println("Leader for partition " + j + " " + leaders[j]);
}

SimpleConsumer can connect to any broker that is online. We construct a TopicMetadataRequest with the topic we are interested in and send it to the broker with the consumer.send call. The response contains a TopicMetadata for each requested topic, which in turn contains a PartitionMetadata for each partition. Each PartitionMetadata has the leader and replicas for that partition.

Step 4 : Run the code 

Leader for partition 0 1
Leader for partition 1 2
Leader for partition 2 3
Leader for partition 3 1
Leader for partition 4 2
Leader for partition 5 3
Leader for partition 6 1
Leader for partition 7 2
Leader for partition 8 3
Leader for partition 9 1
Leader for partition 10 2
Leader for partition 11 3


Step 5 : Kill node 3 and run the code again

Leader for partition 0 1
Leader for partition 1 2
Leader for partition 2 1
Leader for partition 3 1
Leader for partition 4 2
Leader for partition 5 1
Leader for partition 6 1
Leader for partition 7 2
Leader for partition 8 1
Leader for partition 9 1
Leader for partition 10 2
Leader for partition 11 1


You can see that broker 1 has assumed leadership for broker 3's partitions.

In summary, one of the things you can use the SimpleConsumer API for is to examine topic partition metadata. We will use this code in future tutorials to determine the leader of a partition.

Related blogs:

Apache Kafka Introduction
Apache Kafka JAVA tutorial #1
Apache Kafka JAVA tutorial #3 
Apache Kafka 0.8.2 New Producer API 






Friday, October 10, 2014

ServletContainerInitializer : Discovering classes in your Web Application

In my blog on java.util.ServiceLoader, we discussed how it can be used to discover third party implementations of your interfaces. This can be useful if your application is a container that executes code written by developers. In this blog, we discuss dynamic discovery and registration for Servlets.

All Java web developers are already familiar with the javax.servlet.ServletContextListener interface. If you want to do initialization when the application starts or clean up when it is destroyed, you implement the contextInitialized and contextDestroyed methods of this interface.

The Servlet 3.0 specification added a couple of interesting features that help with dynamism and are particularly useful to developers of libraries or containers.

(1) javax.servlet.ServletContainerInitializer is another interface that can notify your code of application start.

Library or container developers typically provide an implementation of this interface. The implementation should be annotated with the HandlesTypes annotation. When the application starts, the servlet container calls the onStartup method of this interface, passing in as a parameter the set of all classes that implement, extend or are annotated with the type(s) declared in the HandlesTypes annotation.

(2) The specification also adds a number of methods to dynamically register Servlets, filters and listeners. You will recall that previously, if you needed to add a new Servlet to your application, you needed to modify web.xml.

Combining (1) and (2), it should be possible to dynamically discover and add Servlets to a web application. This is a powerful feature that allows you to make the web application modular and spread development across teams without build dependencies. Note that this technique can be used to discover any interface, class or annotation. I am killing 2 birds with one stone by using this to discover servlets.

In the rest of the blog, we will build a simple web app that illustrates the above concepts. For this tutorial you will need

(1) JDK 7.x or higher
(2) Apache Tomcat or any Servlet container
(3) Apache Maven

In this example we will

(1) We will implement a ServletContainerInitializer called WebContainerInitializer and package it in a jar, containerlib.jar.
(2) To make the example interesting, we will create a new annotation @MyServlet, which will act like the @WebServlet annotation in the servlet specification. WebContainerInitializer will handle types that are annotated with @MyServlet.
(3) We will write a simple web app that has a servlet annotated with @MyServlet and has containerlib.jar in its lib directory. No entries in web.xml.
(4) When the app starts, the servlet is discovered and registered. You can go to a browser and invoke it.

Before we proceed any further, you may download the code from my github repository, so you can look at the code as I explain. The code for this example is in the dynamicservlets directory.

Step 0: Get the code

git clone https://github.com/mdkhanga/my-blog-code.git

dynamicservlets has 2 subdirectories: containerlib and dynamichello.

The containerlib project has the MyServlet annotation and the WebContainerInitializer which implements ServletContainerInitializer.

DynamicHello is a web application that uses the containerlib jar.

Step 1: The MyServlet annotation
MyServlet.java
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface MyServlet {   
    String path() ;
}

The annotation applies to classes and is used as
@MyServlet(path = "/someuri")

Step 2: A Demo servlet
HelloWorldServlet.java
@MyServlet(path = "/greeting")
public class HelloWorldServlet extends HttpServlet {
     

    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        PrintWriter p = response.getWriter() ;
        p.write(" hello world ");
        p.close();
    }
   
}


This is a simple hello servlet that we discover and register. Nothing needs to be added to web.xml.

Step 3: WebContainerInitializer
WebContainerInitializer.java
This is the implementation of ServletContainerInitializer.

@HandlesTypes({MyServlet.class})
public class WebContainerInitializer implements ServletContainerInitializer {

    public void onStartup(Set<Class<?>> classes, ServletContext ctx)
            throws ServletException {

        for (Class<?> c : classes) {
            // Read the path from the @MyServlet annotation and register the servlet.
            MyServlet ann = (MyServlet) c.getAnnotation(MyServlet.class);
            // "hello" works as the registration name here because only one servlet is found;
            // with multiple servlets, use a unique name such as c.getName().
            ServletRegistration.Dynamic d = ctx.addServlet("hello", c);
            d.addMapping(ann.path());
        }
    }
}

The implementation needs to be in a separate jar and included in the lib directory of the application war. WebContainerInitializer is annotated with @HandlesTypes, which takes MyServlet.class as a parameter. When the application starts, the servlet container finds all classes that are annotated with MyServlet and passes them to the onStartup method. In the onStartup method, we go through each class found by the container, get the value of the path attribute from the annotation and register the servlet.

To make this work, we need one more thing: in the META-INF/services directory, a file whose name is javax.servlet.ServletContainerInitializer and whose content is the single line com.mj.WebContainerInitializer. If you are wondering why this is required, please see my blog on java.util.ServiceLoader.
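
To make the layout concrete, the relevant entry in containerlib.jar and its one-line content look like this:

META-INF/services/javax.servlet.ServletContainerInitializer
    com.mj.WebContainerInitializer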

Step 4: Build and run the app

To build,
cd containerlib
mvn clean install
cd dynamichello
mvn clean install

This builds dynamichello/target/dynamichello.war that can be deployed to tomcat or any servlet container.
When the application starts, you will see the following messages in the log

Initializing container app .....
Found ...com.mj.servlets.HelloWorldServlet
path = /greeting


Point your browser to http://localhost:8080/hello/greeting.

The servlet will respond with a hello message.

In summary, this technique can be used to dynamically discover classes during application startup. It is typically used to implement libraries or containers, such as a JAX-RS implementation, and allows implementations to be provided by different developers. There is no hard wiring.

Saturday, September 20, 2014

Discovering third party API/SPI implementations using java.util.ServiceLoader

One interface, many implementations is a very well known object oriented programming paradigm. If you write the implementations yourself, then you know what those implementations are, and you can write a factory class or method that creates and returns the right implementation. You might also make this configuration driven and inject the correct implementation based on configuration.

What if third parties are providing implementations of your interface? If you know those implementations in advance, then you could do the same as in the case above. But one downside is that a code change is required to add new implementations or to remove them. You could come up with a configuration file where implementations are listed, and your code could use the list to determine what is available. The downside is that the configuration has to be updated by you, and this is a non-standard approach in that every API developer could come up with his own format for the configuration. Fortunately Java has a solution.

In JDK6, they introduced java.util.ServiceLoader, a class for discovering and loading classes.

It has a static load method that can be used to create a ServiceLoader that will find and load all implementations of a particular type.

public static <T> ServiceLoader<T> load(Class<T> service)

You would use it as
ServiceLoader<SortProvider> sl = ServiceLoader.load(SortProvider.class);
This creates a ServiceLoader that can find and load every SortProvider on the classpath.

The iterator method returns an Iterator over the implementations found, which are loaded lazily.
Iterator<SortProvider> its = sl.iterator();

You can iterate over what is found and store it in a Map or somewhere else in memory.
while (its.hasNext()) {
    SortProvider sp = its.next();
    log("Found provider " + sp.getProviderName());
    sMap.put(sp.getProviderName(), sp);
}

How does ServiceLoader know where to look? (A sample layout follows this list.)
  • Implementors package their implementation in a jar.
  • The jar should have a META-INF/services directory.
  • The services directory should have a file whose name is the fully qualified name of the type.
  • The file contains a list of fully qualified names of implementations of the type, one per line.
  • The jar is installed on the classpath.
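
For the SortProvider SPI described below, a provider jar would therefore contain an entry like the following; the implementation class name is made up for illustration:

META-INF/services/com.mj.msort.spi.SortProvider
    com.example.sort.QuickSortProvider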
I have a complete API/SPI example for a Sort interface below, which you can download at https://github.com/mdkhanga/my-blog-code. This sample is in the msort directory. You should download the code first, so that you can look at the code while reading the text below. This example illustrates how ServiceLoader is used to discover implementations from third party service providers. The Sort interface can be used for sorting data. Service providers can provide implementations of various sort algorithms. In the example,

1. com.mj.msort.Sort is the main Sort API. It has 2 sort methods: one for arrays and one for collections. 2 implementations are provided - bubblesort and mergesort. But anybody can write additional implementations.
 
2. com.mj.msort.spi.SortProvider is the SPI. Third party implementors of Sort must also implement the SortProvider interface. The SPI provides another layer of encapsulation. We don't want to know the implementation details. We just want an instance of the implementation.

3. SPI providers need to implement Sort and SortProvider.

4. com.mj.msort.SortServices is a class that can discover and load SPI implementations and make them available to API users. It uses java.util.ServiceLoader to load SortProviders. Hence SortProvider implementations also need to be packaged as required by java.util.ServiceLoader in order to be discovered.

This is the class that brings everything together. It uses ServiceLoader to find all implementations of SortProviders and stores them in a Map. It has a getSort method that programmers can call to get a specific implementation or whatever is there.

5.  Sample Usage

Sort s = SortServices.getSort(...
s.sort(...

In summary, ServiceLoader is a powerful mechanism to find and load classes of a given type. It can be used to build highly extensible and dynamic services. As an additional exercise, you can create your own implementation of SortProvider in your own jar, and SortServices will find it as long as it is on the classpath.