Wednesday, December 19, 2012

JAVA Garbage Collection

Garbage (GC) collection is the process by which the java virtual machine frees up memory by releasing the memory taken up by objects that are no longer referenced by any other objects. Garbage collection is automatic. For simple applications, the developer does even need to be aware of garbage collection. But for applications with large memory footprint or are long running or have low latency requirements, some understanding is necessary to ensure that garbage collection does not interfere with the application. A common interference of garbage collection is that the application seems to stop responding or the time to respond goes up randomly. The articles lists a few important points every Java developer needs to know about garbage collection.

1.0  Generational GC

Since JDK 5 , the garbage collectors are what are called generational collectors. The heap is divided into regions based on the age of the objects. The young generation has objects that are short lived. The tenured generation has objects that are long lived. All objects are first created in the young region and after a while if they are alive, they are moved to the tenured generation. Garbage collection of the young region happens frequently and is generally fast. GC for the tenured region happens less frequently. Since most objects are short lived, this makes the GC more efficient.

2.0  Types of collectors

Serial Collector : Garbage from both young and tenured regions is done serially and while this happens your application is paused. This is the default collector on single cpu machines and for small heaps sizes ( less that 2G) . This is fine if your application does not care about pauses.

Parallel Collector: This is the default collector on server class machines ( multiple CPUs and greater than 2G heap size). Multiple threads/cpus are used to do garbage collection in parallel for the young region. This makes collection faster. But the application is still paused when GC happens. For the tenured region, the GC is serial as in a serial collector.

Parallel Compacting Collector: GC for the young region is the same as parallel collector and uses multiple threads. However GC for tenured region happens in parallel using multiple CPUs. Application is paused when GC happens.

Concurrent Mark Sweep Collector (CMS): For young region, it is same as in parallel collectors. But for tenured region,  most of the time, GC runs concurrently with the application. The application pauses during GC are expected to be much shorter than the other collectors. This is an ideal choice for applications that cannot tolerate long pauses.

3.0 Understanding GC in your application

Before you try to tune your applications GC, it is important to understand when GC is happening, how much time it takes and how much memory it is reclaiming. The JVM provides the following options to log GC activity.

The -XX:+PrintGCDetails prints GC details described below. The -XX:+PrintGCTimeStamps prints the time from the start of the JVM to when each GC happened. The -Xloggc:gcfilename.log writes the log to gcfilename.log.

In the gc log, you will see a number of lines like

11.561: [GC [PSYoungGen: 868524K->294158K(1198848K)] 1303221K->728855K(4694144K), 0.3640750 secs] [Times: user=1.44 sys=0.02, real=0.37 secs]

This indicates that a GC of the young region occurred at time 11.561 secs from start. The young region was reduced from 868524k to 294158k (66%).  The number (1198848K) is the memory allocated to the young region. The total heap was reduced from 1303221K to 728855K or 44%. The number (4694144K) is the total heap. This GC took .37 secs.

You will see a few lines like

3602.170: [Full GC (System) [PSYoungGen: 16250K->0K(1662080K)] [PSOldGen: 1594630K->1578665K(3495296K)] 1610881K->1578665K(5157376K) [PSPermGen: 22314K->22314K(35904K)], 3.4836190 secs] [Times: user=3.45 sys=0.03, real=3.48 secs]

This indicates that a full GC occurred at 3602.17 secs from the start. The young region was reduced from 16250K to 0K. The old or tenured region was reduced from  1594630K to 1578665K. The total heap was reduced from 1610881K to 1578665K. The GC took 3.48 sec.

The GCViewer is free tool to view GC logs graphically.
GC log viewed in GCViewer

The very small black lines at the bottom indicate the small GCs. The tall black lines at the hourly mark are the Full GCs. The blue peaks are lines indicating how the used heap goes up and goes down after a GC. The ruby red line just below the blue spikes shows the growth of the tenured region. You can see that the tenured region drops after a full GC. Full GCs take a lot of time and you want to reduce the frequency with which they occur.

4.0 Tuning options

 The JVM offers a few knobs that one can turn to tune the GC in a way most suitable to your machine and your application.

-Xms -Xmx options are used to set the initial and maximum size of the heap.  Maximum heap size should be less that physical memory on the machine to avoid paging and one should also leave aside memory for the operating system and other applications running on the same machine. While bigger heap and more memory are good because the GC has to collect less often, when it does have to collect, it has to do more work and the GC pauses could be longer.

–XX:+UseSerialGC
–XX:+UseParallelGC
–XX:+UseParallelOldGC
–XX:+UseConcMarkSweepGC

These options are used to select the GC. SerialGC and ParallelGC are selected by default depending on machine type as described earlier.  Applications that have low latency requirements and cannot tolerate long GC pauses should consider switching to the Concurrent Mark Sweep GC.

-XX:NewSize=n is used to set the default initial size of the young generation. Most applications have many short lived objects and few long lived objects. The newsize should be large enough that short lived objects fit into the young generation and are garbage collected in the small GCs. If the young generation is too small, short lived object get moved to the tenured region which leads to longer Full GCs.

-XX:MaxPauseTimeMillis is a hint to the GC as to the desired maximum pause time. This is just a hint and may or may not be honoured.

5.0 References

There are many other tuning options and the following documents from Oracle are good references on tuning options as well as garbage collection in general:

1. http://www.oracle.com/technetwork/java/javase/gc-tuning-6-140523.html
2. http://www.oracle.com/technetwork/java/javase/tech/memorymanagement-whitepaper-1-150020.pdf





