Friday, March 28, 2014

10 Tips for building low latency applications

In this previous blog on low latency I described 5 tips for building low latency applications. Read that for the first 5 tips. Here are 5 more tips.

6. Co-locate services

Networks hops add latency. A network call to another server on a different subnet or datacenter can add a few milli-seconds to your response and affect SLA. Install dependent services on the same server or same rack and definitely the same data center.

7. Geographically distribute customer facing services

This might sound contradictory to item 6. But it is not. A round trip over the internet from New York to San Francisco takes 80-90 milli seconds. If your servers are in San Francisco, a user in New York will see some latency even without the server doing any work. Users in New york should be served from servers near New York so their time is not wasted on the round trip. To ensure the rule 6 is not violated, this might mean replicating the dependencies such as the database so that the server in New York is able to serve from a database that is close to it.

As your user base grows, you many need to distribute the services to several locations - east coast US, west coast US , Europe , Asia Pacific and so on.

8. Reduce serialization / de-serialization

Network calls, cross process , cross JVM calls all involve serialization and de-serialization of data which is expensive. Care should be taken to reduce and limit serialization/de-serialization to only required data and to delay to only when required. If you store your data as large blogs, then when you need a small piece of data, you end up serializing de-serializing the entire blog. A few years ago, when XML bandwagon was in full swing, there were many products using XML for RPC. They soon realized that while XML good for reading text, it adds way too much overhead for serialization/de-serialization.

9. Tolerate weak consistency with application design

A system with strong consistency ( think ACID and RDBMS) requires locking data during updates. This mean other writes and readers may need to wait at times. Waiting implies increased latency.

Weak consistency means a reader may not always read the latest updated data. In reality many systems can tolerate this. Weak consistency systems generally do not involve locking. They allow more concurrent readers and writers. They are be easily partitioned and distributed. For these reasons, they have lower latency for reads and writes.

10. Measure and tune

Most systems of any complexity have multiple components. In todays agile development model, developers are continuously deploying new releases of their sub components. If latency suddenly goes up, how do you know what caused it ?

It is important to continuously measure and monitor not only the end to end latency but also the latency contributed by the sub components. Know the averages and what deviations are tolerable. Set up alerts when ever there are deviations from mean. If a new component is released and suddenly the latency goes up, you know the likely culprit. As you user base grows , if you see gradual increases in latency, perhaps you need additional capacity. If users in a particular geographical location are complaining, then perhaps you need to replicated and deploy your service to that location.

In summary, ensuring low latency is a continuous and iterative process that is to be done through out the life of a system

Friday, February 28, 2014

Hadoop 2.x cluster setup tutorial

Recently I had the opportunity to setup a multi-node hadoop cluster. The apache documentation is a little thin and I had to spend several hours trouble shooting issues and googling for help before I got it right.  The purpose of this tutorial is to save time for those setting up a hadoop cluster for the first time. If you are new to hadoop, you may read my tutorial on single node setup at Hadoop 2.x tutorial. If you have never setup hadoop before, it is a good idea to to do a single node setup the first time and then try the multi node setup.

In this tutorial we will

(1) set up a multi node hadoop cluster with 4 servers
(2) To test, copy files to hdfs and cat the files.
(3) To test, run a simple map reduce job that we developed in the single node tutorial

Step 1: Download a stable hadoop release and untar it.

Download a stable 2.x.x release from http://hadoop.apache.org/releases.html#Download.
I downloaded hadoop-2.2.0.tar.gz.
Untar the file to a directory say ~/hadoop-2.2.0.

Step 2: Decide on the topology

For this tutorial , we shall setup a 4 node cluster.

Host : master
Host : slave1
Host : slave2
Host : slave3

On the master we will run the namenode, resourcemanager, datanode, nodemanager and historyserver. On the slaves , we run the datanode and nodemanager.

To make it more real world, we will bring up and test the cluster first with just master, slave1 and slave2. Typically you add more capacity as needed. So we will add slave3 after the fact.

Step 3: Ensure proper network connectivity

I am not going to cover networking details here. But it goes without saying, the master should be able to reach the slaves using their hostnames and the slaves should be able to reach the master. So you may have to add the hostname to ip address mapping in /etc/hosts.

Several startup scripts use ssh to connect and start processes on hosts. So ssh must be setup for password less login on all hosts.

Step 4: Set these environment variables
export HADOOP_HOME=path_to_hadoop_install_dir
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
To hadoop_install_dir/libexec/hadoop-config.sh, Add
export JAVA_HOME=path_to_jdk
Step 5: Update the config files
These files are located at hadoop_install_dir/etc/hadoop

 core-site.xml
 This applies to all servers.
