Monday, December 23, 2013

Streaming Big Data using Storm

Storm is an open source distributed system for processing streams of data. In the MapReduce and Hadoop tutorial we discussed the use of Hadoop MapReduce to process large quantities of data. MapReduce is a batch system that processes input files and produces output files. Once the input files are processed, the MapReduce program stops. When there are new input files, the program must be restarted. In the real world, data is produced continuously and results from processing are required immediately. A batch processing system like MapReduce introduces delays that are not always tolerable.

In Storm, the framework components and the data processing code you provide run continuously, processing streams of data and thereby producing results in real time.

A Storm cluster has two types of nodes. The master node, also called Nimbus, distributes code to other nodes in the cluster, assigns tasks and monitors their completion. Worker nodes execute the user code. Coordination between the master and the workers happens through ZooKeeper.

Data processing code that is to be executed on a Storm cluster is packaged into what is called a topology. A topology is a graph where each node has some code to be executed and the edges indicate the next node(s) that should process the data.

The basic abstraction for data to be processed is the stream, which is a sequence of tuples. A tuple is a named list of values. A Spout is a piece of code that produces tuples. A Bolt consumes tuples produced by a spout, does some processing on them and potentially emits processed tuples that can be consumed by other bolts. Spouts and bolts are combined to form a topology, which is deployed to a Storm cluster for execution.




Figure 1 shows a basic Storm topology. Spout A is code that reads messages from a message queue and makes each message available to the topology as a tuple. The output of Spout A is sent to Bolt1 and Bolt2, which do some processing on the tuples. From the outputs of Bolt1 and Bolt2, some specific fields are sent to Bolt3 and others to Bolt4. Bolts 3 and 4 might do some aggregation and write some values to a database or another message queue.

Spouts and bolts execute in parallel. Their instances are distributed across worker nodes in the cluster.

Storm guarantees that messages produced by a spout will be completely processed through the entire topology. In Figure 1, Spout A reads a message from a message queue. Each bolt has to acknowledge that it has processed the tuples it received. The message is not deleted from the queue until acknowledgements are received by Spout A. If Spout A receives a failure message instead of an acknowledgement, it redelivers the message. Bolt1 and Bolt2 may produce additional tuples that are delivered to Bolt3 and Bolt4. The new tuples need to be acknowledged as well for the processing of the original message to be considered complete. Storm maintains a graph that describes how the tuples are related, so that acknowledgements are handled correctly.

You can achieve realtime MapReduce functionality by coding some bolts to function as mappers and others to function as reducers. By using field groupings, you can tell Storm to send certain tuples to a specific Bolt task that does reducer work like aggregation.
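
As a rough sketch of that idea (not from the original topology; the class, field names and the "word" field are illustrative), a reducer-style bolt that counts occurrences of a value routed to it via a fields grouping might look like this:

public class WordCountBolt extends BaseRichBolt {

     private OutputCollector _collector ;
     private Map<String, Integer> counts ;

     public void prepare(Map conf, TopologyContext ctx, OutputCollector c) {
          _collector = c ;
          counts = new HashMap<String, Integer>() ;
     }

     public void execute(Tuple input) {
          // all tuples with the same "word" value arrive at this task because of the fields grouping
          String word = input.getStringByField("word") ;
          Integer count = counts.get(word) ;
          count = (count == null) ? 1 : count + 1 ;
          counts.put(word, count) ;
          _collector.emit(input, new Values(word, count)) ;
          _collector.ack(input) ;
     }

     public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("word", "count")) ;
     }
}

It would be wired up with something like b.setBolt("counter", new WordCountBolt(), 10).fieldsGrouping("words", new Fields("word")), where "words" is a hypothetical upstream component that emits a "word" field.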

To implement a spout, you can extend the class BaseRichSpout, override the nextTuple method and declare the fields of the tuples the spout emits. To emit tuples, you call the emit method.

public class SpoutA extends BaseRichSpout {

     private SpoutOutputCollector _collector ;

     public void open(Map conf, TopologyContext ctx, SpoutOutputCollector c) {
          _collector = c ;
     }

     public void nextTuple() {
           // get a message from a queue
           // convert the message to a list of values
           _collector.emit(new Values(value1, value2, value3, value4)) ;
     }

     // declare the fields of the tuples this spout emits
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("field1", "field2", "field3", "field4")) ;
     }
}

To implement a bolt, you can extend the class BaseRichBolt and override the execute method.

public class Bolt1 extends BaseRichBolt {

    private OutputCollector _collector ;

    public void prepare(Map conf, TopologyContext ctx, OutputCollector c) {
         _collector = c ;
    }

    public void execute(Tuple input) {
          // process the values in input
          // write to a queue or database, or emit new tuples

          // emit a tuple anchored to the input tuple
          _collector.emit(input, new Values(value1, value2)) ;

          _collector.ack(input) ;  // acknowledge the input tuple
    }

    // declare the fields of the tuples this bolt emits
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare(new Fields("field1", "field2")) ;
    }
}

To build the topology, write a main program:


public class SimpleTopology {

      public static void main(String[] args) {

              TopologyBuilder b = new TopologyBuilder() ;
              b.setSpout("source", new SpoutA(), 10) ;

              // shuffleGrouping: output from "source" is randomly distributed across the bolt tasks
              b.setBolt("bolt1", new Bolt1(), 10).shuffleGrouping("source") ;
              b.setBolt("bolt2", new Bolt2(), 10).shuffleGrouping("source") ;

              // fieldsGrouping: tuples with the same value for the given field always go to the same task
              b.setBolt("bolt3", new Bolt3(), 10)
                   .fieldsGrouping("bolt1", new Fields("field1"))
                   .fieldsGrouping("bolt2", new Fields("field2")) ;

              StormTopology st = b.createTopology() ;
              Config conf = new Config() ;

              LocalCluster cluster = new LocalCluster() ;
              cluster.submitTopology("sample", conf, st) ;
              Utils.sleep(1200000) ;
              cluster.killTopology("sample") ;
              cluster.shutdown() ;
      }
}

If you need tuples with the same value for certain fields to always be routed to the same bolt task (for example, for aggregation), use fieldsGrouping instead of shuffleGrouping.

To conclude, Storm is a framework that enables realtime processing of Big Data in a distributed environment. If you are currently using MapReduce, then Storm might enable you to do some of that processing in real time.

Friday, November 22, 2013

JAVA Comparable vs Comparator

The java.lang.Comparable and java.util.Comparator interfaces provide similar functionality and there is sometimes confusion as to which interface to use where. Some documentation adds to the confusion by implying that the Comparable interface should be used for natural ordering whereas Comparator should be used for total ordering, without explaining what that means.

The Comparable interface is defined as

public interface Comparable<T> {
   int compareTo(T o) ;
}

The compareTo method is for comparing this object with the object that is passed in. Instances of classes that implement Comparable use this method to compare other instances to themselves.

Let us say class Employee implements Comparable. If an array of Employees is passed to the Arrays.sort(Object[] a) method, the sorting code calls the compareTo method to compare Employee objects with one another and put them in the correct order. In fact, the Arrays.sort(Object[] a) method expects the objects to implement the Comparable interface, and an exception is thrown if the objects do not implement Comparable.

class Employee implements Comparable<Employee> {

    private int id ;
    private String name ;

    // natural ordering: compare by id, then by name
    // simplistic
    public int compareTo(Employee e) {

        if (id == e.id)
             return name.compareTo(e.name) ;

        // Integer.compare avoids the overflow that (id - e.id) can cause
        return Integer.compare(id, e.id) ;
   }

}

Generally the Comparable interface is used when

(1) You have control of the class and can make the class implement Comparable
(2) The objects need to be sorted in their natural order. The natural order of a class is the ordering defined by its compareTo method and is usually the order everyone expects: for number types, 23 is greater than 22; for Strings it is the lexicographic order.
(3) Generally there is only one natural order for each type. In the above example, you can order by id,name. If you needed a second kind of ordering by just the id or just the name, it would not be possible with Comparable. But you could write multiple Comparators .

If you need to sort an array of objects which do not implement Comparable and the code for the classes is not under your control, you can write a new class that implements the Comparator interface and call the following method in java.util.Arrays.

public static <T> void sort(T[] a, Comparator<? super T> c) ;

interface Comparator<T> {

   int compare(T o1, T o2) ;

   boolean equals(Object o) ;
}

public class EmpIdComparator implements Comparator<Employee> {

   // assumes the id field is accessible to this class (or exposed via a getter)
   public int compare(Employee e1, Employee e2) {

          return Integer.compare(e1.id, e2.id) ;
    }

}

public class EmpNameComparator implements  Comparator<Employee> {

   public int compare(Employee e1, Employee e2) {

         return e1.name.compareTo(e2.name) ;

   }

}

If you need to sort employees by id , you can pass an instance of the EmpIdComparator to the sort methods. If you need employees sorted by name, use the EmpNameComparator.
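
For example (a minimal sketch; it assumes Employee has a constructor that takes an id and a name):

Employee[] employees = { new Employee(2, "Bob"), new Employee(1, "Alice") } ;

Arrays.sort(employees, new EmpIdComparator()) ;    // now ordered Alice(1), Bob(2)
Arrays.sort(employees, new EmpNameComparator()) ;  // now ordered Alice, Bob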

Here the compare method lets you compare any two objects. The Comparator interface is used when

(1) You do not have control of the class or the class does not implement Comparable for some reason.
(2) You want ordering that is different from what is generally accepted as the natural order.
(3)  You need many different kinds of ordering.

All the sorting methods, and the collections that maintain order (SortedMap, SortedSet), expect the contained objects either to implement Comparable or to be accompanied by a Comparator instance that you pass in.

Lastly, Comparable and Comparator are useful design patterns that you can copy whenever you need to write code that works with generic types. If you have a collection of type T and need to perform some operation on each element of type T, consider defining a generic interface with a generic method. The caller of your API can pass in an implementation of the interface for the appropriate type.
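
A minimal sketch of that pattern (the interface and method names here are illustrative, not a standard API):

interface Processor<T> {
    void process(T item) ;
}

class CollectionUtil {
    public static <T> void forEach(Iterable<T> items, Processor<T> p) {
        for (T item : items) {
            p.process(item) ;
        }
    }
}

// the caller supplies an implementation for the appropriate type
CollectionUtil.forEach(Arrays.asList("a", "b", "c"), new Processor<String>() {
    public void process(String s) {
        System.out.println(s) ;
    }
}) ;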

Friday, October 25, 2013

Apache Cassandra Data Model

This is an introduction to the Apache Cassandra data model. For the benefit of those not familiar with Cassandra,  it is an open source, highly scalable, highly available NoSQL database. Some key architectural highlights of  Cassandra are :

No Single point of failure.
No Master - All servers in cluster are equal.
Peer to peer communication between nodes to exchange data and configuration.
Data is partitioned across nodes based on consistent hash.
Data is automatically replicated.
(and recently added) SQL like data access model.

Cassandra has moved to a simple model that is described by an SQL like language called CQL. Lower level constructs like column families are no longer mentioned. Note that the earlier column family model did not have much of a schema: you needed to define column families upfront, but the columns in each family could be added as needed. The new CQL model is more schema oriented.

1.0 Tables, row keys and columns

Data is stored in tables, which have rows and columns.

Table is partitioned by the primary key, which is the row key.

For columns, CQL supports various data types like int, varchar, text, float, set, list and many more.

The CQL create statement below creates the users table with userid as the primary key.

create Table Users (
    userid varchar,
    name varchar,
    email varchar,
    address varchar,
    PRIMARY KEY(userid)
) ;

You insert rows into this table using the insert statement. Rows of table are partitioned across nodes based on the primary key.

insert into Users(userid,name,email) values('user1', 'user1 name', 'user1@gmail.com') ;

2.0 No Joins but wide columns

Let us say you want groups of users. In an RDBMS, you might have a table with columns groupid and userid, with userid being a foreign key into the Users table. In a distributed database like Cassandra, joins are expensive. Hence the data needs to be de-normalized. You can create a table GroupsOfUsers with groupid as the partition key. For de-normalization, in addition to the userid column, repeat some useful columns like user name and user email that you might need when looking at the members of a group.

create Table GroupsOfUsers (
    groupid varchar,
    groupname varchar,
    userid varchar,
    user_name varchar,
    user_email varchar,
    PRIMARY KEY(groupid, userid)
) ;

When you have a compound primary key, the first column, in this case groupid, is used as the partition key. The remaining columns, in this case userid, are used to cluster the data within the partition. Additionally, the columns in the row are sorted based on those remaining columns of the primary key, namely userid.

If you run

select * from GroupsOfUsers where groupid = 'group1' ;

The result might be

group1     user1  name1 email1
group1     user2   name2 email2
group1     user3   name3  email3

Think of the above as logical rows.

Under the hood , the columns might be stored physically as 1 row with one or more columns for each user.

key       column1        column2        column3        column4        column5        column6
group1    user1:name1    user1:email1   user2:name2    user2:email2   user3:name3    user3:email3

Each row can have as many as 2 billion columns if necessary. This is very useful in other use cases such as creating indexes or storing collections.

3.0 Collection column types

If each user had a number of friends, in an RDBMS this would be modeled by joining with a Friends table. In Cassandra you can do that by adding a column with a collection type. The collections supported are list, map and set.

Alter Table Users add friends set<varchar> ;

update Users set friends = friends + {'friend6'} where userid = 'user1' ;

4.0 Indexes using wide columns

An index is a data structure that enables fast lookup based on a key. In Cassandra the table is partitioned across nodes based on the primary key. By default, each node in Cassandra maintains an index for the primary keys that it hosts.

You can create additional indexes on other columns. For such indexed columns, Cassandra under the hood creates another table whose primary key is the indexed column.

For example, if you frequently had to run a query such as

select groupid from GroupsOfUsers where userid = 'user1' ;

It could be worthwhile to create an index on userid column to speed up the query.

create index userid_index on GroupsOfUsers(userid) ;

Logically this would be like creating a table

create table userid_idx (
     userid varchar,
     groupid varchar,
     primary key(userid,groupid)
)

The partition key will be userid and the columns in the row will be the groups to which the user belongs. This makes use of the wide column feature of Cassandra mentioned above.

5.0 Note on consistency

The original Dynamo paper, on which Cassandra is based, talks about being eventually consistent. Eventual consistency scares people even though we see it in life all the time. For example, the ATM may let you take out more cash than you have in your account. When the bank reconciles ATM withdrawals with your account and realizes that you have overdrawn, it takes appropriate action.

Cassandra extends eventual consistency by offering a model of tunable consistency.

A write consistency of ANY means that it is enough for the write to reach any one node. This gives low consistency but high availability. A write consistency of ONE, TWO, THREE or QUORUM means that the write needs to be written to that many replicas. The more replicas a write requires, the higher the consistency and the lower the availability.

A read consistency of ONE, TWO, THREE, QUORUM indicates the number of replicas to be consulted before returning the most recent data from the replicas.

Note that unlike what is described in the Dynamo paper, when there is a conflict between data in replicas, Cassandra returns the most recent data and not vector clocks with different versions that clients need to resolve.

In summary, with CQL Cassandra provides a simple data model that makes it easier to model and develop applications. With CQL, Cassandra can be looked at as a viable alternative to a relational database when scalability and high availability are important. For additional details, the Cassandra documentation is at http://www.datastax.com/documentation/cassandra/2.0/webhelp/index.html





          

Friday, September 20, 2013

Cassandra Vs HBase : Which NoSql store do I need ?

There are many NoSql databases out there and it can be confusing to determine which one is suitable for a particular use case. In this blog, we discuss two of the more popular ones, Cassandra and HBase. If you are new to NoSql, you may review these earlier posts:

What is NoSql ?
HBase
HBase architecture

1. History

To understand the goals and motivation behind any product, it is a good idea to trace its origin.

HBase is based on Google's Bigtable as described in the paper "Bigtable: A Distributed Storage System for Structured Data". You only need to read the first line of the abstract to understand what BigTable attempts to do. It is a distributed storage system for managing structured data that can scale to very large size of the order of petabytes using thousands of commodity servers.

Cassandra derives its motivation from Amazon's Dynamo as described in the paper "Dynamo: Amazon's highly available key value store". Reading the first page of this paper it is clear that the primary goals were reliability at scale and high availability for amazon's online store.

While both papers talk about scale, reliability and availability, the primary problem BigTable (and HBase) addresses is random access to data at the scale of hundreds of terabytes or petabytes, and the primary problem that Dynamo (and Cassandra) addresses is high availability.

2. Stack

HBase is based on Hadoop and depends on HDFS, namenodes, datanodes, ZooKeeper and so on. If you are already familiar with and invested in Hadoop, this is not a big issue. Cassandra has no such dependency. If you are not invested in Hadoop, then Cassandra has fewer moving parts and could be easier to go with.

3. Data Model

HBase data model has table, rows, column families and columns. Table has rows and column families. Each row has a key. A column family has columns. When a table is created, the column families have to be defined. But the columns in a column family do not have to be defined and can be added ad hoc.

Cassandra data model used to be described in a similar manner. More recently Cassandra has adopted CQL which is like SQL, as the client interface to data. The data model consists of  tables, rows and columns. You have to define Tables and their columns. Additional columns can be added ad hoc. The data is partitioned based on row key.

Neither supports joins on tables. This is all about denormalization.

HBase does not support secondary indexes. Cassandra supports indexes on columns by denormalization and creating additional tables under the hood.

4. Consistency

HBase supports strong consistency for reads and writes at a row level. Out of the box Cassandra is eventually consistent, though it can be configured for strong consistency. Eventual consistency means that a read may not see the latest update by a write. When that happens , the application is expected to deal with it in an appropriate manner.

In Cassandra, a client might connect to and write to any node. The write is also replicated to multiple nodes. At any time, not all nodes might have the latest value replicated. Multiple conflicting writes can happen at the same time. These might need to be reconciled.

5. Range queries on row keys  

Partitioning of rows in HBase is ordered. This implies one can do range queries by row key in HBase. In Cassandra the default partitioner does not preserve ordering and you cannot do range queries. There is an ordered partitioner, but the recommendation is to never use it in production because it can cause the cluster to become unbalanced. This is a pretty important difference. If your use cases require range queries, as would be the case in data warehousing type applications, then you will need to go with HBase.

6. High Availability and Partition tolerance

Cassandra is built ground up for availability and partition tolerance while sacrificing on consistency. All nodes are equal. Data is replicated across nodes. Client can connect to any node and read or write any key.

Early versions of HBase had very obvious single points of failure in the HDFS namenode and the HBase master. Early versions required external HA solutions (e.g. the Linux-HA project) in an active-passive configuration. For example, you would have a standby namenode, shared storage and monitoring of the active namenode. When monitoring detected that the active node was down, the HA solution would fail over to the standby node. Recently Hadoop 2 has added native support for namenode high availability with automatic (hot) failover. HBase has added hot failover support for the HBase master as well. But clearly these came after the fact.

If high availability is a primary use case, then Cassandra clearly wins.

7. Storage

HBase stores data on HDFS. Cassandra stores data locally on each node.

The primary storage data structure for both is based on Google's Bigtable and is the log structured merge (LSM) tree. Your typical RDBMS uses the B+ tree as the primary data structure for creating and storing indexes. In a B+ tree, each node is a disk block, and to find or write a key the code has to read and write a few blocks from disk. The log structured merge tree requires fewer disk accesses than a B+ tree. An LSM tree has two tree like structures, one in memory and one on disk. Initially reads and writes happen to the tree in memory. When the in-memory tree reaches a certain size, it is written to disk, where it is merged with the larger tree.
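
To make the idea concrete, here is a toy sketch of the structure in Java. It is illustrative only and is not how either HBase or Cassandra actually implements storage (a real LSM tree writes the flushed segments to disk and merges them in the background):

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy log-structured merge sketch: writes go to a sorted in-memory map;
// when it grows past a threshold it is frozen as an immutable sorted "segment"
// (which would live on disk in a real system) and a new memtable is started.
public class ToyLsm {
    private static final int FLUSH_THRESHOLD = 4 ;

    private NavigableMap<String, String> memtable = new TreeMap<String, String>() ;
    private final List<NavigableMap<String, String>> segments = new ArrayList<NavigableMap<String, String>>() ;

    public void put(String key, String value) {
        memtable.put(key, value) ;
        if (memtable.size() >= FLUSH_THRESHOLD) {
            segments.add(memtable) ;                       // "flush" to an immutable segment
            memtable = new TreeMap<String, String>() ;
        }
    }

    public String get(String key) {
        String v = memtable.get(key) ;                     // newest data first
        if (v != null) return v ;
        for (int i = segments.size() - 1; i >= 0; i--) {   // then older segments, newest to oldest
            v = segments.get(i).get(key) ;
            if (v != null) return v ;
        }
        return null ;
    }
}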

In both HBase and Cassandra, users have complained that the merge process mentioned above ( also known as compaction) slows things down.

8. Summary

HBase and Cassandra are suitable for different use cases.

If you have data warehousing type use cases and large amounts of data that will continue to grow, HBase is a more suitable choice. If you are thinking of data from large application log files, several hundred terabytes or petabytes of time series data, or monitoring type data that needs to be aggregated or mined, then HBase might be appropriate. Recently I spoke to a cloud based application monitoring company that was storing large amounts of monitoring data generated every minute in MySql and was complaining about not being able to scale. They need HBase.

Cassandra on the other hand is more suitable if you need to serve traffic from a highly available data store. If you are already on an RDBMS but have availability or scale issues, then Cassandra might be something to look at. With CQL, which is like SQL, Cassandra is positioning itself as an alternative to your typical RDBMS. A company that I know uses a key value store that is fast, but does not scale nor does it have built-in support for high availability. Very often, their shards go down and they lose data. They should consider Cassandra.

Related Blogs:

Cassandra data model
Choosing Cassandra 
Cassandra Compaction

Friday, August 30, 2013

Java Serializable vs Externalizable

Serializable and Externalizable are almost forgotten interfaces in Java, and for good reason. Many applications do not need to serialize objects explicitly. Most applications write to a database, and to write to databases you use either an API like JDBC or a framework like Hibernate or JPA. However, if you are writing to a file or the network, it is worthwhile to understand the default serialization mechanism.

Serializable

To leverage the default serialization mechanism in Java, you need to implement the java.io.Serializable interface. This is a marker interface: for the default behavior, you do not need to implement any methods.

public class Employee implements Serializable {
    private int id ;
    private String name ;
    private String address ;

    public Employee(int i, String nm, String adr) {
        id = i ;
        name = nm ;
        address = adr ;
    }

    public int getId() {
        return id ;
    }
    
    public String getName() {
        return name ;
    }

    public String getAddress() {
        return address ;
   }

}

To Serialize an Employee Object:

try {
    Employee e = new Employee(1,"John Doe","123 5th Street, New City , CA") ;
    ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream("employees.dat")) ;
    os.writeObject(e) ;
    os.close() ;
} catch (IOException ex) {
    log(ex) ;
}

To Deserialize an employee Object:

Employee er = null ;
try {
     ObjectInputStream is = new ObjectInputStream(new FileInputStream("employees.dat")) ;
     er = (Employee) is.readObject() ;
     is.close() ;
} catch (IOException | ClassNotFoundException e1) {
    log(e1) ;
}

You can customize the default serialization by adding the following methods:

private void writeObject(ObjectOutputStream out) throws IOException {
    out.writeInt(id) ;
    out.writeUTF(name) ;
    out.writeUTF(address) ;

    // add customization here
}

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    id = in.readInt() ;
    name = in.readUTF() ;
    address = in.readUTF() ;
    // add customization here
}

If these methods exist, the JVM calls them during serialization. You can also call defaultWriteObject and defaultReadObject to get the default behavior and then add to it.
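
For example, to keep the default behavior and then append extra data (a sketch using the standard hooks):

private void writeObject(ObjectOutputStream out) throws IOException {
    out.defaultWriteObject() ;   // serialize all non-transient fields the default way
    // write any additional data here
}

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject() ;     // read the default fields back
    // read the additional data here, in the same order it was written
}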

Default serialization is quite slow and should not be used except in the most simple use cases. Implementing readObject and writeObject does not give much improvement because the JVM has to use reflection to determine if those private methods exist.

Externalizable
The default serialization protocol is considered a little heavy. If you wish to completely override it and use a different protocol, you can implement the Externalizable interface which has 2 methods.

public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException

public void writeExternal(ObjectOutput out) throws IOException

Once you implement these methods, the client code for serialization is the same as above. Studies have shown that using Externalizable provides better performance. Since these are public methods, the JVM does not have to resort to reflection. The implementations of readExternal and writeExternal would be similar to readObject and writeObject shown above.
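
As a sketch, the Employee class above rewritten to use Externalizable might look like this (note the public no-arg constructor, which is required because readExternal is invoked on a freshly constructed instance):

public class Employee implements Externalizable {
    private int id ;
    private String name ;
    private String address ;

    public Employee() { }   // required for deserialization

    public Employee(int i, String nm, String adr) {
        id = i ;
        name = nm ;
        address = adr ;
    }

    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeInt(id) ;
        out.writeUTF(name) ;
        out.writeUTF(address) ;
    }

    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        id = in.readInt() ;
        name = in.readUTF() ;
        address = in.readUTF() ;
    }
}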

Though for most industrial grade applications, you will want to forego serializing objects directly and serialize the types that make up your objects.

In Summary,

You may use the Serializable interface for simple use cases. It is easy to use, but the performance is not good.

Using the Externalizable interface is a little more work and it gives a little better performance.

For best performance, design a record format for the data in the objects and serialize the types directly to memory and write blocks to disk. Of course, this is much more work.

Tuesday, July 23, 2013

Redis : A fast key value store

Redis is a fast in-memory key value store that we have been using to develop low latency applications. It is more a cache like Memcached or Ehcache and less like a database (NoSQL or SQL).

You should consider Redis when you need latency less than a few milliseconds or sub millisecond. In such situations Redis can front a more permanent database that might be SQL or NoSQL.

Some very useful features of Redis:

1. It is very easy to setup and get it up and running. See tutorial below.

2.  It is very fast. Everything is done in memory. It is single threaded. So there is no overhead of context switching.

3. It supports more data types than a typical key value store. Supported types include strings, lists, hashes and sets.

4. It supports some atomic operations, such as incrementing a value or adding to a list. This is important to be able to avoid race conditions when multiple clients are accessing the data.

5. Redis supports master slave replication which is easy to setup. This is an important feature if you have distributed applications.

Let us do a quick hands on tutorial on how to use Redis.

Step 1: Download and build 

Download redis from http://redis.io/download.
tar -xvf redis-2.6.14.tar.gz
cd redis-2.6.14
make

This will build the redis-server and redis-cli binaries under the src directory.

Step 2: Run the server

cd src
./redis-server

To interact with the server, start the client with the command

./redis-cli

Step 3:  Set and get some simple keys

redis 127.0.0.1:6379> set asimplekey 23
OK
redis 127.0.0.1:6379> get asimplekey
"23"
redis 127.0.0.1:6379> incr asimplekey
(integer) 24

The incr operation is atomic. Atomic operations are preferred because no locking is required.

Step 4: List and stack operations

redis 127.0.0.1:6379> lpush alist "value1"
(integer) 1
redis 127.0.0.1:6379> lindex alist 0
"value1"
redis 127.0.0.1:6379> lpush alist value2
(integer) 2
redis 127.0.0.1:6379> lrem alist 1 value2
(integer) 1
redis 127.0.0.1:6379> lpop alist
"value1"
redis 127.0.0.1:6379> lpop alist
(nil)

Step 5: HashMap operations

Here we create a key user1 whose value is a hashmap with keys id,name and title. 

redis 127.0.0.1:6379> hset user1 id 1
(integer) 1
redis 127.0.0.1:6379> hset user1 name JohnDoe
(integer) 1
redis 127.0.0.1:6379> hset user1 title Programmer
(integer) 1
redis 127.0.0.1:6379> hgetall user1
1) "id"
2) "1"
3) "name"
4) "JohnDoe"
5) "title"
6) "Programmer"
redis 127.0.0.1:6379> hdel user1 title
(integer) 1

Redis supports several other commands. We will not go over them here. Redis documentation does a pretty good job of describing the commands.

Some additional points to remember :

Redis supports 2 types of persistence, which can be useful for recovering from crashes. You can configure Redis to either dump snapshots of memory to disk or log each command to an append-only file. A disadvantage of snapshots is that if your server crashes between snapshots, some data might be lost.

Redis client libraries are available in several programming languages like Java, C, C#, Perl etc. The programming model is the same as shown in the commands above.
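
For example, a minimal sketch using the Jedis Java client (the choice of client library, host and port here are assumptions for illustration, not part of the tutorial above):

import redis.clients.jedis.Jedis;
import java.util.Map;

public class RedisExample {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379) ;   // assumes a local redis-server

        jedis.set("asimplekey", "23") ;
        System.out.println(jedis.get("asimplekey")) ;  // "23"
        System.out.println(jedis.incr("asimplekey")) ; // 24

        jedis.hset("user1", "name", "JohnDoe") ;
        jedis.hset("user1", "title", "Programmer") ;
        Map<String, String> user = jedis.hgetAll("user1") ;
        System.out.println(user) ;

        jedis.close() ;
    }
}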

Redis requires all data to be in memory. You can configure Redis to reject writes once a certain amount of memory is used up. Without the max memory configuration, if data exceeds available memory, the operating system will start paging and performance of Redis degrades rapidly.

A major limitation of Redis is that it does not support server side sharding. Only client side sharding based on consistent hashing is supported. This means you need to plan in advance, how big your data is going to be and create the appropriate number of shards. Later if your data grows beyond what you planned for and additional shards are required, migration of data to new shards has to be done by writing a client side program.

Support for high availability was lacking, but is now available as beta.

Redis has been incorrectly classified as a NoSQL database. I see it more as a fast cache that can front another more reliable database. 


Friday, June 28, 2013

JAVA NIO tutorial

Most programmers are familiar with the java.io package, which offers a variety of classes for input/output operations on files or the network. In this "conventional IO", programs read from and write to streams. A stream is a sequence of bytes. You can either write to a stream or read from it, but you cannot do both on the same stream. While the operating system and the JVM are reading or writing the data, the thread might block. This is not very scalable for servers that are servicing a large number of clients. Many threads might just be waiting for IO to complete.

java.nio is a package that offers an alternative approach to IO that is considered more scalable. Even though it has been around for several JDK releases, many Java programmers are not familiar with it. The main difference is that with the nio package the programmer works with channels and buffers rather than streams. We describe some basic concepts of the java.nio package below.

Buffers

Buffer and its sub classes are used as containers of data. They can hold data that needs to be written to a channel or that is read from a channel.

// allocate a character buffer
CharBuffer charBuf = CharBuffer.allocate(128) ;
// put some data into the buffer
charBuf.put("abcdef") ;
 // read data from a buffer
charBuf.flip() ;
char c = charBuf.get() ;

Channels

A channel is an interface that represents a connection to a device or entity used for input or output, such as a file or a socket. There are many implementations of Channel. For example, FileChannel is for reading and writing files, while SocketChannel is for reading from and writing to sockets.

Unlike with streams, you can read and write to the same channel. You read from a channel into a buffer, and you write from a buffer to a channel.

// create a channel to a file
RandomAccessFile thefile = new RandomAccessFile("afile.txt","rw") ;
FileChannel theChannel = thefile.getChannel() ;

// read from a channel
ByteBuffer b = ByteBuffer.allocate(128) ;
int numBytesRead = theChannel.read(b) ;

// write to a channel
String data = "here is some data to write" ;
ByteBuffer bBuf = ByteBuffer.allocate(1024) ;
// put some data into the buffer
bBuf.put(data.getBytes()) ;
 // read data from a buffer
bBuf.flip() ;

while(bBuf.hasRemaining()) {
    theChannel.write(bBuf) ;
}

The buffer has to be flipped because the channel is going to be reading from it. Write has to be done in a loop because the channel may not write all the bytes at one time.

Selector

A Selector is a component that lets you do non blocking IO. In conventional network programming, you create one thread per connection. The threads block waiting for IO to happen. That is a waste of resources.

With Selector, a single thread can express interest in events (like connect, read, write) from multiple channels.  

// Creating a selector
Selector selector = Selector.open() ;

You register one or more channels with a Selector, indicating which events you are interested in.

// Register channel with selector
channel.configureBlocking(false) ;
channel.register(selector, SelectionKey.OP_READ) ;

We are interested in non blocking IO, hence blocking is set to false. SocketChannels can be non blocking, but FileChannels cannot. That is alright because the main use case for non blocking IO is network programming, for example a web server that needs to do IO with thousands of clients.

Once the channels are registered, you call the select method, which blocks till an event occurs on one of the registered channels. This is the only thread that needs to block.

// sit in a loop waiting for something to happen
while (true) {

    int ready = selector.select() ;

}


When there is an event, the select call returns. The selector gives you the set of selection keys that had an event. You can iterate over the selection keys and handle each one appropriately. When necessary, the processing of the event can be handed off to a worker thread.

while (true) {

    int ready = selector.select() ;

    Set<SelectionKey> selected = selector.selectedKeys() ;
    Iterator<SelectionKey> iter = selected.iterator() ;

    while (iter.hasNext()) {

        SelectionKey key = iter.next() ;
        iter.remove() ;   // remove the key so it is not processed again

        if (key.isAcceptable()) {

            // server socket channel accepted a connection

        } else if (key.isConnectable()) {

            // connected to a server

        } else if (key.isReadable()) {

            // channel is ready for reading

        } else if (key.isWritable()) {

            // channel is ready for writing
        }
    }
}

From the SelectionKey, you can get the channel on which the event occurred with a simple call.

Channel c = key.channel() ;

Once you have the channel, you can read from it or write to it. Once data is read, it can be offloaded to a worker thread for processing. For writes, you should queue the data that needs to be written somewhere, so that as soon as the channel is ready for writing, you can get the data and write it.
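
As a small sketch (the buffer size and the handling here are illustrative), the isReadable branch of the loop above could read from the channel like this:

if (key.isReadable()) {
    SocketChannel sc = (SocketChannel) key.channel() ;
    ByteBuffer buf = ByteBuffer.allocate(1024) ;
    int bytesRead = sc.read(buf) ;
    if (bytesRead == -1) {
        sc.close() ;    // the client closed the connection
    } else {
        buf.flip() ;    // prepare the buffer for reading
        // hand the buffer off to a worker thread for processing
    }
}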

Most server applications use non blocking IO when they need to scale. Tomcat comes with an NIO connector which should be used when you need to scale to thousands of clients.

Conclusion

In summary, the main difference between the IO and NIO packages is that IO is stream oriented whereas NIO is based on buffers and channels. Buffers give a little more flexibility and ease in reading/writing data. For network programming, NIO supports a non blocking mode, which lets you scale by ensuring that every thread does not block waiting for input/output.




Tuesday, May 28, 2013

Java Concurrency : 10 things every programmer should know

1.  Synchronized

Everyone knows that the synchronized keyword is used to acquire a lock and serialize access to a code block. A lesser known function of this keyword is to synchronize thread local memory with main memory. When a thread enters a synchronized block, it refreshes its local cache with data from main memory, so it sees the data written by other threads that previously released the same lock. When a thread leaves a synchronized block, its writes are flushed to main memory and are guaranteed to be seen by any other thread that subsequently synchronizes on the same lock.
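
As a small sketch, when both the read and the write go through the same lock, the reader is guaranteed to see the value written by the most recent writer:

public class Counter {

    private int value ;

    public synchronized void set(int v) {
        value = v ;     // flushed to main memory when the lock is released
    }

    public synchronized int get() {
        return value ;  // refreshed from main memory when the lock is acquired
    }
}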

2. Executors

Prior to JDK 5 and prior to java.util.concurrent, the way to create threads was to extend java.lang.Thread and override the run method or implement a Runnable and pass it to a Thread constructor. However most applications need more than a single thread and you had to write your own thread pool. Since JDK5, the preferred way to create and use threads is to use java.util.concurrent.Executors class to create a threadpool.

ExecutorService tPool = Executors.newCachedThreadPool() ;
tPool.submit(new Runnable() {

      public void run() {
             // do work
       }

}) ;

Executors can create different kinds of thread pools. ExecutorService is an interface that accepts Runnable or Callable tasks that need to be executed.

3. Callable and Future

Callable like Runnable is an interface to represent a task that needs to be executed. The difference is that the call method of the Callable interface can return a value.

Future is an interface that has methods to check status on an executing task and get the result when it is available.

Callable<List> work = new Callable<List>() {

    public List call() {

         List result = new ArrayList() ;

          // do some work and populate result

         return result ;
    }
} ;

Future<List> future = executor.submit(work) ;

List result = future.get() ;

get() method waits for the execution to complete and then gets the result.

Callable and Future make it convenient to code the interaction between tasks that generate results and tasks that are waiting for results. Future also has methods to check if a task is completed or canceled. You may cancel a task with the cancel method.

4. Thread Termination

Terminating a thread or threadpool gracefully is the responsibility of the application. A best practice is to provide a stop method that tells the thread to let submitted work complete and then exit.

If you have created the thread directly, then your implementation of stop needs to set a flag. The run method should check this flag and exit when necessary. Since a race condition is possible, care should be taken to synchronize setting and reading the flag. Once the flag is set, any new work should be rejected and the thread should exit after already submitted work is completed.

ExecutorService, discussed above, has a shutdown method which shuts down the thread pool after completion of already submitted tasks. No new tasks are accepted once this method is called.

public void stop() throws InterruptedException {
    executor.shutdown() ;
    executor.awaitTermination(timeout, TimeUnit.SECONDS) ;
}

5. Thread Interruption

Interruption is cooperative. Calling the interrupt method on a thread merely sets the thread's interrupted status flag to true. It does not automatically stop the thread. Implementations of well behaved blocking methods or long running methods should check this flag and exit early. For a blocking method, exiting early involves clearing the interrupted status and throwing an InterruptedException.

If your code calls a method that throws an InterruptedException, the code should either propagate the exception up the stack ( so someone more appropriate can handle it) or it should catch the Exception and set the interrupted status by calling the interrupt method.

The isInterrupted method returns the current interrupted status. The static Thread.interrupted() method returns the status and also clears it. These method names are a little confusing.
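
As a sketch, a well behaved task restores the interrupted status when it cannot propagate the exception (doWork() is an illustrative placeholder):

public void run() {
    try {
        while (!Thread.currentThread().isInterrupted()) {
            doWork() ;            // illustrative placeholder for the actual work
            Thread.sleep(1000) ;  // blocking call that throws InterruptedException
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt() ;  // restore the interrupted status and exit
    }
}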

6. ConcurrentHashMap

In concurrent programs, it is better to use ConcurrentHashMap as opposed to a HashMap wrapped with Collections.synchronizedMap. See the blog ConcurrentHashMap.

7. Explicit locks

Explicit locks have several advantages over the synchronized keyword. For details read the blog When to use Explicit Locks ?

8. Compare and Swap

Locking in concurrent programs, whether using synchronized or explicit locks, is expensive. A thread that is blocked waiting for a lock might be suspended by the operating system. When it acquires the lock, it has to be rescheduled for execution and wait for its time slice.

Modern CPUs support the execution of some compound operations like compare and swap, fetch and increment, test and set without locking. When multiple threads try to operate on the same data, only one thread succeeds but the others do not block. This substantially increases scalability of concurrent programs.

Since JDK 5, Java has taken advantage of these atomic compound operations supported by CPUs in the form of atomic variables and data structures like ConcurrentHashMap. The atomic classes discussed in item 9 have compound operations like compareAndSet that take advantage of this.

9. Atomic variables

Operations like incrementing a variable or check-and-update are compound operations: you first read, then increment or check, and lastly write. For this to be thread safe, locking is normally required. As mentioned above, locking is expensive and not scalable. The java.util.concurrent.atomic package has a set of classes that let you perform thread safe, lock free operations on variables using techniques like the one in item 8.

A get on an atomic variable returns the latest value written by any thread. A set on an atomic variable is immediately visible to other threads that read it. This is the same behavior as a volatile variable, as per the Java memory model discussed in item 10.
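
For example, with AtomicInteger:

AtomicInteger counter = new AtomicInteger(0) ;

counter.incrementAndGet() ;      // atomic read-modify-write, no locking
counter.compareAndSet(1, 10) ;   // set to 10 only if the current value is 1
int current = counter.get() ;    // sees the latest value written by any thread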

10. Java Memory model

The Java memory model describes the rules that define how variables written to memory are seen, when such variables are written and read by multiple threads. It is a topic that is not well understood and many programmers are not aware of it. Read about it in the blog Java Memory model.

Thursday, April 25, 2013

10 reasons for considering the Scala programming language

The Scala programming language has been around for a few years now and its popularity is increasing. Having programmed in Java for many years, I was initially skeptical whether we needed another programming language on the JVM. But after trying out Scala and reading about the language, I have had a change of heart. Whether your background is Java, C/C++, Ruby, Python, C# or any other language, Scala has some very useful features that will force you to consider it if you are looking for a programming language. This blog just lists the useful features. Programming examples will follow in subsequent blogs.

1. Objected oriented programming language (OOP)

Scala is an object oriented programming language. The benefits of OOP are well documented. A majority of programs today are written in some OO language. If you come from a Java, C++ or C# background, then you already know the benefits. If you are currently using a language that is not OO, then this might be one of the reasons for you to consider Scala. In Scala everything is an object, unlike Java where primitives are not objects and the use of static methods lets you bypass the OO paradigm. OO programming enables you to write programs whose structure models the problem domain that the program is written for. This helps produce programs that are easier to read and maintain.

2. Functional programming

In contrast to OO programming, functional programming encourages the use of functions that do some work without changing state or the data they work on. Data is immutable. Functions take data as input and may produce new data as output. Additionally, a function is a type just like an Integer, a String or any class. The advantage of functional programming is that there are no side effects: a function takes input and produces output, that is all. This makes it easier to write error free programs that can scale or be executed in parallel. Scala has very good support for functional programming.

3. Static Types

In statically typed languages like C++, Java and Scala, every variable has a type and the type determines what the program can do with the variable. If you try to multiply two Strings, the compilation process will flag that as an error. Statically typed languages protect programmers from shooting themselves in the foot by detecting such errors. If you think strong typing is annoying and leads to verbose code, then you will be pleased to know that unlike Java, Scala supports type inference (the ability to infer types), which reduces verbosity.

4. Brevity

Scala has features that enable the programmer to write compact code as opposed to verbose code. Less code means fewer bugs and less time spent on maintenance.

//Java
public class Person {
  private String fname ;
  private String lname ;

  public Person(String first, String last) {
      fname = first ;
      lname = last ;
  }

}

In Scala the same class is written as

class Person(fname: String,lname: String) 

Scala supports type inference that helps avoid verbose code.  

// Java String is in the statement twice
public String[] stringarray = new String[5] ;

// Scala type is infered as Array of Strings
val stringarray = new Array[String](5)

5. JVM language

Scala is compiled to bytecode that runs on the Java virtual machine. Since the JVM is available on every platform, your scala code will run on windows , linux , mac os and any other platform for which a JVM is available.

Another advantage is the integration with Java. Java has a very rich class library. There are several open source projects that provide additional libraries for very useful functions. Java code can be called from Scala programs very easily, which means all those function rich libraries are available for your use in Scala.

val calendar = new java.util.GregorianCalendar()
print(java.lang.String.format("%1$ty%1$tm%1$td", calendar))

will print today's date in the format YYMMDD.

6.  Better support for concurrency

To write concurrent programs in Java, you have to deal with threads, the Java memory model, locking, synchronization, deadlocks, etc. Writing error free concurrent programs is difficult. Scala has an actor based programming model that shields the programmer from the issues you face in Java or C/C++. To write concurrent programs, you implement actors that send, receive and handle messages. The actor model lets the programmer avoid sharing data between threads and the issues related to locking shared data.

7. Scalable programs

By avoiding locking in concurrent programs, Scala is able to exploit parallelism in a way that Java cannot. In Java, a recommended best practice for writing scalable code is to use immutable objects. With the actor model in Scala, you use immutable objects as messages and have unsynchronized methods. Immutable objects are also at the heart of functional programming (see 2), which Scala promotes.

How many times have we heard of a Ruby or Python application that had to be rewritten in Java or C++ because it could not scale to the increased demands of users? With Scala, this will not be an issue.

8. Fast

Studies have shown that Scala is at least as fast as Java.
see http://research.google.com/pubs/pub37122.html

9. General purpose/multi-purpose

The brevity and compactness of Scala ensure that it can be used for scripting or rapid application development a la Ruby or Python. But the fact that it runs on the JVM, along with its scalability features, ensures that it can be used for complex applications.

10. It is getting more popular

This is a more non technical reason. Scala is getting more popular. More startups are moving to Scala. Many are skipping Java and going directly to Scala. If you are a Java programmer, learning Scala makes you more marketable. Even if you are not a Java programmer, learning Scala will open up a number of opportunities in the programming world. 

Thursday, April 4, 2013

Using HBase Part 2: Architecture


In this blog, let us take a quick look at some architectural details of HBase.

For an introduction to NoSql and HBase, read the following blogs.
What is NoSql ?
Using HBase

Internally HBase is a sparse, distributed, persistent, multidimensional sorted map. While that sentence seems complicated, reading each word individually gives clarity.
sparse - some cells can be empty
distributed - data is partitioned across many hosts
persistent - stored to disk
multidimensional - more than 1 dimension (key,value,version)
Map - key and value
sorted - maps are generally not sorted but this one is

HBase uses HDFS to store the data.

An HBase table has rows and columns. Columns are grouped into column families. There is a version for each value. So table, row key, column family, column name and version are used to get to a value. Both row keys and values are byte[]s.

The table is sorted by row key. Within a column family, the columns are sorted. Storage is per column family, so logically related columns should be in the same column family.

A Table is made of regions. A region has a subset of the rows in a table. A region can be described using tablename, start key, end key. A region is made up of one or more HDFS files.

The regions are managed by servers known as the region servers. There is a master server that assigns regions to region servers.

HBase has 2 catalog tables, -ROOT- and .META. The .META. table has information on all regions in the system. -ROOT- has information on .META. When a client wants to access data, these 2 tables are consulted to determine which region server hosts the region that should be used for the request. The client then issues read/write requests to that region server directly.

HBase uses ZooKeeper to maintain cluster state.

[Figure: components of an HBase cluster - master, region servers, ZooKeeper and HDFS]

Logical view of a table:

The table in Figure 2 has 2 column families: cf1 with columns ColA and ColB, and cf2 with columns ColC and ColD. The value in each cell is uniquely identified by row key, column family, column name and a timestamp or version.

Logical view of RegionServer:

The rows of a table are grouped into regions. A region is the unit of allocation and is identified by a start key and an end key. The regions are distributed across the region servers in the cluster.

Physical view of Region Server:

Each region has one or more stores. There is one store per column family. The memstore is where changes are stored in memory before being written to disk. The file store is the persistent store and is a file written to HDFS. The HFile format is described in the blog HFile.

Each region server has a write ahead log (WAL). Writes are first written to the WAL. If the region server crashes before memory is flushed to disk, the WAL is used to recover. This implies data is stored in memory and flushed to disk periodically. Changes are sorted while in memory.

Reads look for data in the memstore first and then go to disk if necessary. Data is flushed to disk in 64 MB chunks; this size is configurable. HFiles are merged into larger files. Sorting in memory and merging files makes it work like a merge sort.

For delete, the row is marked as deleted ( as opposed to physically removing it).

HBase provides ACID semantics at the row level. HBase does multi version concurrent updates, which means an update happens by creating a new version as opposed to overwriting the existing row. Writers need to acquire a lock to write; readers do not acquire a lock. To ensure consistent reads without locking, HBase assigns a write number to each write. A read returns data from the highest write number that is durable. Locks are stored in memory in the region server. This is sufficient because all values for a row are in one region server. Transactions are committed in serial order.

Sharding is automatic. Regions split when files reach a certain size.

A compaction step, which runs in the background, combines files and removes deleted data.

This concludes the introduction to HBase architecture.

Friday, March 15, 2013

Using HBase

HBase is a NoSQL database from the hadoop family. The NoSql concept is discussed in my blog at What is NoSql ? HBase is a column oriented key value store based on Google's Bigtable.

To recap,  you would be considering a NoSql database because your RDBMS is probably not able to meet your requirements because of one or more of the following reasons:
  • Your application deals with billions and billions of rows of data
  • Application does a lot of writes
  • Reads require low latency
  • linear scalability with commodity hardware is required
  • You frequently need to add more columns or remove columns
There are several NoSql databases that can address one or more of these issues. In this article I provide an introduction to HBase. The goal is to help you get started evaluating whether HBase would be appropriate for your problem. This is introductory material. More details in subsequent blogs.

Main features of HBase are :

  • Built on hadoop and HDFS. If you are already using hadoop , then HBase can be viewed as an extension to your hadoop infrastructure that provides random reads and writes.
  •  A Simple data model based on keys , values and columns. More on this later. 
  • Scales linearly by adding commodity hardware 
  • Automatic partitioning of tables as they grow larger 
  • Classes available for integration with MapReduce 
  • Automatic failover support 
  • Support rowkey range scans 
Data Model
 
The main constructs of the model are  Table, rows, column family and columns.

Data is written and read from a Table. A Table has rows and column families. Each row has a key.

Each Column family has one or more columns. Columns in a column family are logically related. Each column has a name and value. When a Table is created, the column families have to be declared. But the columns in each family do not need to be defined and can be added on demand. Each column is referred to using the syntax columnFamily:column. For example, an age column in a userprofile column family is referred to as userprofile:age. For each row, storage space is taken up only for the columns written in that row.

Let us design a Hbase table to store User web browsing information. Each user has a unique id called userid. For each user we need to store

(1) some profile information like sex, age, geolocation, membership.
(2) For each partner website he visits, store the page types viewed, products viewed.
(3) For each partner website he visits, store products purchased , product put in shopping cart but not purchased.

Our structure might look like

{
  userid1: {                     // rowkey
    profile: {                   // column family
      sex: male,                 // column, value
      age: 25,
      member: Y
    },
    browsehistory: {             // column family
      partner1.hp: 23,           // visited partner1 homepage 23 times
      partner2.product.pr1: 4    // viewed product pr1 4 times
    },
    shoppinghistory: {           // column family
      partner3.pr3: 25.5         // purchased pr3 from partner3 for $25.5
    }
  }
}

 Let us design an Hbase table for the above structure.

Tablename: UserShoppingData. Since we will look up data based on user, the key can be userid.

(1) ColumnFamily profile for profile information. Columns would be sex, age, member etc.
(2) ColumnFamily browsehistory for browsing data. Columns are dynamic, such as websitename.page or website.productid.
(3) ColumnFamily shophistory for shopping data. Columns are dynamic.


The beauty is that you can dynamically add columns. If visualizing this as columns is difficult, just think of it as dynamically adding key value pairs. This kind of data is required in a typical internet shopper analytics application.

HBase is an appropriate choice because you have several hundred million internet shoppers. That is several hundred million rows. If you wanted to store data by date, you might make the key userid+date, in which case you might have even more rows, on the order of billions. Data is written as the user visits various internet shopping websites. Later the data might need to be read with low latency to be able to show the user a promotion or advertisement based on his past history. A company I worked for in the past used a very popular RDBMS for such high volume writes, and whenever the RDBMS was flooded with such write requests, it would grind to a halt.

Let us use HBase shell to create the above table, insert some data into it and query it. 

Step 1: Download and install HBase from http://hbase.apache.org

Step 2: Start hbase
$ ./start-hbase.sh
starting master, logging to /Users/jk/hbase-0.94.5/bin/../logs/hbase-jk-master-jk.local.out
 

Step 3: Start hbase shell
$ ./hbase shell
HBase Shell; enter 'help' for list of supported commands.
Type "exit
" to leave the HBase Shell
Version 0.94.5, r1443843, Fri Feb  8 05:51:25 UTC 2013
hbase(main):001:0>

 

Step4: Create the table
hbase(main):004:0> create 'usershoppingdata','profile','browsehistory','shophistory'
0 row(s) in 3.9940 seconds


Step5: Insert some data
hbase(main):003:0> put 'usershoppingdata', 'userid1','profile:sex','male'
0 row(s) in 0.1990 seconds

hbase(main):004:0> put 'usershoppingdata', 'userid1','profile:age','25'
0 row(s) in 0.0090 seconds

hbase(main):005:0> put 'usershoppingdata', 'userid1','browsehistory:amazon.hp','11'
0 row(s) in 0.0100 seconds

hbase(main):006:0> put 'usershoppingdata', 'userid1','browsehistory:amazon.isbn123456','3'
0 row(s) in 0.0070 seconds

hbase(main):007:0> put 'usershoppingdata', 'userid1','shophistory:amazon.isbn123456','19.99'
0 row(s) in 0.0140 seconds

 

Step 6: Read the data
hbase(main):008:0> scan 'usershoppingdata'
ROW                        COLUMN+CELL                                                                
 userid1                   column=browsehistory:amazon.hp, timestamp=1362784343421, value=11          
 userid1                   column=browsehistory:amazon.isbn123456, timestamp=1362786676092, value=3   
 userid1                   column=profile:age, timestamp=1362784243334, value=25                      
 userid1                   column=profile:sex, timestamp=1362784225141, value=male                    
 userid1                   column=shophistory:amazon.isbn123456, timestamp=1362786706557, value=19.99 
1 row(s) in 0.1450 seconds
 

hbase(main):010:0> get 'usershoppingdata', 'userid1'
COLUMN                     CELL                                                                       
 browsehistory:amazon.hp   timestamp=1362784343421, value=11                                          
 browsehistory:amazon.isbn timestamp=1362786676092, value=3                                           
 123456                                                                                               
 profile:age               timestamp=1362784243334, value=25                                          
 profile:sex               timestamp=1362784225141, value=male                                        
 shophistory:amazon.isbn12 timestamp=1362786706557, value=19.99                                       
 3456                                                                                                 
5 row(s) in 0.0520 seconds
 

hbase(main):011:0> get 'usershoppingdata', 'userid1', 'browsehistory:amazon.hp'
COLUMN                     CELL                                                                       
 browsehistory:amazon.hp   timestamp=1362784343421, value=11                                          
1 row(s) in 0.0360 seconds


Step 7: Add few more rows

hbase(main):015:0> put 'usershoppingdata', 'userid2','profile:sex','male'
0 row(s) in 0.0070 seconds

hbase(main):016:0> put 'usershoppingdata', 'userid3','profile:sex','male'
0 row(s) in 0.0060 seconds

hbase(main):017:0> put 'usershoppingdata', 'userid4','profile:sex','male'
0 row(s) in 0.0330 seconds

hbase(main):018:0> put 'usershoppingdata', 'userid5','profile:sex','male'
0 row(s) in 0.0050 seconds


Step 8: Let us do some range scans on the row key
hbase(main):024:0> scan 'usershoppingdata', {STARTROW => 'u'}
ROW                        COLUMN+CELL                                                                
 userid1                   column=browsehistory:amazon.hp, timestamp=1362784343421, value=11          
 userid1                   column=browsehistory:amazon.isbn123456, timestamp=1362786676092, value=3   
 userid1                   column=profile:age, timestamp=1362784243334, value=25                      
 userid1                   column=profile:sex, timestamp=1362784225141, value=male                    
 userid1                   column=shophistory:amazon.isbn123456, timestamp=1362786706557, value=19.99 
 userid2                   column=profile:sex, timestamp=1362788377896, value=male                    
 userid3                   column=profile:sex, timestamp=1362788385501, value=male                    
 userid4                   column=profile:sex, timestamp=1362788392575, value=male                    
 userid5                   column=profile:sex, timestamp=1362788398087, value=male                    
5 row(s) in 0.0780 seconds


hbase(main):019:0> scan 'usershoppingdata', {STARTROW => 'userid3'}
ROW                        COLUMN+CELL                                                                
 userid3                   column=profile:sex, timestamp=1362788385501, value=male                    
 userid4                   column=profile:sex, timestamp=1362788392575, value=male                    
 userid5                   column=profile:sex, timestamp=1362788398087, value=male                    
3 row(s) in 0.0250 seconds

hbase(main):023:0> scan 'usershoppingdata', {STARTROW => 'userid3', STOPROW => 'userid5'}
ROW                        COLUMN+CELL                                                                
 userid3                   column=profile:sex, timestamp=1362788385501, value=male                    
 userid4                   column=profile:sex, timestamp=1362788392575, value=male                    
2 row(s) in 0.0160 seconds


The shell is very useful to play around with the data model and get familiar with HBase. In a real world application, you might write code in a language like Java. There is more to HBase than this simple introduction. I will get into internals and architecture in future blogs.
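
As a rough sketch of what that Java code might look like with the HBase client API of that era (the exact classes and setup here are assumptions for illustration; consult the client documentation for your HBase version):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class UserShoppingDataClient {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create() ;
        HTable table = new HTable(conf, "usershoppingdata") ;

        // equivalent of: put 'usershoppingdata', 'userid1', 'profile:age', '25'
        Put put = new Put(Bytes.toBytes("userid1")) ;
        put.add(Bytes.toBytes("profile"), Bytes.toBytes("age"), Bytes.toBytes("25")) ;
        table.put(put) ;

        // equivalent of: get 'usershoppingdata', 'userid1'
        Get get = new Get(Bytes.toBytes("userid1")) ;
        Result result = table.get(get) ;
        String age = Bytes.toString(result.getValue(Bytes.toBytes("profile"), Bytes.toBytes("age"))) ;
        System.out.println("age = " + age) ;

        table.close() ;
    }
}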