Sunday, November 18, 2012

Spring JAVA config tutorial

The classic way of configuring beans in Spring is using XML. But many programmers find switching between XML and java code annoying. Having to go into XML to debug dependencies and track down implementation classes has turned many programmers away from Spring. Since version 3.0, Spring has supported the ability to do configuration using classes and annotations without the need to use XML. In XML , to define a bean, you added to the application.xml
<bean id="Hello" class="com.mj.Hello"/>
To use the bean you wrote code like
ApplicationContext ac = new ClassPathXmlApplicationContext("application.xml") ; 
BeanFactory bf = (BeanFactory) ac ; 
Hello h = bf.getBean("Hello")
h.someMethod() ;
Let us write a new spring application using no XML.

Step1: Define the bean interface and implementation
public interface Greeting {
    public String getMessage() ;
}

public class NewYearGreeting implements Greeting {
    public String getMessage() {
        return "Happy New Year" ;
    }
}
public class BirthDayGreeting  implements Greeting {
    public String getMessage() {
        return "Happy Birthday" ;
    }
}
Step 2: Define the bean configuration in JAVA
The bean definitions are created by writing a class and annotating it with @Configuration. The individual beans are defined by annotating the method that creates the bean with @Bean.
@Configuration
public class GreetingSpringConfig {
    @Bean(name="newyear")
    public Greeting newyearGreeting() {
        return new NewYearGreeting() ;
    }
    @Bean(name="birthday")
    public Greeting birthdayGreeting() {
        return new BirthDayGreeting() ;
    }
 } 