<configuration>
<property>
    <name>fs.default.name</name>
        <value>hdfs://master:9000</value>
    </property>
</configuration>


hdfs-site.xml
This applies to all servers.
<configuration>
<property>
    <name>dfs.replication </name>
    <value>2 </value>

</property >
<property>
   <name>dfs.permissions </name>
   <value>false </value>

</property>
<property>
  <name>dfs.namenode.name.dir </name>
  <value>file:/mnt/hadoop/data/namenode </value>

</property>
<property>
  <name>dfs.datanode.data.dir </name>
  <value>file:/mnt/hadoop/data/datanode </value>

</property>
</configuration>

dfs.namenode.name.dir points to location where namenode stores metadata. dfs.datanode.data.dir points to location where datanode stores data. It is important to put these directories on disks that have lots of free space ( terra bytes). Default block sizes are large and if there is not enough space you will encounter errors that do not clearly point out the space issue.

yarn-site.xml
This applies to all servers.
<configuration>
<property>
   <name>yarn.nodemanager.aux-services</name>
   <value>mapreduce_shuffle</value>

</property>
<property>
  <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
  <value>org.apache.hadoop.mapred.ShuffleHandler</value>

</property>
<property>

  <name>yarn.resourcemanager.resource-tracker.address</name>
  <value>master:8025</value>

</property>
<property>
  <name>yarn.resourcemanager.scheduler.address</name>
  <value>master:8030</value>

</property>
<property>
  <name>yarn.resourcemanager.address</name>
  <value>master:8040</value>

</property>
</configuration>

The last 3 properties tell the nodemanager how to connect to the resourcemanager.

Only in the yarn-site.xml of the master, add
<property>
  <name>yarn.nodemanager.localizer.address</name>
  <value>master:8060
</property>
Without this, nodemanager will not start on the master.

mapred-site.xml
<configuration>
<property>

  <name>mapreduce.framework.name</name>
  <value>yarn</value>

</property>
</configuration>


slaves
This is necessary on the master only and is used by the scripts to start datanodes and nodemanagers on the servers listed.

master
slave1
slave2

Step 5: Start the processes

Change to the directory where hadoop is installed.
cd hadoop_install_dir

If you are running hadoop for the first time, the following command will format HDFS. Do not run this everytime as it formats and thus deletes any existing data

hadoop_install_dir$ bin/hadoop namenode -format

Start the namenode.
hadoop_install_dir$ sbin/hadoop-daemon.sh start namenode

Start the datanode.
hadoop_install_dir$ bin/hadoop-daemons.sh start datanode

Note that the script name is -daemons. This starts datanode on all the servers listed in the slaves file. If you use the -daemon script, it will only start the datanode on the server on which you ran the script.

In hadoop 2.x , there is no jobtracker. Instead there is a resourcemanager and a nodemanager.
Start the resourcemanager.
hadoop_install_dir$ sbin/yarn-daemon.sh start resourcemanager

Start the nodemanager.
hadoop_install_dir$ sbin/yarn-daemons.sh start nodemanager
As mentioned in the case of  datanode, the -daemons script will start the nodemanager on all servers listed in the slave file , where as -daemon script with start it only the server on which the script is executed.

Start the history server.
hadoop_install_dir$ sbin/mr-jobhistory-daemon.sh start historyserver

On the master, type jps. It lists the java processes running. Check that all the processes are started

hadoop_install_dir$ jps

1380 DataNode
1558 Jps
1433 ResourceManager
1536 JobHistoryServer
1335 NameNode
1849 NodeManager

Do the same on each of the slaves

hadoop_install_dir$ jps

1380 DataNode
1558 Jps
1849 NodeManager


The jps command is a good check to ensure all necessary processes are started.

You can use the following urls to see the state of the cluster.

For HDFS
http://master:50070
For YARN/Mapreduce
http://master:8088

Step 6: Test HDFS

The HDFS commands are documented at
http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html

hadoop_install_dir$ bin/hdfs dfs -ls /

will list the root directory

hadoop_install_dir$ bin/hdfs dfs -mkdir /input
creates a directory input

In the local filesystem create a file app.log with the data

user01|1|2|3|4|5
user02|1|2|3|4|5
user03|1|2|3|4|5
user01|1|2|3|4|5
user02|1|2|3|4|5
user01|1|2|3|4|5
user03|1|2|3|4|5
user01|1|2|3|4|5
user04|1|2|3|4|5
user01|1|2|3|4|5

Let us pretend this is a log file from a web application where for each request we have logged userid and some additional data. We will later use this as input for a MapReduce program.
You can move it to hdfs using the command

hadoop_install_dir$ bin/hdfs dfs -moveFromLocal ~/projects/app.log /input/

