Friday, January 23, 2015

MongoDB tutorial #1 : Introduction

In the NoSQL blog, I provided an introduction to NoSQL databases, and we have discussed some of them, such as HBase, Cassandra and Redis. In this blog, we discuss MongoDB, a document-oriented database, in contrast to the key-value stores we discussed earlier. MongoDB is currently one of the more popular NoSQL databases, primarily due to its ease of use and simpler programming model. There have been reports that it lags other NoSQL databases in scalability or performance, and it has more moving parts. But its ease of use and low learning curve make it an attractive choice in many scenarios.

The key features of MongoDB are:
  • The unit of storage, like a record in a relational database or a key-value pair in a key-value store, is a document, or more precisely a JSON document.
      { "employee_id":"12345",
        "name":"John doe",
        "department": "database team",
        "title":"architect",
        "start_date":"1/1/2015" }
  • Documents are stored in collections.
  • Collections can be indexed by field, which makes queries faster.
  • No schema is required for the collection.
  • MongoDB is highly available through replication and automatic failover. Writes go to a primary server and are replicated to one or more replicas. If the primary goes down, one of the replicas takes over as the primary.
  • Read operations can be scaled by sending the reads to the replicas as well.
  • Write operations are scaled by sharding.
  • Sharding is automatic, but it has a few moving parts:
    • Sharding is based on a key, which is an indexed field or an indexed compound field.
    • Sharding can be range based or hash based. With range-based sharding, partitioning is based on key ranges, so that values close to each other are stored together. With hash-based sharding, partitioning is based on a hash of the key.
    • The data set is divided into chunks. Each shard manages some chunks.
    • Query routers are used to send the request to the right shard.
    • Config servers hold metadata on which chunks are with which shard.
    • If a chunk grows too large, it is broken up. If some shards own more chunks than others, the cluster is automatically rebalanced by redistributing the chunks.
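
The difference between range-based and hash-based sharding can be illustrated with a small sketch. This is not MongoDB code: the class name, the range boundaries and the use of String.hashCode() are all assumptions chosen for illustration, and MongoDB's real chunk assignment is more involved.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: toy versions of range-based and hash-based shard selection.
final class ShardingSketch {
    // Hypothetical exclusive upper bounds of the key ranges owned by shards 0 and 1;
    // shard 2 owns every key from "20000" upward.
    static final List<String> UPPER_BOUNDS = Arrays.asList("10000", "20000");

    // Range based: keys that sort close together land on the same shard.
    static int rangeShard(String key) {
        for (int i = 0; i < UPPER_BOUNDS.size(); i++) {
            if (key.compareTo(UPPER_BOUNDS.get(i)) < 0) {
                return i;
            }
        }
        return UPPER_BOUNDS.size();
    }

    // Hash based: the shard is derived from a hash of the key, so neighbouring
    // keys are usually spread across shards.
    static int hashShard(String key, int shardCount) {
        return Math.floorMod(key.hashCode(), shardCount);
    }

    public static void main(String[] args) {
        for (String id : Arrays.asList("12345", "12346", "12347", "12348")) {
            System.out.println(id + " range -> shard " + rangeShard(id)
                    + ", hash -> shard " + hashShard(id, 3));
        }
    }
}
```

With these made-up boundaries, all four employee ids fall in the same range and therefore on the same shard, while the hash-based function scatters them.
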
In the rest of the blog, let us fire up a MongoDB instance, create some data and learn how to query it.

Step 1: Download Mongo

You can download the server from
I like to download the generic Linux version.

Untar/unzip it to a directory of your choice.

Step 2 : Start the server

Decide on a directory to store the data. Say ~/mongodata. Create the directory.

Change to the directory where you installed mongo. To start the server, type the command:

bin/mongod --dbpath ~/mongodata

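Step 3: Start the mongo client

In another terminal, start the mongo shell from the same installation directory. The commands in the steps below are typed at its > prompt.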

Step 4: Create and insert some data into a collection

Switch to a database. It is created when data is first inserted.
> use testDb

Create an employee document and insert it into the employees collection.
> emp1 = { "employee_id":"12345", "name":"John doe", "department": "database team", "title":"architect", "start_date":"1/1/2015" }
> db.employees.insert(emp1)

Retrieve the document.
> db.employees.find()
{ "_id" : ObjectId("54c2de34426d3d4ea1226498"), "employee_id" : "12345", "name" : "John doe", "department" : "database team", "title" : "architect", "start_date" : "1/1/2015" }

Step 5 : Insert a few more employees

> emp2 = { "employee_id":"12346", "name":"Ste Curr", "department": "database team", "title":"developer1", "start_date":"12/1/2013" }
> db.employees.insert(emp2)

> emp3 = { "employee_id":"12347", "name":"Dre Grin", "department": "QA team", "title":"developer2", "start_date":"12/1/2011" }
> db.employees.insert(emp3)

> emp4 = { "employee_id":"12348", "name":"Daev Eel", "department": "Build team", "title":"developer3", "start_date":"12/1/2010" }
> db.employees.insert(emp4)

Step 6: Queries

Query by attribute equality
> db.employees.find({"name" : "Ste Curr"} )
{ "_id" : ObjectId("54c2e0de426d3d4ea1226499"), "employee_id" : "12346", "name" : "Ste Curr", "department" : "database team", "title" : "developer1", "start_date" : "12/1/2013"  }

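Query by attribute with regex condition. The pattern is a regular expression, not a wildcard, so ^data matches values that start with "data".
> db.employees.find({"department":{$regex : "^data"}})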
{ "_id" : ObjectId("54c2de34426d3d4ea1226498"), "employee_id" : "12345", "name" : "John doe", "department" : "database team", "title" : "architect", "start_date" : "1/1/2015" }
{ "_id" : ObjectId("54c2e0de426d3d4ea1226499"), "employee_id" : "12346", "name" : "Ste Curr", "department" : "database team", "title" : "developer1", "start_date" : "12/1/2013" }
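
To see why regex patterns should not be read as wildcards, here is a small sketch using Java's own regex engine. It assumes that, for patterns this simple, java.util.regex behaves the same as the engine MongoDB uses; the class and method names are made up for the example.

```java
import java.util.regex.Pattern;

// Illustrative only: how an unanchored regex differs from a "starts with" test.
final class RegexSketch {
    // True if the pattern matches anywhere in the value (unanchored search).
    static boolean matches(String regex, String value) {
        return Pattern.compile(regex).matcher(value).find();
    }

    public static void main(String[] args) {
        // "data*" means "dat" followed by zero or more "a", found anywhere.
        System.out.println(matches("data*", "database team")); // true
        System.out.println(matches("data*", "update team"));   // true, contains "dat"
        // "^data" only matches values that begin with "data".
        System.out.println(matches("^data", "database team")); // true
        System.out.println(matches("^data", "update team"));   // false
    }
}
```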

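Query using less than, greater than conditions. Note that employee_id is stored as a string, so the comparison is lexicographic rather than numeric.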
> db.employees.find({"employee_id":{$gte : "12347"}})
{ "_id" : ObjectId("54c2e382426d3d4ea122649a"), "employee_id" : "12347", "name" : "Dre Grin", "department" : "QA team", "title" : "developer2", "start_date" : "12/1/2011" }
{ "_id" : ObjectId("54c2e3af426d3d4ea122649b"), "employee_id" : "12348", "name" : "Daev Eel", "department" : "Build team", "title" : "developer3", "start_date" : "12/1/2010" }

> db.employees.find({"employee_id":{$lte : "12346"}})
{ "_id" : ObjectId("54c2de34426d3d4ea1226498"), "employee_id" : "12345", "name" : "John doe", "department" : "database team", "title" : "architect", "start_date" : "1/1/2015" }
{ "_id" : ObjectId("54c2e0de426d3d4ea1226499"), "employee_id" : "12346", "name" : "Ste Curr", "department" : "database team", "title" : "developer1", "start_date" : "12/1/2013" }
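
Because employee_id holds strings, range conditions compare character by character. The sketch below uses Java's String.compareTo as a stand-in for that ordering (an assumption that holds for plain digit strings like these); the class and method names are hypothetical.

```java
// Illustrative only: string comparison versus numeric comparison of ids.
final class StringCompareSketch {
    // Mirrors a "greater than or equal" test between two string values.
    static boolean gte(String value, String bound) {
        return value.compareTo(bound) >= 0;
    }

    public static void main(String[] args) {
        System.out.println(gte("12348", "12347"));  // true
        System.out.println(gte("9", "12347"));      // true, "9" sorts after "1..."
        System.out.println(gte("100000", "12347")); // false, "10..." sorts before "12..."
        // Compared as numbers, the last case gives the opposite answer.
        System.out.println(Integer.parseInt("100000") >= Integer.parseInt("12347")); // true
    }
}
```

The queries in this step behave as expected only because all the ids have the same number of digits. Storing the id as a number would avoid the surprise.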

Step 7: Cursors

Iterate through results.
> var techguys = db.employees.find()
> while ( techguys.hasNext() ) printjson( techguys.next() )
{
    "_id" : ObjectId("54c2de34426d3d4ea1226498"),
    "employee_id" : "12345",
    "name" : "John doe",
    "department" : "database team",
    "title" : "architect",
    "start_date" : "1/1/2015"
}

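A cursor follows the familiar iterator pattern: hasNext() reports whether more results remain, and next() returns the current one and advances. A minimal Java sketch of the same loop, using a plain java.util.Iterator over made-up data rather than a real MongoDB cursor:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Illustrative only: draining an iterator the way the shell loop drains a cursor.
final class CursorSketch {
    // Prints every remaining item and returns how many were printed.
    static int printAll(Iterator<String> cursor) {
        int count = 0;
        while (cursor.hasNext()) {
            System.out.println(cursor.next()); // next() returns the item and advances
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("John doe", "Ste Curr", "Dre Grin", "Daev Eel");
        Iterator<String> cursor = names.iterator();
        System.out.println(printAll(cursor) + " documents printed");
        System.out.println(cursor.hasNext()); // false, the cursor is exhausted
    }
}
```
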
Step 8: Delete records

Delete one record
> db.employees.remove({"employee_id" : "12345"})
WriteResult({ "nRemoved" : 1 })

Delete all records
> db.employees.remove({})
WriteResult({ "nRemoved" : 3 })

As you can see, MongoDB is pretty easy to use. Download it and give it a try.

Friday, January 9, 2015

Apache Kafka JAVA tutorial #3: Once and only once delivery

In Apache Kafka introduction, I provided an architectural overview of the internet scale messaging broker. In JAVA tutorial 1, we learnt how to send and receive messages using the high level consumer API. In JAVA tutorial 2, we examined partition leaders and metadata using the lower level Simple Consumer API.

A key requirement of many real world messaging applications is that a message should be delivered once and only once to a consumer. If you have used traditional JMS based message brokers, this is generally supported out of the box, with no additional work from the application programmer. But Kafka has a distributed architecture where the messages to a topic are partitioned for scalability and replicated for fault tolerance, and hence the application programmer has to do a little more to ensure once and only once delivery.

Some key features of the Simple Consumer API are:
  • To fetch a message, you need to know the partition and partition leader.
  • You can read messages in the partition several times.
  • You can read from the first message in the partition or from a known offset.
  • With each read, you are returned an offset where the next read can happen.
  • You can implement once and only once read by storing the offset together with the message that was just read, thereby making the read transactional. In the event of a crash, you can recover because you know which message was last read and where the next one should be read from.
  • Not covered in this tutorial, but the API lets you determine how many partitions there are for a topic and who the leader for each partition is. While fetching messages, you connect to the leader. Should a leader go down, you need to fail over by determining who the new leader is, connecting to it and continuing to consume messages.
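
The idea of storing the offset together with the message can be sketched without Kafka at all. In the sketch below, a list stands in for a partition and a hypothetical Store class stands in for whatever transactional storage the application uses; all names are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: resuming a read from an offset saved with each message.
final class OffsetWithMessageSketch {
    // Hypothetical stand-in for a transactional store: the processed message and
    // the next offset are recorded in one step, so they can never disagree.
    static final class Store {
        private final List<String> processed = new ArrayList<>();
        private long nextOffset = 0;

        synchronized void commit(String message, long next) {
            processed.add(message);
            nextOffset = next;
        }

        synchronized long nextOffset() { return nextOffset; }

        synchronized List<String> processed() { return new ArrayList<>(processed); }
    }

    // Consumes at most maxMessages from the partition, resuming at the stored offset.
    static void consume(List<String> partition, Store store, int maxMessages) {
        long offset = store.nextOffset();
        for (int n = 0; n < maxMessages && offset < partition.size(); n++) {
            String message = partition.get((int) offset);
            offset++;                      // offset of the next message to read
            store.commit(message, offset); // message and offset saved together
        }
    }

    public static void main(String[] args) {
        List<String> partition = Arrays.asList("m0", "m1", "m2", "m3", "m4");
        Store store = new Store();
        consume(partition, store, 2);  // first run "crashes" after two messages
        consume(partition, store, 10); // restart picks up at offset 2
        System.out.println(store.processed()); // [m0, m1, m2, m3, m4]
    }
}
```

Each message appears exactly once in the store even though the consumer ran twice, because the second run starts from the offset saved by the first.
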
For this tutorial you will need

(1) Apache Kafka 0.8.1
(2) Apache Zookeeper
(3) JDK 7 or higher. An IDE of your choice is optional
(4) Apache Maven
(5) Source code for this sample from if you want to look at working code

In this tutorial, we will
(1) Start a Kafka broker.
(2) Create a topic with 1 partition.
(3) Send messages to the topic.
(4) Write a consumer using the Simple API to fetch messages.
(5) Crash the consumer and restart it (several times). Each time you will see that it reads the next message after the last one that was read.

Since we are focusing on reading messages from a particular offset in a partition, we will keep other things simple by limiting ourselves to 1 broker and 1 partition.

Step 1: Start the broker

bin/ config/

For the purposes of this tutorial, one broker is sufficient as we are reading from just one partition.

Step 2: Create the topic

bin/ --create --zookeeper localhost:2181 --partitions 1 --topic atopic

Again, for the purposes of this tutorial, we just need 1 partition.

Step 3: Send messages to the topic

Run the producer we wrote in tutorial 1 to send, say, 1000 messages to this topic.

Step 4: Write a consumer using SimpleConsumer API

The complete code is in the file

Create a file to store the next read offset. 

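static {
    try {
      // "rw" opens the file for reading and writing, creating it if needed
      readoffset = new RandomAccessFile("readoffset", "rw");
    } catch (Exception e) {
      throw new ExceptionInInitializerError(e); // assumed handling, not shown in the original
    }
}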


Create a SimpleConsumer.

SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, clientname);

If there is an offset stored in the file, we read from that offset. Otherwise, we read from the beginning of the partition, that is, EarliestTime.

long offset_in_partition = 0;
try {
  offset_in_partition = readoffset.readLong();
} catch (EOFException ef) {
  // Nothing saved yet: start from the earliest offset in the partition
  offset_in_partition = getOffset(consumer, topic, partition,
      kafka.api.OffsetRequest.EarliestTime(), clientname);
}
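
The load-or-fall-back behaviour of the offset file can be tried in isolation. The following is a minimal sketch that uses only java.io; the class and method names are hypothetical, a temporary file replaces the "readoffset" file, and the fallback value stands in for the earliest offset returned by the broker.

```java
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;

// Illustrative only: persisting a single long offset in a file.
final class OffsetFileSketch {
    // Returns the saved offset, or the fallback when the file is still empty.
    static long load(RandomAccessFile file, long fallback) {
        try {
            file.seek(0);
            return file.readLong();
        } catch (EOFException e) {
            return fallback; // nothing has been saved yet
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Overwrites the single stored value with the next offset to read.
    static void save(RandomAccessFile file, long nextOffset) {
        try {
            file.seek(0);
            file.writeLong(nextOffset);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Loads from an empty temporary file, saves an offset, then loads again.
    static long[] roundTrip(long offset) {
        try {
            File f = File.createTempFile("readoffset", ".bin");
            f.deleteOnExit();
            try (RandomAccessFile file = new RandomAccessFile(f, "rw")) {
                long before = load(file, -1L);
                save(file, offset);
                return new long[] { before, load(file, -1L) };
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] result = roundTrip(220L);
        System.out.println(result[0] + " then " + result[1]); // -1 then 220
    }
}
```

Seeking to position 0 before each read and write keeps exactly one value in the file, no matter how many times the offset is updated.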

The rest of the code is in a while (true) loop. We keep reading messages, or sleep if there are none.

Within the loop, we create a request and fetch messages from the offset.

FetchRequest req = new FetchRequestBuilder()
          .addFetch(topic, partition, offset_in_partition, 100000).build();
FetchResponse fetchResponse = consumer.fetch(req);

Read messages from the response.

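for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
  long currentOffset = messageAndOffset.offset();
  if (currentOffset < offset_in_partition) {
    continue; // the batch can contain messages before the requested offset; skip them
  }
  offset_in_partition = messageAndOffset.nextOffset();
  ByteBuffer payload = messageAndOffset.message().payload();

  byte[] bytes = new byte[payload.limit()];
  payload.get(bytes); // copy the message body out of the buffer
  System.out.println(messageAndOffset.offset() + ": " + new String(bytes, "UTF-8"));

  // Save the next read offset so that a restart resumes from here
  readoffset.seek(0);
  readoffset.writeLong(offset_in_partition);
  messages++;

  if (messages == 10) {
    System.out.println("Pretend a crash happened");
    System.exit(0);
  }
}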

For each message that we read, we check that its offset is not less than the one we want to read from. If it is, we ignore the message: for efficiency, Kafka batches messages, so a fetch can return messages that were already read. For each valid message, we print it and write the next read offset to the file. If the consumer crashes, it resumes from the last saved offset when restarted.
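
The skip check can be shown on its own with plain offsets. This sketch does not use the Kafka API: the batch is just a list of offsets, and the class and method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: dropping messages that precede the requested offset.
final class SkipAlreadyReadSketch {
    // Returns the offsets from a fetched batch that should be processed.
    static List<Long> toProcess(List<Long> batchOffsets, long requestedOffset) {
        List<Long> result = new ArrayList<>();
        for (long offset : batchOffsets) {
            if (offset < requestedOffset) {
                continue; // already read in an earlier run
            }
            result.add(offset);
        }
        return result;
    }

    public static void main(String[] args) {
        // The batch starts two messages before the offset we asked for.
        List<Long> batch = Arrays.asList(208L, 209L, 210L, 211L, 212L);
        System.out.println(toProcess(batch, 210L)); // [210, 211, 212]
    }
}
```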

For demo purposes, the code exits after 10 messages. If you run this program several times, you will see that it starts reading exactly from where it last stopped. You can change that value and experiment.

Step 5: Run the consumer several times.

mvn exec:java -Dexec.mainClass="com.mj.KafkaOnceAndOnlyOnceRead"

210: 04092014 This is message 211
211: 04092014 This is message 212
212: 04092014 This is message 213
213: 04092014 This is message 214
214: 04092014 This is message 215
215: 04092014 This is message 216
216: 04092014 This is message 217
217: 04092014 This is message 218
218: 04092014 This is message 219
219: 04092014 This is message 220

Run it again.

mvn exec:java -Dexec.mainClass="com.mj.KafkaOnceAndOnlyOnceRead"

220: 04092014 This is message 221
221: 04092014 This is message 222
222: 04092014 This is message 223
223: 04092014 This is message 224
224: 04092014 This is message 225
225: 04092014 This is message 226
226: 04092014 This is message 227
227: 04092014 This is message 228
228: 04092014 This is message 229
229: 04092014 This is message 230

In summary, it is possible to implement once and only once delivery of messages in Kafka by storing the read offset.

Related Blogs:

Apache Kafka Introduction
Apache Kafka JAVA tutorial #1
Apache Kafka JAVA tutorial #2 
Apache Kafka 0.8.2 New Producer API