Step 3: Use the beans from a client
 public class GreetingSample {
    public static void main(String args[]) {
        ApplicationContext ac = new    
        AnnotationConfigApplicationContext(GreetingSpringConfig.class) ;
        Greeting g = (Greeting) ac.getBean("newyear") ;
        System.out.println(g.getMessage()) ; 
        g = (Greeting) ac.getBean("birthday") ;
        System.out.println(g.getMessage()) ; 
} 
Note that instead of using ClassPathXmlApplicationContext ,we used AnnotationConfigApplicationContext. AnnotionConfigApplicationContext can process not just @Configuration annotated classes, but also JSR 330 annotated classes. If you don'nt like switching between JAVA & XML , then Java config is simple way of wiring your spring beans.

Friday, October 19, 2012

JAVA Synchronized HashMap vs ConcurrentHashMap

A synchronized HashMap is a Map returned by calling  synchronizedMap methods of java.util.Collections class.

Map syncMap = Collections.synchronizedMap(new HashMap()) ;

The characteristics of synchronized collections are:

1. Each method is synchronized using an object level lock. So the get and put methods on syncMap acquire a lock on syncMap.

2. Compound operations such as check -then - update or iterating over the collection require the client to explicitly acquire a lock on the collection object.

synchronized(syncMap) {
     Integer val = syncMap.get(key) ;
     if ( val == null) {
          syncMap.put(key,  newvalue) ;

}

Without synchronization, multiple threads calling the code can lead to inconsistent values.

3. Locking the entire collection is a performance overhead. While one thread holds on to the lock, no other thread can use the collection.

4. HashMap and other collections from java.util.collections throw ConcurrentModificationException if a thread tries to modify a collection while another thread is iterating over it. The recommended approach is to acquire a lock before iterating over the map.

ConcurrentHashMap was introduced in JDK 5.

The characteristics of ConcurrentHashMap are:

1. There is no locking at the object level. The locking is at a much finer granularity. For a concurrentHashMap , the locks may be at a hashmap bucket level.

2. The effect of lower level locking is that you can have concurrent readers and writers which is not possible for synchronized collections. This leads to much more scalability.

3. Since there no locking at the object level,  additional atomic methods are provided for some compound operations. The ConcurrentHashMap has methods putIfAbsent, remove, replace all of which require checking a key or value and then performing a put or remove.

The code above can be replaced by

ConcurrentHashMap concMap ;
.
.
.
 concMap.putIfAbsent(key,newvalue) ;

4. ConcurrentHashMap does not throw a ConcurrentModificationException if one thread tries to modify it while another is iterating over it. The iterator returned by ConcurrentHashMap is an iterator on a snapshot of the data when the iterator was created. It may or may not have changes made by other threads after the iterator was created.

In general, using ConcurrentHashMap instead of synchronized Map gives you much better scalability and you do not have to explicitly synchronize on the map object.



Sunday, September 16, 2012

Hadoop 2.x Tutorial

Hadoop 2.x release involves many changes to Hadoop and MapReduce. The centralized JobTracker service is replaced with a ResourceManager that manages the resources in the cluster and an ApplicationManager that manages the application lifecycle. These architectural changes enable hadoop to scale to much larger clusters. A new release also has minor changes to scripts,directories and environment variables necessary to get started. This is a getting started tutorial for 2.x. The intended audience is someone who is completely new to hadoop and needs a jumpstart or someone who has played a little bit with a previous version and wants to start using 2.x.  The emphasis on getting hadoop running and not necessarily explaining concepts which is covered in many other blogs.

In this tutorial we will

(1) Setup a hadoop in a single node environment
(2) Create and move files to HDFS
(3) Write and execute a simple MapReduce Application

Step 1: Download Hadoop and install
Download the current 2.x.x release from http://hadoop.apache.org/releases.html.
I downloaded hadoop-2.0.1-alpha.tar.gz.
Untar the file to a directory say ~/hadoop-2.0.1-alpha.

Step 2: Set the following environment variables
hadoop-2.0.1-alpha$ export HADOOP_HOME=~/hadoop-2.0.1-alpha
hadoop-2.0.1-alpha$ export HADOOP_MAPRED_HOME=~/hadoop-2.0.1-alpha
hadoop-2.0.1-alpha$ export HADOOP_COMMON_HOME=~/hadoop-2.0.1-alpha
hadoop-2.0.1-alpha$ export HADOOP_HDFS_HOME=~/hadoop-2.0.1-alpha
hadoop-2.0.1-alpha$ export YARN_HOME=~/hadoop-2.0.1-alpha
hadoop-2.0.1-alpha$ export HADOOP_CONF_DIR=~/hadoop-2.0.1-alpha/etc/hadoop

If these environment variables are not setup correctly, Step 4 might fail.

Step 3: Update the configuration files
 hdfs-site.xml
Add the following configuration to etc/hadoop/hdfs-site.xml. If you do not set dfs.namenode.name.dir and dfs.datanote.data.dir explicitly, hadoop will default to a temp directory that the OS may clean up on restart and you will lose data. This is a common omission for newbees.
 <?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>file:/Users/joe/hadoop-hdfs201/data/hdfs/namenode</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>file:/Users/joe/hadoop-hdfs201/data/hdfs/datanode</value>
  </property>
</configuration>

core-site.xml
Add the following to etc/hadoop/core-site.xml.
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>
yarn-site.xml
Add the following to etc/hadoop/yarn-site.xml.
<?xml version="1.0"?>
<configuration>
  <property>
   
<name>yarn.nodemanager.aux-services</name>
    <value>mapreduce.shuffle</value>
  </property>
 <property>
    <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
  </property>
</configuration>

mapred-site.xml
Add the following to etc/hadoop/mapred-site.xml.
<?xml version="1.0"?>
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>
 

Step 4: Start the processes.

Change to the directory where hadoop is installed.
cd ~/hadoop-2.0.1-alpha

If you are running hadoop for the first time, the following command will format HDFS. Do not run this everytime as it formats and thus deletes any existing data

hadoop-2.0.1-alpha$ bin/hadoop namenode -format

Start the namenode.
hadoop-2.0.1-alpha$ sbin/hadoop-daemon.sh start namenode

Start the datanode.
hadoop-2.0.1-alpha$ sbin/hadoop-daemon.sh start datanode

In hadoop 2.x , there is no jobtracker. Instead there is a resourcemanager and a nodemanager.
Start the resourcemanager.
hadoop-2.0.1-alpha$ sbin/yarn-daemon.sh start resourcemanager

Start the nodemanager.
hadoop-2.0.1-alpha$ sbin/yarn-daemon.sh start nodemanager

Start the history server.
hadoop-2.0.1-alpha$ sbin/mr-jobhistory-daemon.sh start historyserver

Type jps. It lists the java processes running. Check that all the processes are started

hadoop-2.0.1-alpha$ jps

1380 DataNode
1558 Jps
1433 ResourceManager
1536 JobHistoryServer
1335 NameNode
1849 NodeManager

Once I ran into a problem where the mapreduce job was being accepted but never executed. In looking at the logs, I found that the NodeManager had not started.  The jps command is a good check to ensure all necessary processes are started.

Step 5: Get familiar with HDFS
The HDFS commands are documented in the older releases of hadoop
http://hadoop.apache.org/docs/r1.0.3/file_system_shell.html

hadoop-2.0.1-alpha$ bin/hadoop fs -ls

will list the home directory. If you are user joe. HDFS creates a /user/joe directory for you. Any files or directories you create will be created here.

hadoop-2.0.1-alpha$ bin/hadoop fs -mkdir /user/joe/input
creates a directory input

In the local filesystem create a file app.log with the data

user01|1|2|3|4|5
user02|1|2|3|4|5
user03|1|2|3|4|5
user01|1|2|3|4|5
user02|1|2|3|4|5
user01|1|2|3|4|5
user03|1|2|3|4|5
user01|1|2|3|4|5
user04|1|2|3|4|5
user01|1|2|3|4|5

let us pretend this is a log file from a web application where for each request we have logged userid and some additional data. We will later use this .as input for a MapReduce program.
You can move it to hdfs using the command

hadoop-2.0.1-alpha$ bin/hadoop fs -moveFromLocal ~/projects/app.log /user/manoj/input/

To print the file just moved to hdfs
hadoop-2.0.1-alpha$ bin/hadoop fs -cat /user/manoj/input/app.log

Step 6: Create a MapReduce program
The MapReduce programming model is explained in the blog What is MapReduce ?. Let us write a simple mapreduce program that uses that app.log we created in step5 as input and outputs the number of times a user visited the site. UserCountMap reads a line and outputs (username,1).  UserCountReducer takes as input (username, list of 1s) and outputs (username, sum).

public class UserCount {
    public static class UserCountMap extends Mapper    
        public void map(LongWritable key, Text Value, Context context)
                        throws IOException, InterruptedException {

            String line = Value.toString() ;
            String tokens[] = line.split("\\|") ;           
            if (tokens.length > 0) {               
                context.write(new Text(tokens[0]),new IntWritable(1)) ;               
            }
        }     
    }
    public static class UserCountReducer extends Reducer  
        public void reduce(Text key, Iterable values, Context context)
                    throws IOException, InterruptedException {
                 int count = 0 ;      
            for (IntWritable value : values) {
                count = count + value.get() ;
            }
            context.write(key, new IntWritable(count)) ;
        }   
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJarByClass(UserCount.class) ;       
        FileInputFormat.addInputPath(job, new Path(args[0])) ;
        FileOutputFormat.setOutputPath(job, new Path(args[1])) ;       
        job.setMapperClass(UserCountMap.class) ;
        job.setReducerClass(UserCountReducer.class) ;       
        job.setOutputKeyClass(Text.class) ;
        job.setOutputValueClass(IntWritable.class) ;
        System.exit(job.waitForCompletion(true) ? 0 : 1) ;       
    }   
      
}