To print the file just moved to hdfs
hadoop_install_dir$ bin/hdfs dfs -cat /input/app.log

Step 7: Run a map reduce program

Writing and running MR program is no different from what we did in the single node tutorial. See Step 6 and Step 7 of the single node tutorial at Hadoop 2.x tutorial.

Step 8: Add additional capacity

Your hadoop cluster is working well. But you are running out of space and your MR jobs are backed up because not enough mappers or reducers are available. Let us increase capacity by adding an additional server slave3.

On slave3, do
Step 1 -- untar that hadoop binaries
Step 3 -- ensure network connectivity
Step 4 -- set the environment variables
Step 5 -- setup the config files

Start the data node
hadoop_install_dir$ bin/hadoop-daemon.sh start datanode

Start the nodemanager
hadoop_install_dir$ sbin/yarn-daemon.sh start nodemanager

Use jps to check processes started. Use the web urls to check that the node got added to the cluster. 

Hoping these steps help jumpstart your hadoop cluster setup and save you time.

Wednesday, January 29, 2014

JAVA NIO Networking tutorial

In the NIO tutorial, I provided an overview of the nio package in JAVA that enables developers to build scalable applications using non blocking techniques. In this tutorial I will focus on the networking parts of the nio package. We will build a simple Echo server and Echo client using non blocking sockets.

The conventional way of writing networking applications is using Socket and ServerSocket classes in java.net package. If you are new to network programming, you may read the tutorial from Oracle at All about Sockets.  The Oracle tutorial cover Sockets where the IO thread blocks. You might say - this works for me and why should I care about non blocking sockets ? When writing a server using blocking sockets, once the server accepts a connection, a thread is created for the peer client socket which handles the IO with the client. There will be times when this thread is doing nothing but waiting for IO. That is not a big issue if the server just has a few clients. But if server needs to handle tens of thousands of concurrent clients, you will end creating thousands of threads that may or may not be doing work and this approach will exhaust the operating system resources.

In NIO networking:
  • The networking stuff such as accepting connections, reading from a peer socket, writing to a socket happens generally from a single thread
  • The processing of data that is read and preparing data to write is done in worker threads.
  • In the main thread a ServerSocketChannel registers an interest in events with a selector.
  • The selector waits for events to occur. Events could be socket requesting a connection, socket is ready for writing , socket is ready for reading.
  • When events occurs the select method returns and the main thread retrieves the event and the peer socket channel.
  • If the event is an accept event, the ServerSocketChannel might accept the connection and register an interest in a read event for the peer socket.
  • If the event is a read event, the thread can read the data from the peer socket and hand it to a worker thread for processing. It can further register an interest in waiting for an event where the socket can be written to.
  • If the event is a write event, the thread can take data intended for this peer that is queued somewhere and write it to the peer socket. It may then register an interest for the next read event from this peer.

Note that the read and writes happen only where the sockets are ready for reading or writing. Let us use the above concepts and write a simple EchoServer and EchoClient. The EchoClient sends lines of text to the EchoServer which echoes each line back.

EchoServer
The complete code is in the file EchoServer.java

Step 1: Create a ServerSocketChannel and bind the local address

        private ServerSocketChannel serverchannel ;
        private Selector selector ;
        private HashMap queuedWrites = 

                   new HashMap();

        serverchannel = ServerSocketChannel.open() ;
        serverchannel.configureBlocking(false) ;
        serverchannel.socket().bind(new InetSocketAddress("localhost",8081));  

The ServerSocketChannel is the channel on which the server will accept stream oriented connections. Selector is what lets you multiplex several connections without blocking for IO. The hashmap is used to queue data to be written on individual connections. This is necessary since we do not want to block on writes (or reads).

Step 2 : Create a selector and register interest in accept event. Run a loop waiting for events.

       selector = Selector.open();
       serverchannel.register(selector, SelectionKey.OP_ACCEPT) ;
           
       while(true) {        
                selector.select() ;
                Iterator keysIterator = selector.selectedKeys().iterator() ;
                while(keysIterator.hasNext()) {                 
                    SelectionKey key = keysIterator.next();
                    

                    // handle the event
                   if (key.isAcceptable()) {
                   } else if (key.isReadable()) {
                   } else if (key.isWritable()) {

                  }
                }

           }

The outer while loop is there so that the server stays up and running waiting for events to happen. In a nutshell, it outlines what a server program does. A Selector is created and we register an interest in the ACCEPT event. The select call is the only call in the program that blocks waiting for events to happen.  We are now waiting for connections. The select call returns when there is an event on a channel. The SelectionKeys tell which event on what channel. The server program takes action based on the event which could be read, write ,accept or connect.