Compile the program and package into a jar called say usercount.jar.

Step 7: Run the madreduce program

hadoop-2.0.1-alpha$ bin/hadoop jar ~/projects/usercount.jar com.mj.UserCount /user/joe/input /user/joe/output

you should see output some of which is shown below.

12/09/12 17:41:28 INFO mapreduce.Job: The url to track the job: http://joe.local:8088/proxy/application_1347494786422_0003/
12/09/12 17:41:28 INFO mapreduce.Job: Running job: job_1347494786422_0003
12/09/12 17:41:37 INFO mapreduce.Job: Job job_1347494786422_0003 running in uber mode : false
12/09/12 17:41:37 INFO mapreduce.Job:  map 0% reduce 0%
12/09/12 17:41:43 INFO mapreduce.Job:  map 100% reduce 0%
12/09/12 17:41:45 INFO mapreduce.Job:  map 100% reduce 100%
12/09/12 17:41:45 INFO mapreduce.Job: Job job_1347494786422_0003 completed successfully

You can see the status of the job at http:/localhost:8088

hadoop-2.0.1-alpha$ bin/hadoop fs -ls /user/joe/output
212/09/12 17:45:29 WARN util.KerberosName: Kerberos krb5 configuration not found, setting default realm to empty
Found 2 items
-rw-r--r--   1 manoj supergroup          0 2012-09-12 17:41 /user/joe/output/_SUCCESS
-rw-r--r--   1 manoj supergroup         39 2012-09-12 17:41 /user/joe/output/part-r-00000

The file part-r-00000 will have the output which is

user01    5
user02    2
user03    2
user04    1

Hoping these steps help jumpstart you with hadoop and get you going on your way to write more complex Map Reduce jobs to analyze your big data.

Update 2/28/2014: For how to setup hadoop 2.x Cluster , see Hadoop 2.x YARN cluster setup tutorial







Friday, August 17, 2012

JAVA enum tutorial

Java language has supported enum type for several releases. Yet, many programmers do not use it or do not fully understand all features of enum.

We still see a lot of code like this:

public static final int LIGHT = 1 ;
public static final int MEDIUM = 2 ;
public static final int HEAVY = 3 ;
public static final int SUPERHEAVY = 4 ;

int weight_range  = getRange():
if (weight_range == LIGHT  ) {

} else if (weight_range == MEDIUM) {

} else if (weight_range == HEAVY) {

}

Such code is error prone. It lacks type safety. If the weight_range is serialized/deserialized somewhere you are going to have to remember what 1,2,3 represent.

Java enum is a cleaner type safe way of working with constants. It is a type that has a fixed set of constant fields that are instances of the type.

1. Defining enum

Defining enum is like defining a class.

public enum WeightRange {
 LIGHT, MEDIUM, HEAVY,SUPERHEAVY 
} ;

defines a WeightRange enum type with 4 constant fields.

2. Creating a variable of type enum

WeightRange wclass = WeightRange.Medium ;

is like declaring any other type.

3. Using the enum

WeightRange boxer_class  = getWtRangeFromSomeWhere();

if (boxer_class == WeightRange.LIGHT) {

} else if (boxer_class == WeightRange.HEAVY) {

}

is more type safe than the code without enums.

4. Enum is a class.

As mentioned above, enum is a class. Every enum type extends java.lang.Enum.  All enum types thus can have additional fields and constructors.

The above WeightRange enum can be enhanced to add fields for low and high range. The values are provided in the constructor.

public enum WeightRange {
   
    LIGHT(0,70) ,
    MEDIUM(71,150),
    HEAVY(151,225),
    SUPERHEAVY(226,350) ;
   
    private final int low ;
    private final int high ;   
   
    WeightRange(int low, int high) {       
        this.low = low ;
        this.high = high ; 
    }
}

5. Enum can also have methods.

In the above enum we can add a method to check if a given weight is within a weight range.

public boolean isInRange(int wt) {       
        if (wt >= low && wt <= high)
            return true ;
        else
            return false ;
}

5. It can have static factory method that takes a weight as parameter and returns the correct enum.

public static WeightRange getWeightRange(int weight) {
       
        if (weight <= 70)
            return LIGHT ;
        else if (weight <= 150)
            return MEDIUM ;
        else if (weight <= 225)
            return HEAVY ;
        else
            return SUPERHEAVY ;        
}

6.  Calling toString on an enum value returns the name used to define the constant field.

System.out.println(WeightRange.LIGHT) ; 
prints LIGHT

7. In converse, enum can be constructed using a String using the valueOf method.

WeightRange w3 = WeightRange.valueOf("MEDIUM") ;
System.out.println(w3) ;
will print MEDIUM

8. You can iterate over the constants defined in the enum.

for (WeightRange r : WeightRange.values()) {
            System.out.println(r) ;
}

9. enum constants are final.

WeightRange.LIGHT = WeighRange.Heavy ; // compilation error

10. The only instances of an enum that can be created are the constants defined in the enum defintion.

WeightRange r = new WeightRange(12,100) ; // compilation error

Next time you need a fixed set of constants, consider using enum. It is type safe, leads to better code and your constants are within a namespace.

Tuesday, July 17, 2012

Scaling The Relational Database

Scalability of a web application is the ability to handle increased load whether it is requests or number of users or data without having to redesign or re-architect the application. Scalability should not be confused with performance or raw speed.

One can scale by using bigger components : bigger machine, more memory, more cpu. This is vertical scaling. One can also scale by adding more copies of the same component to share the workload. This is horizontal scaling.

In a typical multi tiered web application, the middle tier, where the application logic executes, scales easily by going stateless or using a session cookie with state stored to a centralized storage. The middle tier thus scales horizontally by just adding more application servers. In reality, it has just punted the problem down the stack to the centralized storage which generally is a relational database. The database thus becomes that hardest component to scale.

The typical multi tiered web application starts with the architecture shown in figure 1. As the application become popular, the number of users increase, the number of concurrent reads and writes increase. The application slows down to a crawl and eventually grinds to halt like a braking train. In the rest of this article we discuss some strategies to avoid such a situation.

To understand issues involved in scaling the database, it is useful to think in terms of the two primary client operations on a database; READ and WRITE. Clients either read from a database or write to the database. READs can be scaled easily by adding additional servers, replicating the data and distributing the read requests across servers. Scaling WRITEs is much more complicated. Simply distributing write requests across servers will not work because the it is difficult to maintain consistency of data across servers.


Scaling reads: Master - Slave configuration


As mentioned above, a simple master slave configuration as shown in figure 2 will scale READs. In most web applications 80% of the traffic is read requests and 20% write request. Hence most of time, this configuration provides significant relief.



All WRITE requests are sent only to the master. READ requests are sent to the slaves. The master is replicated to the slaves. Note that a READ from a slave is not any faster than a READ from a master. This is because every WRITE on the master leads to a WRITE on the slave because of replication. However because there can be multiple slaves and READ request distributed across slaves, the system as whole has higher through put. As the number of READ requests go up, you can continue to scale by simply adding more slaves.


Master - Master configuration


In the master - master configuration shown in figure 3, the two servers are setup to replicate to each other. READ and WRITE requests are sent to both servers. While this gives the appearance of scaling WRITEs as well, this approach has some serious disadvantages.

Since there can be a replication lag, the data in the servers might not be identical in certain time windows, leading to read inconsistency. If any columns are ids that needs to be incremented, the logic will need to be implemented at an application level, since that has to be coordinated across the servers. You will not be able to use database features like auto incrementing ids. This does not scale beyond a couple of  servers as each WRITE on every server has to be replicated to every other server.

Scaling writes : partitioning the database


The only way to scale WRITEs is to partition the database. The WRITE requests are sent to different instances of the database which may have the same or different schema. There is no replication or sharing between the instances.

Figure 4 shows an architecture where the database is partitioned by moving some of the tables to different database instances. Tables that needs joins need to be on the same instance. You cannot do SQL joins across servers. This approach works when you have many tables in the schema and some of the tables are not really related to others. This increases application complexity. The application needs connections to several instances and be aware of which instance has which table.

If you have a schema with few tables , but a large number of rows in the table, then another strategy is to keep the schema in instances the same but partition the data across servers based on some key range. For example , a USER table which has a billion rows with users from the all over the world can be partitioned across instances based on the geographical location of the user, say the continent. Figure 5 shows such an architecture. Again this requires the application logic to be smart enough to know which database instance to connect to , based on say a key value. To keep application logic simple, it helps to write a layer that handles the partitioning for the application.

Scaling even further : NoSql

If your data is even larger. of the order of  petabytes or several hundred terrabytes and ACID consistancy is not a hard requirement, you might consider NoSql datastores as discussed in What is NoSql ?

Tuesday, June 12, 2012

5 Tips for building low latency web applications

Low latency applications are those that need to service requests in a few milliseconds or microseconds. Examples of a low latency application are
  • Servers that serve ads to be shown on web pages. If you don'nt serve the ad as per SLA dictated by publishers, you will not be given the opportunity to serve the ad. Typically the server has a few milliseconds to respond.
  • Servers that participate in real time bidding. Again, taking internet advertising as an example, if you are bidding for the opportunity to show an ad, you have just a few milli-seconds to make a bid.
  • Applications that provide real time quotes such as a travel portal. The portal makes requests to its partner agents. For the portal to be usable, the quotes need to displayed before the user loses interest and moves on to another travel site. Typically the portal app gives each partner a few milliseconds to respond. Otherwise the response is ignored.
Here are 5 simple guidelines to remember when building low latency applications.

1. Read from Memory: Keep the data required to serve the request in memory. Keep as much as possible in local memory and then use external caches like memcached, Ehcache and the more recent NoSql key value stores. Having to read data from a database or filesystem is slow and should be avoided.

2. Write asynchronously: The thread that services the request should never be involved in writing to any external storage such database or disk. The main thread should hand off the write task to worker threads that do the writing asynchronously.

3. Avoid locks and contention: Design the application in way that multiple threads are not contending for the same data and requiring locks. This is generally not an issue if the application mostly reads data. But multiple threads trying to write requires accquiring locks that slows down the application. You should consider a design where write operations are delegated to a single thread. You might need to relax the requirement of ACID properties on data. In many cases, your users will be able to tolerate data that becomes eventually consistent.