Step 3. Accept a connection. Register interest in a read event.

        if (key.isAcceptable()) {               
                ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
                SocketChannel sc = ssc.accept();
                sc.configureBlocking( false );
                sc.register( selector, SelectionKey.OP_READ );
       }

The SelectionKey has the peer channel. Code accepts connection for that channel. Once the connection is accepted, the server is going to have to read some data sent by the client. Hence we register interest in read event.

Step 4. If there is a read event, read the socket. Hand off the data to the worker queue. Register interest in a write event for the socket.

        if (key.isReadable()) { {
                SocketChannel sc = (SocketChannel)key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(8192);                       
                int numread ;
                while (true) { // read till some data is available
                         numread = sc.read( readBuffer );
                         if (numread <=0) {
                              break;
                         }   
                }       
                if (numread == -1) {
                        // Remote entity shut the socket down cleanly. Do the
                        // same from our end and cancel the channel.
                        key.channel().close();
                        key.cancel();
                        continue ;
               }
               readBuffer.flip() ;
               queuedWrites.put(sc,readBuffer) ;   
               key.interestOps(SelectionKey.OP_WRITE) ;

The client has sent some data and the server reads it. Generally a server will want to send a response to the client. Since this is an EchoServer, the server just queues the data read for writing back when the channel is ready for writing.

Step 5. When socket is ready for write, get data from the queue and write it to the socket.

         if (key.isWritable())
                 SocketChannel sc = (SocketChannel)key.channel();     
                 ByteBuffer towrite = queuedWrites.get(sc) ;      
                 while (true) {
                          int n = sc.write(towrite) ;
                          if (n == 0 || towrite.remaining() == 0)
                                break ;
                  }     
                 key.interestOps(SelectionKey.OP_READ) ;
        }

After writing, be ready to read the next message from the client. To recap, the server is in a loop, accepting connections, reading and writing to channels.

EchoClient
The complete code is in the file EchoClient.java

Step 1: Create a Socket Channel and a Selector. Register interest in a Connect event.

        selector = Selector.open() ;
        clientChannel = SocketChannel.open();
        clientChannel.configureBlocking(false);
        clientChannel.register(selector, SelectionKey.OP_CONNECT) ;


Step 2 :  Initiate a connection to the EchoServer

       // Kick off connection establishment. Client initiates connection to server
        clientChannel.connect(new InetSocketAddress("localhost", 8081));


Step 3: Main client loop similar to the server loop

        while(true) {                     
                 // queue a write so it can be written when channel ready
                writeQueue.add("This is line " + i) ;
                selector.select() ;  // wait for events
           
                Iterator skeys = selector.selectedKeys().iterator() ;           
                while (skeys.hasNext()) {
                        SelectionKey key = (SelectionKey) skeys.next();
                        skeys.remove();
                        if (!key.isValid()) {
                             continue;
                        }

                       // Check what event is available and deal with it
                      if (key.isConnectable()) {  // server has accepted connection
                           finishConnection(key);
                      } else if (key.isReadable()) { // socket is ready for reading
                          read(key);
                     } else if (key.isWritable()) {  // socket ready for writing
                         write(key);
                    }
               }
        }


Step 3 : When a connect event is received from the server, finish the connection. Register interest in a Write event on the socket.

        private void finishConnection(SelectionKey key) throws IOException        
                clientChannel.finishConnect() ; // tcp handshake
                key.interestOps(SelectionKey.OP_WRITE) ;   
        }


Step 4 : When the socket is ready for write, write a queued message to the socket. Register interest in a read event from the socket.

        private void write(SelectionKey key) throws IOException {    
                String toWrite = writeQueue.pollFirst() ;       

                if (toWrite != null) {           
                        ByteBuffer b ;
                        b = ByteBuffer.wrap(toWrite.getBytes()) ;          
                       while (true) {
                              int n = clientChannel.write(b) ;

                              if (n == 0 || b.remaining() == 0)
                             break ;
                        }   
                  }
              key.interestOps(SelectionKey.OP_READ) ;

       }


Step 5: When the server echos the message, a read event is there on the socket. Read the message and print out the echo.

        public void read(SelectionKey key) throws IOException {
                readBuf.clear() ;       
                while (true) {
                       int numread = clientChannel.read( readBuf );
                       if (numread <=0) {
                             break;
                      }                     
               }
               System.out.println("Read Echo from server:" + new String(readBuf.array()))  ;       

               key.interestOps(SelectionKey.OP_WRITE) ;
        }