4. Architect with loosely coupled components: When tuning for low latency, you might need to co locate certain components. Co location reduces hops and hence latency. In other cases, you might need to geographically distribute components, so that they are closer to the end user. An application built with loosely coupled components can accommodate such physical changes without requiring a rewrite of the application.

5. Horizontal scalability: Architect the application so that you can scale horizontally as the load on the application goes up. Even if you read all data from a cache, as the number of requests go up and size of data increases, because of things like garbage collection(in the case of JAVA) the latency will go up. To handle more requests without increasing latency, you will need to add more servers. If you are using a key-value store, you might need to shard the data across multiple servers to keep sub millisecond response time.

Most importantly, building and maintaining low latency is an iterative process that involves designing correctly, building, measuring performance and then tuning by looking at the design and code for improvements.

Wednesday, May 16, 2012

When to use explicit Locks in JAVA ?

Prior to JDK 5, the only way to protect data from concurrent access was to use the synchronized keyword. The limitations of using synchronized are

(1) A thread that tries to acquire a lock has to wait till it gets the lock. There is no way to timeout.
(2) A thread that is waiting for a lock cannot be interrupted.
(3) Since synchronized applies to a block of code, the lock has to be acquired and released in the same block. While this is good most of the time, there are cases where you need the flexibility of acquiring and releasing the lock in different blocks.

The Lock interfaces and classes are well documented at java.util.concurrent.locks
The basic usage of the new Lock interface is

Lock l = new ReentrantLock() ;
l.lock() ;
try {
// update
} finally {
l.unlock() ;
}

You might be tempted to say that this can be done using synchronized. However the new Lock interface has several additional features.

1. Non blocking method
The trylock() method (without params) acquires the lock if it is available. If it is not available it returns immediately. This is very useful in avoiding deadlocks when you are trying to acquire multiple locks .

2. Timed
trylock(time......)  acquires the lock if it is free within the time. Otherwise it returns false. The thread can be interrupted during the wait time.

This is useful when you have service time requirements such as in real time bidding. Say the method needs to response in 10 milli secs, otherwise the response is of no use because the bid is lost.

3. Interruptible
The lockInterruptibly method will try to acquire the lock till it is interrupted.
This is useful in implementing abort or cancel features.

4. Non block structured locking
You can acquire the lock in one method and release it in another. Or you can wrap the lock and unlock in your domain specific accquireLock() and releaseLock() methods.

This is useful in avoiding race conditions on read,update,save operations on data stored in caches. The synchronization provided by ConcurrentHashMap or Synchronized Map protects only for the duration of get and set operation. Not while the data is modified.

cache.acquireLock(key) ;
Data d = cache.get(key) ;
d.update1() ;
d.update2() ;
d.update3() ;
cache.put(key,d) ;
cache.releaseLock(key) ;

Acquiring and releasing the lock are abstracted away in acquirelock and releaseLock methods.

5. Read /Write Locks
This is my favorite feature. The ReadWriteLock interface exposes 2 locks objects. A read lock and a write lock.

You acquire the read lock when all you are doing is reading. Multiple threads can acquire the read lock.By allowing multiple readers, you achieve greater concurrency. A read lock cannot be acquired while a write lock is held by another thread.

You acquire the write lock when you need to write data. Only one thread can acquire a write lock at a time. A write lock cannot be acquired while other threads have acquired read locks.

Is the use of synchronized obsolete ?
Not really. Synchronized blocks are simple to use and are widely used. Most programmers are very familiar with its usage. They are less error prone as the lock is automatically release. It is reasonable to continue using synchronized for the the simpler use cases of locking. But if you need any of the features described above, using explicit locks is well worth the extra coding. Performance wise there is not much difference though studies have shown that explicit locks are slightly faster.

Friday, April 20, 2012

Build distributed applications using Spring HTTP Invoker

Buliding distributed applications involves calling methods on objects that are remote - on different machines and/or different JVMs. Code running on machine A invokes a method on an object running on machine B and it works just as if the caller and the target were in the same JVM. In the past, CORBA, RMI and EJBs were technologies used for remote invocation. But they are complicated to use. The protocols are binary and difficult to troubleshoot. Also they are not suitable for use across intranets because they use ports that networks admins hate to open.

Since 2000, SOAP based web services enabled remote invocation using HTTP as the transport and XML for payload. While HTTP solved the problems of troubleshooting and firewalls, the performance of using XML was not very good. Some developers prefer web services using JSON over HTTP, but that requires modeling the data in JSON.

Spring HTTP Invoker is a remoting mechanism, where the programming model is plain java, but HTTP is used as the transport and the payload is created using java serialization. Spring HTTP gives developers the benefit of HTTP without the performance overhead of XML based web services. In the rest of the article, we explain with a simple example remoting using Sprint HTTP Invokers.

For this tutorial you will need

(1) Spring
(2) JDK
(3) Eclipse
(4) Tomcat

In this example, we create a service AccountService, with a method getAccount. The service is deployed to tomcat. We invoke the getAccount method from J2SE client in a different JVM. You may download the full source code for this sample at RemoteService

Step 1: create the service and implementation