You will notice that in both the client and the server, the read and write operations are in a loop. This is because we working with TCP or stream oriented sockets. Each read returns only as much data there is the socket buffer which might be less than what you need, but more data might be arriving. Similarly on write, each write will write only as much data as much space there is in the socket buffer. So you might need to call write multiple times for all data in your buffer to be written. This is no different that traditional sockets programming.

In conclusion, writing NIO networking programs is a little more involved than traditional blocking sockets programming. But they will scale much more and handle many more concurrent connections.

Monday, December 23, 2013

Streaming Big Data using Storm

Storm is an open source distributed system for processing streams of data. In MapReduce and Hadoop tutorial we discussed the use of Hadoop MapReduce to process large quantities of data. MapReduce is a batch system that processes input files and produces output files. When the input files are processed, the map reduce program stops. When there are new input files, the program must be restarted. In the real world, data is produced continuously and results from processing are required immediately. A batch processing system like Map reduce introduces delays that are not tolerable.

In Storm, the framework components and data processing code provided by you is continuously running and processing streams of data, there by producing results in real time.

Storm cluster has two types of nodes. The master node also called nimbus distributes code to other nodes in the cluster, assigns tasks and monitors completion of tasks. Worker nodes execute the user code. Coordination between Master and workers happens using zookeeper.

Data processing code that is to be executed on a Storm cluster is packaged into what is called a topology. A topology is a graph where each node has some code to be executed and the edges indicate the next node(s) that should process the data.

The basic abstraction for data to be processed is the stream which is a sequence of tuples. A tuple is a set of elements. A Spout is a piece of code that produces tuples. A Bolt consumes tuples produced by a spout and does some processing on it and potentially emits processed tuples that can be consumed by other bolts. Spouts and bolts are combined to form a topology which is deployed to a storm cluster for execution.




Figure 1 is a basic storm topology. Spout A is code that reads messages from a message queue and makes each message available to the topology as tuples. The output of Spout A is sent to Bolt1 and Bolt2 which do some processing on the tuples. From the outputs of Bolt1 and Bolt2, some specific fields are sent to Bolt3 and others to Bolt4. Bolts 3 and 4 might  do some aggregation and write some values to a database or another message queue.

Spouts and bolts execute in parallel. Their instances are distributed across worker nodes in the cluster.

Storm guarantees that messages produced by a spout will be completely processed through the entire topology. In Figure 1, Spout A reads a message from a message queue. Each bolt has to acknowledge that it has processed the tuples it received. The message is not deleted from the queue until acknowledgements are received by spout A. If spout A receives a failure message instead of an acknowledgement, it would re deliver the message. Bolt 1 and bolt 2 may produce additional tuples that are delivered to Bolt 3 and 4. The new tuples need to be acknowledged as well for the processing of the original message to be considered complete. Storm maintains a graph that describes how the tuples are related, so that acknowledgements are handled correctly.

You can achieve realtime MapReduce functionality by coding some bolts to function as mappers and others to function as reducers. By using field groupings, you can tell Storm to send certain tuples to a specific Bolt task that does reducer work like aggregation.

To implement a Spout, you can extend the class BaseRichSpout and override the nextTuple method. To emit tuples, you call the emit method.

public class SpoutA extends BaseRichSpout {

     private SpoutOutputCollector _collector ;

     public void open(Map conf, TopologyContext ctx, SpoutOutputCollector c) {

          _collector = c ;
     }


     public void nextTuple() {

           // get a message from a queue
           // convert to a list of values
           this._collector.emit(new Values(value1,value2,value3,value4)) ;

    }

}

To implement a Bolt, you can extend the class BaseRichBolt and override the execute method

public class Bolt1 extends BaseRichBolt {

    private OutputCollector _collector ;

    public void prepare(Map conf, TopologyContext ctx, OutputCollector c) {

         _collector = c ;

     }
   
     public void execute(Tuple input) {

           // process values in input
         
           // write to q or db
           // or emit values

          _collector.emit(value1, value2) ;

          _collector.ack(input) ;  // acknowledge
    }

}

To Build a topology and main program :


public class SimpleTopology {