Let us define an interface AccountService and its implementation AccountServiceImpl in plain JAVA.
public interface AccountService {
  public Account getAccount(int id) ;
}
public class AccountServiceImpl implements AccountService {
 @Override
 public Account getAccount(int id) {
  // TODO Auto-generated method stub
  return new Account(id,"testacct",100,2999.99F) ;
 }
}
Step 2: Spring Application context for server side
The Spring application context is defined in the file remoting-servlet.xml.
<beans>
    <bean id="accountService" class="com.mj.account.AccountServiceImpl"/>
    <bean> name="/AccountService" class="org.springframework.remoting.httpinvoker.HttpInvokerServiceExporter">
       <property name="service" ref="accountService"/>
       <property name="serviceInterface" value="com.mj.account.AccountService"/>
   </bean>
</beans>
The first bean accountService needs no explanation - it is a simple spring bean. The 2nd exports a bean /AccountService. This is exported by HttpInvokerServiceExporter, a Spring provided class. The service exported is accountService defined by the 1st bean. Since we will be invoking using HTTP, the url is /AccountService. (by convention).

Step 3: package as war and deploy to tomcat

The classes and context xml needs to be packaged as a war and deployed to tomcat. The standard spring MVC dispatcherServlet needs to be wired into the web.xml.
<servlet>
        <servlet-name>remoting</servlet-name>
        <servlet-class>
            org.springframework.web.servlet.DispatcherServlet
        </servlet-class>
        <load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
        <servlet-name>remoting</servlet-name>
        <url-pattern>/*</url-pattern>
</servlet-mapping>
Step 4: Create a application context for the client with the entry
<bean id="AccountProxy" class="org.springframework.remoting.httpinvoker.HttpInvokerProxyFactoryBean">
     <property name="serviceUrl" value="http://localhost:8080/remoteservice/AccountService"/>
     <property name="serviceInterface" value="com.mj.account.AccountService"/>
</bean>
This defines a bean AccountProxy whose implementation is the HttpInvokerProxyFactorybean, which will create the Http invoker. The url to invoke is http://localhost:8080/remoteservice/AccountService. http://localhost:8080 is where the target web server is listening. /remoteservice is the tomcat context ( I deployed the service as remoteservice.war). We defined /AccountService as the url for our bean in remoting-servlet.xml.

Step 5: Remote invoke the service
public class AccountServiceClient {
 public static void main(String[] args) {
   ApplicationContext applicationContext = new ClassPathXmlApplicationContext("remoteclient.xml");  
   AccountService testService = (AccountService) applicationContext.getBean("AccountProxy");  
   Account a = testService.getAccount(25) ;   
   System.out.println(a) ;
 }
}
You should see the output 25 testacct 100 2999.99

In summary, it is very easy to do remote invocation and distribute services using Spring HTTP invokers. You get the ease of plain JAVA programming and the ease of maintainence and troubleshooting because of HTTP. There is simply no reason to use RMI like protocols any more.

Sunday, March 18, 2012

High Availability for Web applications

As more mission critical applications move to the cloud, making the application highly available becomes super critical. An application not available for whatever reason, web server down, database down etc mean lost users, lost revenue that can be devastating to your business. In this blog we examine some basic high availability concepts.

Availability means your web application is available to your users to use. We would all like our applications to available 100% of the time. But for various reasons it does not happen. The goal of high availability is to make the application available as much as possible. Generally, availability is expressed as a percent of time that application is available per year. One may say availability is 99% or 99.9% and so on.

Redundancy and failover are techniques used to achieve high availability. Redundancy is achieved by having multiple copies of your server. Instead of 1 apache web server, you have two. One is the active server. The active server is monitered and if for some reason it fails, you failover to the 2nd server which becomes active. Another approach is to use a cluster of active servers as is done in a tomcat clusters. All servers are active. A load balancer distributes load among the members of the cluster. If one or two member of the cluster go down, no users are affects because other servers continue processing. Of course, the load balancer can become a point of failure and needs redundancy and failover.

If you were launching a new web application to the cloud, you might start of with a basic architecture as shown below without any HA consideration.

Phase 1: 1 Tomcat web server




Very soon you hit 2 issues. First whenever the server goes down, by accident or by intent, your users cannot use the application. As the number of users goes up, your server is not able to handle the load.

Phase 2: Tomcat cluster

You add redundancy and scalability by using a tomcat cluster as shown in the figure below. The cluster is fronted by Apache Web server + mod_proxy which distributes requests to the individual server. Mod_proxy is the load balancer.


Now the application scales horizontally. Tomcat or application failure is not an issue because there are other servers in the cluster. But we have introduced a new point a failure, the load balancer. If Apache+mod_proxy goes down, the application is unavailable.

To read more about setting up a tomcat cluster see Tomcat clustering

To learn how to use a load balancer with tomcat see Loadbalancing with Tomcat

Phase 3: Highly available Tomcat cluster
The figure below shows how to eliminate the point of failure and make the load balancer highly available.
You add redundancy by adding a second apache+mod_proxy. However only one of the apache is active. The second apache is not handling any requests. It merely monitors the active server using a tool like heartbeat. If for some reason, the active server goes down, the 2nd server knows and the passive server takes over the ip address and starts handling requests. How does this happen ?

This is possible because the ip address for this application that is advertised to the world is shared by the two apache's. This is know as a virtual ip address. While the 2 servers share the virtual IP, TCP/IP routes packets to only the active server. When the active server goes down, the passive server tells TCP/IP to start routing packets intended for this ip address to it. There are TCP/IP commands that let the server start and stop listening on the virtual ip address.

Tools like heartbeat and Ultramonkey enable you to maintain a heartbeat with another and failover when necessary. With heartbeat, there is a heartbeat process on each server. Config files have information on the virtual ip address, active server, passive server. There are several articles on the internet on how to setup heartbeat.

In summary, you can build highly available applications using open source tools. The key cocepts of HA, redundancy, monitoring & failover, virtual ip address apply to any service and not just web servers. You can use the same concepts to make your database server highly available.

Sunday, February 19, 2012

Java Generics #2 : what is "? super T" ?

Consider the merge method below that copies all elements from List source to List target.
public static <T> void merge(List<? super T> target, List<? extends T> source)
The <T> following static declares T as a new type variable for this method. We discussed "? extends T" in the blog Java Generics #1. Here, let us examine "? super T". One can guess that it is a wildcard that means any types, that are a superclass of T. If T is Integer, then List<? super T> could be List<Integer>, List<Number> or List<Object>. The code below shows the use of the merge method. In line 6, T is Integer and "? super T" is Number. In line 10, T is Number and "? super T" is Object.
1 List<Integer> aInts = new ArrayList<Integer>() ;
2 aInts.add(5) ;
3 aInts.add(7) ;
4 List<Number> aNums = new ArrayList<Number>() ;
5 aNums.add(12.5) ;
6 MCollections.<Integer>merge(aNums,aInts) ; // works
7 System.out.println(aNums.toString()) ; // aNums has 5,7,12.5
8 List<Object> aObjs = new ArrayList<Object>() ;
9 aObjs.add("hello") ;
10 MCollections.<Number>merge(aObjs,aNums) ; // works as well
11 System.out.println(aObjs.toString()) ; // aObjs has hello,5,7,12.5
We discussed in the last blog that if you have a Collection<? extends T> you can get values out of it, but you cannot put stuff into it. So what can you do with Collection<? super T> ?

In our merge example above, List<? super T> is the target, which means the implementation is putting/setting elements into it. "? super T" means any supertype of T. Logically it makes sense that you can put any supertype into the List.

The implementation of merge could be
1 public class MCollections {
2 public static <T> void merge(List<? super T> target, 
3                                List<? extends T> source) {
4   for(int i = 0 ; i < source.size(); i++) {
5     T e = source.get(i) ;
6     target.add(e) ;
7   }
8 }
9 } 
But if you were to do a get, what would be the returned type ?. There would be no way to know. Hence it is not allowed as shown in line 4 of the code below.
1 List<? super Integer> aNums = new ArrayList<Number>() ;
2 aNums.add(11) ;
3 aNums.add(12) ;   
4 Number n = aNums.get(0) ; // Compilation Error - not allowed 
5 Object o = aNums.get(0) ; // allowed -- No compile error 
The exception to the rule is getting an Object, which is allowed because since Object is a supertype of every other java type.

In summary, you can enable subtyping using "? super T" when you need to put objects into the collection. ( But you can get them out only as Object). You can enable subtyping using ? extends T when you need to get objects out of the collection. It follows that if you need to do both get and put, then you cannot use either of these wildcard mechanisms and you need to use a explicit type.

Sunday, January 15, 2012

Java Generics #1 : Subtyping using wildcard with extends

Generics is one of those more complicated language features in Java that is not well understood by many programmers. Many avoid it altogether. This is not without reason. While writing your program, if you have to stop and think a lot about syntax, there is more than a good chance, you would try to avoid that language construct. In this blog I discuss one type of subtyping with generics which can be tricky.

In Java we know that Integer extends Number. In other words, Integer is a subtype of Number. Anywhere a Number required, you can pass in an Integer. But does this mean that List<Integer> is a subtype of List<Number> ?

Consider the code. Will it work ?
1 List<Integer> aList = new ArrayList<Integer>() ;
2 aList.add(11) ;
3 aList.add(13) ;
4 List<Number> nList = aList ;
5 nList.add(11.5) ;
aList is a list of Integers. nList is a list of Numbers. In line4 nList is made to reference aList. In line 5 we add a double to aList. But aList is a list of integers. This is obviously not correct. And Java will not allow it. Line 4 will cause a compilation error. But sometimes we want to be able use subtypes. Generics have the concept of wildcards that enable subtyping when logically approriate.

Consider the addAll method of the Collection interface.
interface Collection<T> {
   public boolean addAll(Collection<? extends T> x) ;

}
? extend T says that given a collection of Type T , you can add to it elements from any collection whose type is a subtype of T. The following code is valid.
1 List<Number> aList = new ArrayList<Number>() ;
2 List<Integer> intList = Arrays.asList(11,12) ;
3 List<Double> dList = Arrays.asList(15.15) ;
4 aList.addAll(intList) ;
5 aList.addAll(dList) ;
The implemention of addAll method will get elements from the list passed in as parameter and put it into the target collection. Note that it is only a get operation on Collection<? extends T>. A put on Collection<? extends T> would however not be allowed. To understand, consider the code below
1 List <? extends Number> numList ;
2 List<Integer> intList = Arrays.asList(11,12) ;
3 numList = intList ; // Will this work ?
4 numList.add(5.67) ; // Will this work ?
Should line 3 work ? What about line 4 ?
The Java compiler allows line 3 because List<Integer> is considered a subtype of List <? extends Number>. But line 4 is a compilation error because you should not be allowed to add a double to List<Integer>.

In summary, when you have a Collection<? extends T>, it is safe to get elements out of the collection but not safe to put elements into it. Hence the compiler does not allow it.