      public static void main(String[] args) {

              TopologyBuilder b = new TopologyBuilder() ;
              b.setSpout("source",new SpoutA(),10) ;

              b.setBolt("bolt1", new Bolt1() ,10).shuffleGrouping("source") ;
              b.setBolt("bolt2",new Bolt2(),10).shuffleGrouping("source") ;
              // shuffleGrouping means output from source is randomly distribute to bolt tasks
              b.setBolt("bolt3",new Bolt3(),10).fieldGrouping("bolt1",new   
              Values("field1").fieldGrouping("bolt2",new Values("field2") ;
              StormTopology st = b.createTopology() ;
              Config conf = new Config() ;

               LocalCluster cluster = new LocalCluster() ;
               cluster.submitTopology("sample",conf,st) ;
               Utils.sleep(1200000) ;
               cluster.killTopology("sample") ;
               cluster.shutdown() ;
      }

}

If you need a bolt to receive only certain fields, you can use FieldGrouping instead of shuffleGrouping.

To conclude, Storm is a framework that enables realtime processing of Big Data in a distributed environment. If you are currently using Map Reduce, then Storm might enable you to do some of that processing in real time.




   







Friday, November 22, 2013

JAVA Comparable vs Comparator

The java.lang.Comparable and java.util.Comparator interfaces provide similar function and there is sometimes confusion as to which interface to use where. Some documentation adds to the confusion by implying that the Comparable interface should be used for natural ordering where as Comparator should be used for total ordering without explaining what that means.

The Comparable interface is defined as

public interface Comparable<T> {
   int compareTo(T o) ;
}

The compareTo method is for comparing this object with the object that is passed in. Instances of classes that implement Comparable use this method to compare other instances to themselves.

Let us say class Employee implements Comparable. If an array of Employees is passed to Arrays.sort(Object[] o) methods, the sorting code calls the compareTo method to compare Employee objects with one another and put them in the correct order.  In fact the Arrays method sort(Object[] o) expects the objects to have implemented the Comparable interface and an exception is thrown if the objects do not implement Comparable.

class Employee implements Comparable<Employee> {

    private int id ;
    private String name ;

    // natural ordering  compare by id and by name
    // simplistic 
    public int compareTo(Employee e) {

        if ( id == e.id)
             return name.compareTo(e.name) ;
        
        return (id - e.id) ; 
   }

}

Generally the Comparable interface is used when

(1) You have control of the class and can make the class implement Comparable
(2) The objects need to be sorted in their natural order. Natural order is the order defined by the JVM. For number types everyone understand the order - 23 is greater than 22. For Strings it is the lexicographic order.
(3) Generally there is only one natural order for each type. In the above example, you can order by id,name. If you needed a second kind of ordering by just the id or just the name, it would not be possible with Comparable. But you could write multiple Comparators .

If you need to sort an array of objects which do not implement Comparable and the code for the classes is not under your control, you can write a new class that implements the Comparator interface and call the following method in java.util.Arrays.

public static void sort(T[] a , Comparator <? super T>) ;

interface Comparator<T> {

   int compare(T o1, T o2) ;

   boolean equals(Object o) ;
}

public class EmpIdComparator implements Comparator<Employee> {

   public int compare(Employee e1, Employee e2) {

          return e1.id - e2.id ;
    }

}

public class EmpNameComparator implements  Comparator<Employee> {

   public int compare(Employee e1, Employee e2) {

         return e1.name.compareTo(e2.name) ;

   }

}

If you need to sort employees by id , you can pass an instance of the EmpIdComparator to the sort methods. If you need employees sorted by name, use the EmpNameComparator.

Here the compare method lets you compare any two objects. The Comparator interface is used when

(1) You do not have control of the class or the class does not implement Comparable for some reason.
(2) You want ordering that is different from what is generally accepted as the natural order.
(3)  You need many different kinds of ordering.

All the sorting methods and collections that maintain order ( SortedMap, SortedSet ) expect the contained objects to either implement Comparable or you need to pass in another instance of a Comparator Object.

Lastly the Comparable and Comparator interfaces are useful design patterns that you can copy whenever you need to write code that applies to generics. Given a collection of type T and you need to do some operation on each type T. Consider defining a generic interface with a a generic method. The caller of your API can pass in an implementation for the interface for the appropriate type.

Friday, October 25, 2013

Apache Cassandra Data Model

This is an introduction to the Apache Cassandra data model. For the benefit of those not familiar with Cassandra,  it is an open source, highly scalable, highly available NoSQL database. Some key architectural highlights of  Cassandra are :

No Single point of failure.
No Master - All servers in cluster are equal.
Peer to peer communication between nodes to exchange data and configuration.
Data is partitioned across nodes based on consistent hash.
Data is automatically replicated.
(and recently added) SQL like data access model.

Cassandra has moved to a simple model that is described by a SQL like language called CQL. Lower level constructs like column family are no longer mentioned. Note that earlier column family models were without much of a schema. You needed to define column family upfront. But the column name in each family could be added as needed. The new CQL model is more schema oriented.

1.0 Tables, row keys and columns

Data is stored in Tables which has rows and columns.

Table is partitioned by the primary key, which is the row key.

For columns , CQL supports various data type like int , varchar, text, float, Set , List and many more

The CQL create statement below creates the users table with userid as the primary key.

create Table Users (
    userid varchar,
    name varchar,
    email varchar,
    address varchar,
    PRIMARY KEY(userid)
) ;

You insert rows into this table using the insert statement. Rows of table are partitioned across nodes based on the primary key.

insert into Users(userid,name,email) values('user1', 'user1 name', 'user1@gmail.com') ;

2.0 No Joins but wide columns

Let us say you want groups of users. In a RDBMS , you might have a table with columns, groupid and userid with userid being a foreign key into Users table. In a distributed database like Cassandra joins are expensive. Hence the data needs to be de-normalized. You can create a table GroupsOfUsers with groupid as the primary key. As de-normalization, in addition to having userid column, repeat some useful columns like user name and user email that you might need when looking at members of the group.

create Table GroupsOfUsers (
    groupid varchar,
    groupname varchar,
    userid varchar,
     user_name varchar,
     user_email varchar
     PRIMARY KEY(groupid,userid)
)

When you have a compound primary key, the first column, in this case group id is used as the partition key. The other columns, in this case userid is used to cluster the remaining columns by userid. Additionally, the columns in the row are sorted based of the other columns of the primary key, namely userid.

If you do ,

select * from GroupsOfUsers where groupid = "group1" ;

The result might be

group1     user1  name1 email1
group1     user2   name2 email2
group1     user3   name3  email3

Think of the above as logical rows.

Under the hood , the columns might be stored physically as 1 row with one or more columns for each user.

key        column1       column2            column3          column4          column5        column6
group1  user1:name1  user1:email1     user2 :name2   user2:email1   user3:name3  user3:email3

Each row can have as many as 2 billion columns if necessary. This is very useful in other use cases such as creating indexes or storing collections.

3.0 Collection column types

If each user had a number of  friends, in RDBMS, this would be modeled by joining with a Friends table. In Cassandra you can do that by adding a column type of Collection. The collections supported are List, Map and Set.

Alter Table Users add friends Set ;

insert into Users set friends = friends + {'friend6'} where  userid = 'user1' ;

4.0 Indexes using wide columns

Index is a data structure that enables fast look up based on a Key. In Cassandra the table is partitioned across nodes based on the primary key. By default, each node in Cassandra maintains an index for the primary keys that it host.

You can create additional indexes on other columns. For such indexed columns, Cassandra under the hood creates another table whose primary key is the indexed column.

For example , if frequently had to do a query such as

select groupid from GroupOfUsers where userid = 'user1' ;

It could be worthwhile to create an index on userid column to speed up the query.

create index userid_index on GroupOfUsers(userid) ;

Logically this would be like creating a table

create table userid_idx (
     userid varchar,
     groupid varchar,
     primary key(userid,groupid)
)

The partition key will be userid and the columns in the row will be the groups to which the user belong. This makes use of the wide column feature of Cassandra mentioned above.

5.0 Note on consistency

The original Dynamo paper on which Cassandra is based on, talks about being eventually consistent. Eventual consistency scares people even though we see it in life all the time. For example, the ATM may let you take more cash than you have in your account. When the bank reconciles ATM withdrawals with your account and realizes that you have overdrawn, it takes appropriate action.

Cassandra extends eventually consistency by offering a model of tunable consistency.

A write consistency of ANY means that it is enough for write to be written to any one node. This gives low consistency but high availability. A write consistency on ONE, TWO, THREE or QUORUM implies that writes need to be written to that many replicas. Higher the writes , more the consistency and less availability.

A read consistency of ONE, TWO, THREE, QUORUM indicates the number of replicas to be consulted before returning the most recent data from the replicas.

Note that unlike what is described in the Dynamo paper, when there is a conflict between data in replicas, Cassandra returns the most recent data and not vector clocks with different versions that clients need to resolve.

In summary, with CQL Cassandra provides a simple data model that makes it easier to model and develop applications. With CQL, Cassandra can be looked at as a viable alternative to a relational database when scalability and high availability are important. For additional details, the Cassandra documentation is at http://www.datastax.com/documentation/cassandra/2.0/webhelp/index.html





          

Friday, September 20, 2013

Cassandra Vs HBase : Which NoSql store do I need ?

There many NoSql databases out there and it can be confusing to determine which one is suitable for a particular use case. In this blog, we discuss the two more popular ones, Cassandra and HBase. If you are new to NoSql , you may review these earlier posts:

What is NoSql ?
HBase
HBase architecture

1. History

To understand the goals and motivation behind any product, it is a good idea to trace its origin.

HBase is based on Google's Bigtable as described in the paper "Bigtable: A Distributed Storage System for Structured Data". You only need to read the first line of the abstract to understand what BigTable attempts to do. It is a distributed storage system for managing structured data that can scale to very large size of the order of petabytes using thousands of commodity servers.

Cassandra derives its motivation from Amazon's Dynamo as described in the paper "Dynamo: Amazon's highly available key value store". Reading the first page of this paper it is clear that the primary goals were reliability at scale and high availability for amazon's online store.

While both papers talk about scale, reliability and availability, the primary problem BigTable (and HBase) is addressing is  random access to data of the scale of  hundreds of terrabytes or petabytes  and the primary problem that Dynamo ( and Cassandra) addresses  is high availability.

2. Stack

HBase is based on Hadoop  and depends on HDFS, namenodes, datanodes, zookeeper etc. If you are already familiar and invested in Hadoop, this is not a big issue. Cassandra has no such dependency. If you are not invested in Hadoop, then Cassandra has less moving parts and could be easier to go with.

3. Data Model

HBase data model has table, rows, column families and columns. Table has rows and column families. Each row has a key. A column family has columns. When a table is created, the column families have to be defined. But the columns in a column family do not have to be defined and can be added ad hoc.

Cassandra data model used to be described in a similar manner. More recently Cassandra has adopted CQL which is like SQL, as the client interface to data. The data model consists of  tables, rows and columns. You have to define Tables and their columns. Additional columns can be added ad hoc. The data is partitioned based on row key.

Neither supports joins on tables. This is all about denormalization.

HBase does not support secondary indexes. Cassandra supports indexes on columns by denormalization and creating additional tables under the hood.

4. Consistency

HBase supports strong consistency for reads and writes at a row level. Out of the box Cassandra is eventually consistent, though it can be configured for strong consistency. Eventual consistency means that a read may not see the latest update by a write. When that happens , the application is expected to deal with it in an appropriate manner.

In Cassandra, client might connect and write to any node. The write is also replicated to multiple nodes. At any time, not all nodes might have that latest value replicated. Multiple writes that conflict can happen at the same. These might need to be reconciled.

5. Range queries on row keys  

Partioning of rows in HBase is ordered. This implies one can do range queries by row key in HBase. In Cassandra the default partitioner does not keep ordering and you cannot do range queries. There is an ordered partitioner but the recommendation is to never use it in production because it can cause the cluster to become unbalanced. This is a pretty important difference. If your use cases require range queries as would be the case in data warehousing type applications, then you will need to go with HBase.

5. High Availability and Partition tolerance

Cassandra is built ground up for availability and partition tolerance while sacrificing on consistency. All nodes are equal. Data is replicated across nodes. Client can connect to any node and read or write any key.

Early versions of  HBase had very obvious single point of failures in the HDFS name node and the HBASE master. Early versions required external HA solutions (Eg. Linux HA project)  in an Active - Passive configuration. For example you would have a stand by Name node , shared storage and monitoring of the active name node. When monitoring detects that the active node is down, the HA solution would fail over to the stand by node. Recently Hadoop 2 has added native support for hot failover (active active) for the namenode. HBASE has added hot failover support for HBASE master as well. But clearly these came after the fact.

If high availability is a primary use case, then Cassandra clearly wins.

6. Storage

HBase stores data on HDFS. Cassandra stores data locally on each node.

The primary storage data structure for both is based on Google's Bigtable and is the Log Structured merge tree. Your typical RDBMS uses the B+ tree as the primary data structure for creating and storing indexes. In a B+ tree, each node is a disk block and to find a key or to write a key, the code has to read and write  a fews blocks from disk. The log structured merge tree has lesser disk access than a B+tree. LSM has 2 tree like structures one is memory and one on disk. Initially reads and writes happen to the tree in memory. When the in memory tree reaches a certain size, it is written to disk, where it is merged with the larger tree.

In both HBase and Cassandra, users have complained that the merge process mentioned above ( also known as compaction) slows things down.

7. Summary

HBase and Cassandra are suitable for different use cases.

If you have data warehousing type of use cases and large amounts the data that will continue to grow, HBase is a more suitable choice. If you are thinking data from large application log files, several hundred terrabytes or petabytes of time series data or monitoring type data that needs to be aggregated or mined, then HBase might be appropriate. Recently I spoke to a cloud based application monitoring company that was storing large amounts of monitoring data generated every minute in MySql and were complaining about not being able to scale. They need HBase.

Cassandra on the other hand is more suitable if you needs to serve traffic from a highly available data store. If you are already on an RDBMS but are have availability or scale issues, then Cassandra might be something to look at. With CQL, which is like SQL, Cassandra is positioning itself as an alternative to your typical RDBMS. A company that I know uses a key value store that is fast, but does not scale nor does it have built support for high availability. Very often, their shards go down and they lose data. They should consider Cassandra.