Thursday, November 20, 2014

Apache Kafka Java tutorial #2

In the blog Kafka introduction, I provided an overview of the features of Apache Kafka, an internet scale messaging broker. In Kafka tutorial #1, I provided a simple Java programming example for sending and receiving messages using the high level consumer API. Kafka also provides a SimpleConsumer API that gives the programmer greater control over reading messages and partitions. "Simple" is a misnomer; this is the more complicated API. A SimpleConsumer connects directly to the leader of a partition and fetches messages from a given offset. Knowing the leader of the partition is therefore a preliminary step, and if the leader goes down, you have to detect that and connect to the new leader.

In this tutorial, we will use the "Simple" API to find the lead broker for each partition of a topic.

To recap some Kafka concepts:
  • A Kafka cluster is made up of one or more brokers
  • Messages are sent to and received from topics
  • Topics are partitioned across brokers
  • For each partition there is 1 leader broker and 1 or more replicas
  • Ordering of messages is maintained only within a partition
Read positions within a topic therefore have to be managed at the partition level, and for that you need to know the leader of each partition.

For this tutorial you will need

(1) Apache Kafka 0.8.1
(2) Apache Zookeeper
(3) JDK 7 or higher. An IDE of your choice is optional
(4) Apache Maven
(5) Source code for this sample from https://github.com/mdkhanga/my-blog-code if you want to look at working code

In this tutorial, we will
(1) Create a 3 node Kafka cluster
(2) Create a topic with 12 partitions
(3) Write code to determine the leader of each partition
(4) Run the code to print the leader of each partition
(5) Kill one broker and run the code again to see the new leaders

Note that the kafka-topics --describe command lets you do the same thing. But we are doing it programmatically for the sake of learning, and because it is useful in some use cases.
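
For example, the following command (run against the same Zookeeper used to create the topic) prints the leader, replicas and in-sync replicas for every partition of the topic:

/usr/local/kafka/bin$ kafka-topics.sh --describe --zookeeper host1:2181 --topic mjtopic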

Step 1 : Create a cluster

Follow the instructions in tutorial #1 to create a 3 node cluster.

Step 2 : Create a topic with 12 partitions

/usr/local/kafka/bin$ kafka-topics.sh --create --zookeeper host1:2181 --replication-factor 2 --partitions 12 --topic mjtopic

Step 3 : Write code to determine the leader for each partition

We use the SimpleConsumer API.
PartitionLeader.java

import java.util.Collections;
import java.util.List;

import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;


// connect to any broker that is online
SimpleConsumer consumer = new SimpleConsumer("localhost", 9092,
        100000, 64 * 1024, "leaderLookup");

// request metadata for the topic we are interested in
List<String> topics = Collections.singletonList("mjtopic");
TopicMetadataRequest req = new TopicMetadataRequest(topics);
TopicMetadataResponse resp = consumer.send(req);

// record the leader broker id for each of the 12 partitions
List<TopicMetadata> metaData = resp.topicsMetadata();
int[] leaders = new int[12];

for (TopicMetadata item : metaData) {
    for (PartitionMetadata part : item.partitionsMetadata()) {
        leaders[part.partitionId()] = part.leader().id();
    }
}
for (int j = 0; j < 12; j++) {
    System.out.println("Leader for partition " + j + " " + leaders[j]);
}

A SimpleConsumer can connect to any broker that is online. We construct a TopicMetadataRequest with the topic we are interested in and send it to the broker with the consumer.send call. A TopicMetadataResponse is returned, containing a TopicMetadata with a set of PartitionMetadata (one for each partition). Each PartitionMetadata has the leader and replicas for that partition.
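
The same PartitionMetadata also exposes the replicas. A small variation of the loop above, assuming the replicas() method of kafka.javaapi.PartitionMetadata as in Kafka 0.8.1, prints the replica broker ids next to the leader:

for (TopicMetadata item : metaData) {
    for (PartitionMetadata part : item.partitionsMetadata()) {
        StringBuilder replicas = new StringBuilder();
        for (kafka.cluster.Broker b : part.replicas()) {
            replicas.append(b.id()).append(" ");
        }
        System.out.println("Partition " + part.partitionId()
                + " leader " + part.leader().id()
                + " replicas " + replicas);
    }
}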

Step 4 : Run the code 

Leader for partition 0 1
Leader for partition 1 2
Leader for partition 2 3
Leader for partition 3 1
Leader for partition 4 2
Leader for partition 5 3
Leader for partition 6 1
Leader for partition 7 2
Leader for partition 8 3
Leader for partition 9 1
Leader for partition 10 2
Leader for partition 11 3


Step 5 : Kill node 3 and run the code again

Leader for partition 0 1
Leader for partition 1 2
Leader for partition 2 1
Leader for partition 3 1
Leader for partition 4 2
Leader for partition 5 1
Leader for partition 6 1
Leader for partition 7 2
Leader for partition 8 1
Leader for partition 9 1
Leader for partition 10 2
Leader for partition 11 1


You can see that broker 1 has assumed leadership of broker 3's partitions.

In summary, one of the things you can use the SimpleConsumer API for is examining topic partition metadata. We will use this code in future tutorials to determine the leader of a partition.

Related blogs:

Apache Kafka Introduction
Apache Kafka JAVA tutorial #1
Apache Kafka JAVA tutorial #3 
Apache Kafka 0.8.2 New Producer API 






Friday, October 10, 2014

ServletContainerInitializer : Discovering classes in your Web Application

In my blog on java.util.ServiceLoader, we discussed how it can be used to discover third party implementations of your interfaces. This is useful if your application is a container that executes code written by other developers. In this blog, we discuss dynamic discovery and registration of Servlets.

All Java web developers are already familiar with the javax.servlet.ServletContextListener interface. If you want to do initialization when the application starts or clean up when it is destroyed, you implement the contextInitialized and contextDestroyed methods of this interface.

The Servlet 3.0 specification added a couple of interesting features that help with dynamic registration and are particularly useful to developers of libraries or containers.

(1) javax.servlet.ServletContainerInitializer is another interface that can notify your code of application start.

Library or container developers typically provide an implementation of this interface. The implementation should be annotated with the @HandlesTypes annotation. When the application starts, the Servlet container calls the onStartup method of this interface, passing in as a parameter a set of all classes that implement, extend or are annotated with the type(s) declared in the @HandlesTypes annotation.

(2) The specification also adds a number of methods to dynamically register Servlets, filters and listeners. You will recall that previously, if you needed to add a new Servlet to your application, you had to modify web.xml.

Combining (1) and (2), it is possible to dynamically discover and add Servlets to a web application. This is a powerful feature that lets you make the web application modular and spread development across teams without build dependencies. Note that this technique can be used to discover any interface, class or annotation; I am killing two birds with one stone by using it to discover servlets.

In the rest of the blog, we will build a simple web app that illustrates the above concepts. For this tutorial you will need

(1) JDK 7.x or higher
(2) Apache Tomcat or any Servlet container
(3) Apache Maven

In this example we will

(1) Implement a ServletContainerInitializer called WebContainerInitializer and package it in a jar, containerlib.jar.
(2) To make the example interesting, create a new annotation @MyServlet, which acts like the @WebServlet annotation in the servlet specification. WebContainerInitializer will handle types that are annotated with @MyServlet.
(3) Write a simple web app that has a Servlet annotated with @MyServlet and has containerlib.jar in its lib directory. There are no entries in web.xml.
(4) When the app starts, the servlet is discovered and registered. You can go to a browser and invoke it.

Before we proceed any further, you may download the code from my github repository, so you can look at the code as I explain. The code for this example is in the dynamicservlets directory.

Step 0: Get the code

git clone https://github.com/mdkhanga/my-blog-code.git

dynamicservlets has 2 subdirectories: containerlib and dynamichello.

The containerlib project has the MyServlet annotation and the WebContainerInitializer which implements ServletContainerInitializer.

dynamichello is a web application that uses the containerlib jar.

Step 1: The MyServlet annotation
MyServlet.java
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface MyServlet {   
    String path() ;
}

The annotation applies to classes and is used as
@MyServlet(path = "/someuri")

Step 2: A Demo servlet
HelloWorldServlet.java
@MyServlet(path = "/greeting")
public class HelloWorldServlet extends HttpServlet {
     

    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        PrintWriter p = response.getWriter() ;
        p.write(" hello world ");
        p.close();
    }
   
}


This is a simple hello servlet that we discover and register. Nothing needs to be added to web.xml.

Step 3: WebContainerInitializer
WebContainerInitializer.java
This is the implementation of ServletContainerInitializer.

@HandlesTypes({MyServlet.class})
public class WebContainerInitializer implements ServletContainerInitializer {

    public void onStartup(Set<Class<?>> classes, ServletContext ctx)
            throws ServletException {

        for (Class c : classes) {
            // read the path declared in the @MyServlet annotation
            MyServlet ann = (MyServlet) c.getAnnotation(MyServlet.class);
            // register the servlet and map it to that path
            ServletRegistration.Dynamic d = ctx.addServlet("hello", c);
            d.addMapping(ann.path());
        }
    }
}


The implementation needs to be in a separate jar, included in the lib directory of the application war. WebContainerInitializer is annotated with @HandlesTypes, which takes MyServlet.class as a parameter. When the application starts, the servlet container finds all classes that are annotated with @MyServlet and passes them to the onStartup method. In onStartup, we go through each class found by the container, get the value of the path attribute from the annotation and register the servlet.

To make this work, we need one more thing: in the META-INF/services directory, a file whose name is javax.servlet.ServletContainerInitializer and whose content is the single line com.mj.WebContainerInitializer. If you are wondering why this is required, please see my blog on ServiceLoader.
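
For reference, the relevant entry inside containerlib.jar looks like this:

META-INF/services/javax.servlet.ServletContainerInitializer

and that file contains the single line:

com.mj.WebContainerInitializer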

Step 4: Build and run the app

To build,
cd containerlib
mvn clean install
cd ../dynamichello
mvn clean install

This builds dynamichello/target/dynamichello.war that can be deployed to tomcat or any servlet container.
When the application starts, you will see the following messages in the log

Initializing container app .....
Found ...com.mj.servlets.HelloWorldServlet
path = /greeting


Point your browser to http://localhost:8080/hello/greeting.

The servlet will respond with a hello message.

In summary, this technique can be used to dynamically discover classes during application startup. It is typically used to implement libraries or containers, such as JAX-RS implementations, and it allows implementations to be provided by different developers with no hard wiring.

Saturday, September 20, 2014

Discovering third party API/SPI implementations using java.util.ServiceLoader

One interface, many implementations is a very well known object oriented programming paradigm. If you write the implementations yourself, then you know what those implementations are and you can write a factory class or method that creates and returns the right one. You might also make this configuration driven and inject the correct implementation based on configuration.

What if third parties provide implementations of your interface? If you know those implementations in advance, then you could do the same as in the case above. But a downside is that a code change is required to add new implementations or to remove them. You could come up with a configuration file where implementations are listed, and have your code use the list to determine what is available. The downside is that the configuration has to be maintained by you, and the approach is non standard: every API developer could come up with his own format for the configuration. Fortunately Java has a solution.

JDK 6 introduced java.util.ServiceLoader, a class for discovering and loading implementations of a given type.

It has a static load method that creates a ServiceLoader which will find and load all implementations of a particular type.

public static <T> ServiceLoader<T> load(Class<T> service)

You would use it as
ServiceLoader<SortProvider> sl = ServiceLoader.load(SortProvider.class) ;
This creates a ServiceLoader that can find and load every SortProvider on the classpath.

The iterator method returns an Iterator over the implementations found; they are loaded lazily.
Iterator<SortProvider> it = sl.iterator() ;

You can iterate over what is found and store it in a Map or somewhere else in memory.
while (it.hasNext()) {
            SortProvider sp = it.next() ;
            log("Found provider " + sp.getProviderName()) ;
            sMap.put(sp.getProviderName(), sp) ;
}

How does ServiceLoader know where to look ?
  • Implementors package their implementation in a jar
  • The jar should have a META-INF/services directory
  • The services directory should have a file whose name is the fully qualified name of the type
  • The file has a list of the fully qualified names of the implementations of the type (see the example after this list)
  • The jar is installed on the classpath
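
For example, a jar that ships a SortProvider implementation would contain a file named

META-INF/services/com.mj.msort.spi.SortProvider

whose contents are the fully qualified names of the provider classes, one per line. The class name below is hypothetical:

com.mj.msort.providers.BubbleSortProvider
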
I have a complete API/SPI example for a Sort interface, which you can download at https://github.com/mdkhanga/my-blog-code. The sample is in the msort directory. You should download the code first, so that you can look at it while reading the text below. The example illustrates how ServiceLoader is used to discover implementations from third party service providers. The Sort interface can be used for sorting data, and service providers can provide implementations of various sort algorithms. In the example,

1. com.mj.msort.Sort is the main Sort API. It has 2 sort methods: one for arrays and one for collections. 2 implementations are provided - bubblesort and mergesort - but anybody can write additional implementations.
 
2. com.mj.msort.spi.SortProvider is the SPI. Third party implementors of Sort must also implement the SortProvider interface. The SPI provides another layer of encapsulation: we don't want to know the implementation details, we just want an instance of the implementation.

3. SPI providers need to implement Sort and SortProvider.

4. com.mj.msort.SortServices is a class that discovers and loads SPI implementations and makes them available to API users. It uses java.util.ServiceLoader to load SortProviders. Hence SortProvider implementations also need to be packaged as required by java.util.ServiceLoader in order to be discovered.

This is the class that brings everything together. It uses ServiceLoader to find all implementations of SortProvider and stores them in a Map. It has a getSort method that programmers can call to get a specific implementation or whatever is there.
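
A minimal sketch of what such a class can look like is shown below. The Sort and SortProvider types are the ones from the msort sample; the getSort() method on SortProvider and the exact signature of SortServices.getSort are my assumptions here, so check the actual sources for the real signatures.

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.ServiceLoader;

public class SortServices {

    private static final Map<String, SortProvider> sMap = new HashMap<String, SortProvider>();

    static {
        // discover every SortProvider on the classpath and index it by provider name
        ServiceLoader<SortProvider> sl = ServiceLoader.load(SortProvider.class);
        Iterator<SortProvider> it = sl.iterator();
        while (it.hasNext()) {
            SortProvider sp = it.next();
            sMap.put(sp.getProviderName(), sp);
        }
    }

    // return the Sort implementation registered under the given provider name, if any
    public static Sort getSort(String providerName) {
        SortProvider sp = sMap.get(providerName);
        return (sp == null) ? null : sp.getSort();   // getSort() on the provider is assumed
    }
}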

5.  Sample Usage

Sort s = SortServices.getSort(...
s.sort(...

In summary, ServiceLoader is a powerful mechanism to find and load classes of a given type. It can be used to build highly extensible and dynamic services. As an additional exercise, you can create your own implementation of SortProvider in your own jar, and SortServices will find it as long as it is on the classpath.

Tuesday, August 26, 2014

Android programming tutorial

Android is an open source Linux based operating system for mobile devices like smart phones, tablets and other devices. The purpose of this blog is to introduce a developer to Android development. There are already many tutorials for Android. So why another ? Mobile development is fun and easy, but despite lots of documentation from Google and several blogs, the initial ramp up for a new developer is not easy. There is substantial trial and error, even for the experienced programmer, before you get comfortable with the development process.

In the rest of the blog I will
  • Describe some android application concepts
  • Describe what SDKs and tools you need to download
  • Develop a very simple android application.
This blog will be most useful when used in conjunction with the official Android developer documentation. There are new terms like Activity or Layout that I describe only briefly. You should read more about them in the original documentation.

Concepts

  • Android applications are mostly developed in JAVA.
  • Android development is like any other event driven UI development: lay out UI elements on the screen and write code to handle events like the user tapping a button or a menu option.
  • An activity is a single screen of an application that a user interacts with. 
  • An application may have many activities. Each activity has a layout that describes how the user interface widgets are laid out on the screen.
  • Activities communicate by sending Intents to each other. For example, if clicking a button should replace the current screen with another one, the current activity sends an intent to the activity that needs to come to the foreground (see the sketch after this list).
  • Android SDK supports all the UI elements like text boxes, buttons, lists , menus, action bar etc that are necessary to build a UI.
  • The layouts determine how the UI elements are positioned on the screen relative to each other. With LinearLayout, the UI elements are positioned one after the other. With RelativeLayout, the UI elements are positioned relative to one another.
  • Additionally, there are APIs
    • to store data to a file or to a local SQLite relational database.
    • to phone other devices.
    • to send text messages to other devices.
    • to send messages to other applications.
  • Using HTTP, REST or other general purpose client libraries, you can make requests to remote servers.
  • Most of the time, any JAVA library that you can use in a regular JAVA application is usable in Android as well (of course, sometimes there are issues such as supported JDK versions).
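
As a small taste of what the Intent based navigation mentioned in the list looks like, here is a minimal sketch to be placed inside an activity; SecondActivity is a hypothetical second screen of the same application.

// ask Android to bring SecondActivity to the foreground
Intent intent = new Intent(this, SecondActivity.class);
intent.putExtra("greeting", "hello");   // optional data carried along with the intent
startActivity(intent);
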
Required Tools
  • JAVA SDK  
  • Android Studio
    • This has the Android SDK and an IntelliJ based IDE.
    • You could also use the eclipse ADT or just the plain SDK with command line.
    • For this tutorial I have used Android studio 0.8.2.
  • Optional - A mobile device
    • Android SDK has emulators that you can run the app on. But they are slow.
    • Running on a real device gives more satisfaction. I used a Nexus 7. 
  • Optional - Download the source code for the tutorial below from https://sites.google.com/site/khangaonkar/home/android
In the rest of the blog we will work through a very simple tutorial to develop an android application.

Tutorial

Step 1: Download the android SDK

Download the Android SDK from http://developer.android.com/sdk/installing/index.html. The SDK is available in 3 flavors: Eclipse ADT, Android Studio (IntelliJ) and command line. For this tutorial, I used Android Studio because that seems to be the recommended direction from Google. But (except on Mac OS) Eclipse works fine as well.

Step 2 : Create a new project

Start Android Studio
Select File > New Project
Enter Application name and click next
Accept default for form factors and click next
Select the default blank activity and hit next
Select the defaults for the new activity and hit finish

You should see a project as shown below

Step 3: Create an emulator
An emulator lets you test your application on a variety of devices without actually having the device. Let us create a Nexus 7 emulator.

Click Tools > Android > AVD Manager
Click create and enter the information as shown below

Click Ok
Select the created device and hit Start
This will take a couple of minutes. The emulators are slow. Eventually you will see the window shown below.

In the main project, in the lower window, you should see that the emulator is detected.

Caution: Emulators are very slow and take a lot of time to start. The first time I install a new version of Android Studio or Eclipse ADT, they almost never work right away. It takes a bit of trial and error to get them going.

Step 4 : Run the application

Click Run > Run App
When prompted, select the emulator
The default app shows hello world on the screen

Step 5: Review the generated files

Under Greeting/app/src/main/java is the class com.mj.greeting.MyActivity. This is the main class that represents the logic around what is shown on the screen.
Line 17 is setContentView(R.layout.activity_my);
This line sets the layout that is displayed on the screen. The layout is defined in the xml file Greeting/app/src/main/res/layout/activity_my.xml. The layout manager and any UI elements like edit boxes, buttons etc. and their properties are defined here. In this case, a RelativeLayout surrounds a TextView whose default value is Hello World.

Step 6: Add some new code
Let us add an EditText box and a button to the UI. The user can type a message in the EditText and then click the button. On clicking the button, the message replaces what is displayed in the TextView.

In the file Greeting/app/src/main/res/layout/activity_my.xml

add an android:id to the relativelayout
    xmlns:tools="http://schemas.android.com/tools"
    android:id="@+id/main"


add an android:id to the textview
        android:id="@+id/textview"
        android:text="@string/hello_world"


The ids will let us reference these widgets in code.

Add an edittext box
<EditText
        android:id="@+id/edittext"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_below="@id/textview"
        android:ems="10"
        android:layout_marginTop="10dp"
        android:text="greeting" android:inputType="text" />


and a button
<Button
        android:id="@+id/button"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_below="@+id/edittext"
        android:layout_marginTop="10dp"
        android:text="Update Greeting"
        android:onClick="onClick"/>


The android:onClick attribute references the method that is called when the user clicks the button. So we need to add an onClick method implementation.

To the class com.mj.greeting.MyActivity add the method
public void onClick(View v) {
        View main = this.findViewById(R.id.main) ; // get a reference to the current view
        EditText edit = (EditText) main.findViewById(R.id.edittext) ; // get a reference to the edittext
        TextView tv= (TextView) main.findViewById(R.id.textview) ; // get the textview
        tv.setText(edit.getText()); // get the text entered in edittext and put it in the textview
    }


Run the application

Step 7: Run on a real device

So far we have been running the application on an emulator. It is much more fun to run it on a real device. Enable USB debugging on your device. On the Nexus 7, USB debugging is enabled by selecting the option in Settings/Developer Options.

Connect it to your development machine with a USB cable and do Run > Run App.

The application will be installed and run on the device.

In summary, getting started with mobile development is simple and fun once you get comfortable with the concepts and tools.






Saturday, July 26, 2014

Distributed Systems : Consensus Protocols

Modern software systems scale by partitioning the data and distributing it across several machines. Systems are made highly available by replicating data across multiple machines. When multiple systems are involved in managing state, they need to agree when a particular piece of data needs to change.

You are familiar with the concept of a transaction in a relational database. A transaction is a unit of work (like an insert or an update, or some combination of multiple statements) that as a whole is either committed or aborted. What if the work involves updating multiple databases that are on different machines ? To ensure consistent state across the system, all the databases should agree on what to do: whether to commit or abort the state change.

Modern distributed NoSQL databases have a similar but slightly different problem. If you have a single server and set a value v=8 on it, there is no doubt what the value of v is: any client that connects to the server reads the value as 8. What if you had a cluster of 3 servers ? Would a client connecting to one of the servers see the value as 8 ? Consensus is required to ensure all servers agree on what the value of v is.

Consider systems like Apache Zookeeper or Apache Cassandra. To ensure high availability, clients can connect to any node in the cluster and read or write data. To ensure consistency within the cluster, some consensus is required among the nodes when state changes.

In the rest of this blog we briefly cover some distributed consensus protocols, starting with two phase commit, which users of relational databases are very familiar with. We will then talk about Paxos, ZAB and Raft. Paxos became popular because it was used by Google for its distributed systems. ZAB is used by Zookeeper, which is an important component of the Hadoop ecosystem. These protocols are hard to understand and no attempt is made to go into detail. The purpose is to introduce readers to some of the consensus concepts that are important in distributed systems.

1. Two phase commit

Two phase commit is used in databases to ensure that all participants in a distributed update either commit or abort the changes.
One node, called the co-ordinator, originates the transaction. A sketch of the co-ordinator's side is shown after the steps below.

1.1 Co-ordinator sends a prepare message to all participants.
1.2 Each participant replies with a yes if it can commit its part of the transaction or No otherwise.
1.3 If the co-ordinator receives a yes from all participants, it sends a commit message to the participants. Otherwise it sends an abort message.
1.4 If the participant receives a commit message, it commits its change. If it receives an abort message, it aborts the change. In both cases, it sends an acknowledgement back to the co-ordinator.
1.5 Transaction is complete when the coordinator receives all acknowledgments.
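
To make the message flow concrete, here is a minimal Java sketch of the co-ordinator's side of the protocol. The Participant interface is hypothetical and stands in for whatever RPC mechanism the databases actually use; logging of decisions, timeouts and failure handling are omitted.

import java.util.List;

// Hypothetical view of a participant in the distributed transaction
interface Participant {
    boolean prepare();   // returns true to vote yes, false to vote no
    void commit();
    void abort();
}

class Coordinator {
    void twoPhaseCommit(List<Participant> participants) {
        // Phase 1: ask every participant to prepare and collect the votes
        boolean allYes = true;
        for (Participant p : participants) {
            if (!p.prepare()) {
                allYes = false;
                break;
            }
        }
        // Phase 2: commit only if every participant voted yes, otherwise abort
        for (Participant p : participants) {
            if (allYes) {
                p.commit();
            } else {
                p.abort();
            }
        }
    }
}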

One limitation of this protocol is that if the co-ordinator crashes, the participants do not know whether to commit or abort the transaction, as they do not know how the other participants responded.

2. Three phase commit

The protocol attempts to let the participants make progress even if the co-ordinator fails.

2.1 Co-ordinator sends a prepare message to all participants.
2.2  Each participant replies with a yes if it can commit its part of the transaction or No otherwise.
2.3 If the co-ordinator receives yes from all participants, it sends a pre-commit message to all participants.
2.4 When the co-ordinator receives an acknowledgment from a majority of participants, it sends a commit message to all participants.

If the co-ordinator fails, the participants can communicate with each other and determine whether to commit or abort.

3. PAXOS

Paxos was first published in the nineties, but it became more popular after Google implemented it and used it in its distributed infrastructure. The protocol is notorious for being difficult to understand. Below is a very brief description; see the references for more details.

There are nodes that propose values, called proposers, and nodes that accept values, called acceptors.

3.1 A proposer with a value to propose submits a proposal (v,n) with value v and sequence number n.

3.2 When an acceptor receives a proposal (v,n), it compares it with the highest numbered proposal it has already agreed to. If this proposal has a higher sequence number than any such proposal, the acceptor replies agree and sends along the value of any previously accepted proposal. If the acceptor has already agreed to a higher numbered proposal, it rejects the current proposal.

3.3 If the proposer receives agree from a majority of acceptors, it can pick one of the values sent by the acceptors; if the acceptors have not sent any value, it can pick its own value. It then sends a commit message with the chosen value to the acceptors. If a majority reject or do not respond, it abandons this proposal and tries another one.

3.4 When an acceptor receives a commit message, it agrees to commit if the sequence number is the highest it has agreed to, or if the value is the same as that of the last accepted proposal. Otherwise it rejects the commit.

3.5 If a majority accept the commit, the proposal is complete. Otherwise abort and try again.

The key takeaway is that majorities are used to accept proposals. If there are multiple proposers competing to get values accepted, it is possible that no progress is made. The solution is to elect a leader that proposes values. Other players in the system can be learners, which learn about accepted values from either the leader or the other participants.

4. ZAB (Zookeeper Atomic Broadcast)

ZAB was developed for use in Apache Zookeeper due to limitations in PAXOS. In Zookeeper, the order in which changes are applied is important. In PAXOS, it is possible that updates get applied by acceptors out of order.

ZAB is similar to PAXOS in that a leader proposes values and values are accepted based on a majority vote. The key difference is that a strict order of updates is maintained. If the leader crashes and a new leader is elected, the updates are applied in the original order.

5. RAFT

RAFT is another distributed consensus protocol, which claims to be simpler than PAXOS or ZAB.

A node can either be a leader, follower or candidate.

5.1 By default all nodes are followers. When there is no leader, a node can make itself a candidate for leadership and solicit votes.

5.2 The candidate that gets majority votes is elected leader.

5.3 A client submits its updates to the leader. The leader appends the update to its log (uncommitted) and sends the update to the followers.

5.4 When the leader hears from a majority of followers that they have made the update, the leader commits the change and informs the followers of the commit.

5.5 Followers commit the update.

5.6 If the leader terminates for some reason, one of the followers turns itself into a candidate and gets elected as the new leader.

We have given a brief description of some consensus protocols. If you use Hadoop, Cassandra, Kafka or similar distributed systems, you will run into these protocols. For more details, some references are provided below.

References:

1. Database Management Systems by Ramakrishnan and Gehrke
2. PAXOS made simple
3. PAXOS by example
4. The secret lives of data
5. Apache Zookeeper
6. Paxos paper trail
7. Raft Consensus

Friday, June 27, 2014

Apache Cassandra : Things to consider before choosing Cassandra

A lot has been written about NoSQL databases, and there is a lot of hype surrounding many of them. Unfortunately most of the written material either sings the praises of a particular database or trashes it. I am also starting to see people pick databases for the wrong reasons. The purpose of this blog is to highlight the things to consider while choosing Cassandra as your database.

1. Scaling by partitioning data

Cassandra is designed to store large quantities of data - several hundreds of terabytes or petabytes - that typically cannot be stored on a single machine. Cassandra solves the problem by partitioning the data across the machines in a cluster using a consistent hash. When data is partitioned across several machines, some of the things we are used to in relational databases, like consistency and transactions, are difficult to implement. Hence those features are weak or in some cases not available. So the ability to scale comes at the expense of other features.

The single biggest mistake people make is using Cassandra when their data is not large enough to merit partitioning. If in the foreseeable future your data size is a few hundred gigabytes, stick to MySQL or another relational database of your choice. Even if your data grows in the future, you can always port to Cassandra when you reach the scale of a few TB. This is especially true if you are building a new application with limited resources. Do not let the complexity of Cassandra slow down the rest of your feature development.

2. High availability

The CAP theorem states that out of consistency, availability and partition tolerance, a system can have only 2 of the 3. Cassandra is designed for availability and partition tolerance.

If your application's primary requirement is high availability, Cassandra can be a great choice. With its shared nothing architecture, where all nodes are equal, multiple nodes can go down and the database is still available. Clients can connect to any node, and that node will get/put the data from/to the nodes that are responsible for it. Replication ensures that if the primary node that handles the data goes down, a replica is able to service the request.

3. Replication

Replication has 2 purposes: it provides redundancy for data in case of failures, and it makes copies of the data available closer to where they are consumed or served. In many databases, setting up replication is cumbersome. Not in Cassandra, where replication is core to the architecture. Replication is configured at the keyspace level by specifying a replication strategy and the number of replicas, and the data is replicated within the cluster or across data centers as required.

If replication is important, especially across data centers, Cassandra is a great choice.

4. Optimized for writes

Write operations update an in memory data structure called a Memtable and return immediately. Nothing is locked and no data files are rewritten in place, so writes are very fast. When Memtables reach a certain size, they are flushed to disk into a file called an SSTable. Reads may have to go through multiple SSTables and aggregate changes to return correct data. For this reason, reads might not be as fast.

If you have a workload that involves a lot of writes and few reads, then Cassandra is a suitable database. A common use case is storing data from the log files of high volume production web servers that service several billion requests a day. An analytics application would potentially read the data, but the read volume is low because the reads are done by in house business analysts and not by internet users.

5. Compaction

Over time several SSTables get created, and reads have to go through multiple SSTables to get to the data. Periodically, Cassandra asynchronously merges smaller SSTables into larger SSTables; this is compaction. People have complained that during compaction things slow down and throughput degrades. There are ways to tune this, but you should be aware of compaction when using Cassandra.

6. Limited querying capability

Cassandra supports a SQL like language called CQL. It is "SQL like" and not SQL. Many very basic things like aggregation operators are not supported. Joins of tables are not supported. Range queries on the partition key are not supported. Range queries are possible within a partition using the clustering columns, but that requires some additional data modeling.

The bottom line is that Cassandra partitions the data based on a consistent hash of the partition key, and look ups are possible based only on that key. Anything else requires additional modeling involving what are called clustering columns.

7. Consistency model

Cassandra was inspired by Amazon's Dynamo database, where the model was eventual consistency. When a client requested data and there was an inconsistency between the values on the nodes of the cluster, the server returned a vector clock to the client, and it was the responsibility of the client to resolve the conflict.

Cassandra's model is tunable consistency. For a read or write operation, the client can specify a consistency level such as ANY, ALL, QUORUM, ONE, TWO etc. However, when there are concurrent writes, the order is determined based on machine timestamps, so it is important that the clocks on the nodes in the cluster be synchronized. Getting the consistency model to work requires time and effort on the part of the developer. If the kind of strong consistency we are used to in relational databases is important to you, Cassandra will not be a suitable choice.
 
8. Frequent updates

Based on what is discussed in (4) and (7), Cassandra is not suitable for use cases where you update column values frequently. When concurrent updates happen, Cassandra uses timestamps to determine which update happened last, and you can sometimes run into the lost update problem. To work around this, you have to append updates to a collection or wide columns and then aggregate the final value on reads. Again, this is additional work in data modeling and programming, and you might be better off using another database if frequent updates are an integral part of your use case.

In summary, Cassandra is an excellent choice as a database when your data size is very large and high availability or replication is important. But it is not a general purpose database. Some of the scalability comes at a cost, and you give up other features like strong consistency or rich querying.

For additional information on Cassandra, please check DataStax documentation. You can also read these blogs:
HBase vs Cassandra
Cassandra Data Model
Cassandra Compaction


Wednesday, May 28, 2014

Apache Kafka JAVA tutorial #1

In Apache Kafka introduction we discussed some key features of Kafka. In this tutorial we will setup a small Kafka cluster. We will send messages to a topic using a JAVA producer. We will consume the messages using a JAVA consumer.

For this tutorial you will need

(1) Apache Kafka
(2) Apache Zookeeper
(3) JDK 7 or higher. An IDE of your choice is optional
(4) Apache Maven
(5) Source code for this sample kafkasample.zip

Zookeeper is required as the Kafka broker uses Zookeeper to store topic configuration and consumer information.

We will setup a 3 node cluster. I will assume you have 3 machines: host1,host2,host3. We will run a Kafka broker on each host. We will run Zookeeper on host1 and host2. You can do this tutorial on one machine, but you will need to change the port numbers to avoid conflict.

Step 1: Download the latest binaries

Apache Kafka can be downloaded at http://kafka.apache.org/downloads.html
Apache Zookeeper can be downloaded at http://zookeeper.apache.org/releases.html

Step 2: Install Zookeeper

We will install Zookeeper on host1 and host2. Note that a Zookeeper ensemble needs a majority of its servers up, so to remain usable when one server goes down you really need at least 3 in production; 2 is enough for this tutorial.

For instructions on how to setup a zookeeper cluster see http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html. The instructions are simple and we will not repeat them here.

Step 3: Configure the Kafka Brokers

Assume you have unzipped the binaries to /usr/local/kafka.

Edit the config/server.properties on each host as follows:

Each broker needs a unique id. On host1 set
broker.id=1
On host2 set it to 2 and on host3 set it to 3.

On each host, set the directory where the messages are stored:
log.dirs=/srv/apps/kafka/data

Tell the brokers which Zookeeper servers to connect to:
zookeeper.connect=host1:2181,host2:2181

These are the only properties that need changes. All the other default properties in server.properties are good enough, and you can change them as required.
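
Putting the three changes together, the edited lines of config/server.properties on host1 look like this (host2 and host3 differ only in the broker.id value):

broker.id=1
log.dirs=/srv/apps/kafka/data
zookeeper.connect=host1:2181,host2:2181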

Step 4: Start the brokers

On each server, from /usr/local/kafka, execute

bin/kafka-server-start.sh config/server.properties &

Step 4.1: Create a topic

Let us create a topic mytopic with 3 partitions and a replication factor of 2.

/usr/local/kafka/bin$ kafka-topics.sh --create --zookeeper host1:2181 --replication-factor 2 --partitions 3 --topic mytopic

Step 5: Write a JAVA producer

The complete source code is at kafkasample.zip.

Create the properties that need to be passed to the producer.
       
        Properties props = new Properties();
        props.put("metadata.broker.list", "host2:9092,host3:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);

metadata.broker.list is the list of brokers that the producer can try to connect to.
We are sending text messages, so we use the String encoder. Setting request.required.acks to 1 ensures that a publish request is considered complete only when an acknowledgment has been received from the leader.
            
        Producer<String, String> producer = new Producer<String, String>(config);


The above line creates a producer which will send keys and values, both of type String.
  
        String topic = "mytopic" ;
        for (int i = 1 ; i <= 1000 ; i++) {

            String msg = " This is message " + i ;

            KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, String.valueOf(i), msg);
            producer.send(data);

        }
        producer.close();

       

The code above sends 1000 messages to the topic.

Step 6: Write a JAVA consumer

Our topic has 3 partitions. Messages within a partition are delivered to a consumer in order, so one consumer per partition makes sense. In our consumer, we will create 3 threads, one for each partition. We use what is called the high level consumer API.

First setup the consumer configuration

        Properties props = new Properties();
        props.put("zookeeper.connect", "host:2181");
        props.put("group.id", "mygroupid2");
        props.put("zookeeper.session.timeout.ms", "413");
        props.put("zookeeper.sync.time.ms", "203");
        props.put("auto.commit.interval.ms", "1000");
        ConsumerConfig cf = new ConsumerConfig(props) ;

zookeeper.connect tells the consumer which Zookeeper to connect to. The consumer needs to connect to Zookeeper to get topic information as well as to store consumer offsets. group.id is the name of the consumer group. Every message is delivered to each consumer group. auto.commit.interval.ms tells how often the consumer should commit offsets to Zookeeper.

       ConsumerConnector consumer = Consumer.createJavaConsumerConnector(cf) ;
       String topic = "mytopic"  ;
       Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
       topicCountMap.put(topic, new Integer(3));
       Map<String,List<KafkaStream<byte[],byte[]>>> consumerMap =

               consumer.createMessageStreams(topicCountMap);
       List<KafkaStream<byte[],byte[]>> streams = consumerMap.get(topic);


The code above creates a ConsumerConnector and gets a list of KafkaStreams. For this topic, we indicate that 3 streams are required (1 for each partition) and the connector creates 3 streams.

        ExecutorService executor = Executors.newFixedThreadPool(3);
        int threadnum = 0 ;     
        for(KafkaStream stream  : streams) {
            executor.execute(new KafkaPartitionConsumer(threadnum,stream));
            ++threadnum ;
        }


The code above creates a thread pool and submits a runnable KafkaPartitionConsumer for each stream. The code for the runnable is shown below.

        public static class KafkaPartitionConsumer implements Runnable {
            private int tnum ;
            private KafkaStream kfs ;
            public KafkaPartitionConsumer(int id, KafkaStream ks) {
                tnum = id ;
                kfs = ks ;
            }   
            public void run() {
                System.out.println("This is thread " + tnum) ;
                ConsumerIterator it = kfs.iterator();
                int i = 1 ;
                while (it.hasNext()) {
                    System.out.println(tnum + " " + i + ": " + new String(it.next().message()));
                    ++i ;
                }
            }

        }

Each thread reads from the stream of one particular partition. If there are no messages, the hasNext() call will block.

Step 7 : Start the consumer

I built my code using Maven. When there are dependencies on jars, it is also easier to use Maven to run the program, as it automatically pulls all the dependencies into the classpath.

mvn exec:java -Dexec.mainClass="com.mj.KafkaMultiThreadedConsumer"

Step 8 : Start the producer

mvn exec:java -Dexec.mainClass="com.mj.KafkaProducer"

You should see the consumer print out the messages.

You can start multiple consumers with a different groupid and they will each receive all the messages.

Update : see related blogs

Apache Kafka Introduction
Apache Kafka JAVA tutorial #2 
Apache Kafka JAVA tutorial #3 
Apache Kafka 0.8.2 New Producer API



Friday, April 18, 2014

Apache Kafka Introduction : Should I use Kafka as a message broker ?

Asynchronous messaging is an important component of any distributed application. Producers and consumers of messages are de-coupled: producers send messages to a queue or topic, and consumers consume messages from that queue or topic. The consumers do not have to be running when a message is sent, and new consumers can be added on the fly. For Java programmers, JMS was and is the popular API for programming messaging applications. ActiveMQ, RabbitMQ and MQSeries (henceforth referred to as traditional brokers) are some of the popular message brokers that are widely used. While these brokers are very popular, they do have some limitations when it comes to internet scale applications. Generally their throughput maxes out at a few tens of thousands of messages per second. Also, in many cases, the broker is a single point of failure.

A message broker is a little bit like a database: it takes a message from a producer, stores it, and later a consumer reads the message. The concepts involved in scaling a message broker are the same as those for scaling databases. Databases are scaled by partitioning the data storage, and we have seen that applied in Hadoop, HBase, Cassandra and many other popular open source projects. Replication adds redundancy and failure tolerance.

A common use case in internet companies is that log messages from thousands of servers need to be sent to other servers that do number crunching and analytics. The rate at which messages are produced and consumed is several thousands per second, much higher than in a typical enterprise application. This needs message brokers that can handle internet scale traffic.

Apache Kafka is an open source message broker that claims to support internet scale traffic. Some key highlights of Kafka are:
  • The message broker is a cluster of brokers. So there is partitioning and no single point of failure.
  • Producers send messages to Topics.
  • Messages in a Topic are partitioned among brokers so that you are not limited by machine size.
    • For each topic partition 1 broker is a leader
    • leader handles reads and writes
    • followers replicate
  • For redundancy, partitions can be replicated.
  • A topic is like a log file with new messages appended to the end.
  • Messages are deleted after a configurable period of time, unlike other messaging systems where a message is deleted after it is consumed. A consumer can re-consume messages if necessary.
  • Each consumer maintains its position in the log file where it last read.
  • Point to point messaging is implemented using consumer groups. A consumer group is a set of consumers with the same group id. Within a group, each message is delivered to only one member of the group.
  • Every message is delivered at least once to every consumer group. You can get publish subscribe using multiple consumer groups.
  • Ordering of messages is preserved per partition. Partitions are assigned to consumers within a consumer group. If you have the same number of partitions and consumers in a group, then each consumer is assigned one partition and will get messages from that partition in order.
  • Message delivery: for a producer, once a message is committed, it will be available as long as at least one replica is available. For the consumer, by default, Kafka provides at least once delivery, which means that in case of a crash a message could be delivered multiple times. However, with each consume, Kafka returns the offset in the log file. The offset can be stored with the message consumed, and in the event of a consumer crash, the consumer that takes over can start reading from the stored offset. For both producer and consumer, acknowledgement from the broker is configurable.
  • Kafka uses zookeeper to store metadata.
  • The producer API is easy to use. There are 2 consumer APIs.
  • The high level API is the simple API to use when you don't want to manage read offsets within the topic. ConsumerConnector is the consumer class in this API and it stores offsets in Zookeeper.
  • What they call the Simple API is the harder to use API, meant for when you want low level control of read offsets.
  • Kafka relies on the filesystem for storage and caching. Caching is the file system page cache.
  • Reads and writes are O(1), since messages are written to the end of the log and read sequentially. Reads and writes are batched for further efficiency.
  • Kafka is developed in the Scala programming language
Apache Kafka can be downloaded at http://kafka.apache.org/downloads.html.

They have a good starter tutorial at http://kafka.apache.org/documentation.html#quickstart. So I will not repeat it. I will however write a future tutorial for JAVA producers and consumers.

Apache Kafka is a suitable choice for a messaging engine when
  • You have a very high volume of messages - several billion per day
  • You need high throughput
  • You need the broker to be highly available
  • You need cross data center replication
  • Your messages are logs from web servers
  • Some loss of messages is tolerable
Some concerns that you need to be aware of are
  • Compared to JMS, the APIs are low level and hard to use
  • APIs are not well documented. Documentation does not have javadocs
  • APIs are changing and the product is evolving
  • Default delivery is at least once delivery. Once and only once delivery requires additional work for the application developer
  • Application developer needs to understand lower level storage details like partitions and consumer read offsets within the partition
It is useful to remember history and draw an analogy with NoSQL databases. 3 or 4 years ago NoSQL databases were hot and people wanted to use them everywhere. Today we know that traditional RDBMSs are not going anywhere and that the NoSQL databases are suitable for some specialized use cases. In fact NoSQL databases are moving in the direction of adding features that are available in RDBMSs. Kafka today is where NoSQL databases were a few years ago. Don't throw away your traditional message broker yet. While Kafka will be great for the cases mentioned above, a lot of the simpler messaging use cases can be done a lot more easily with a traditional message broker.


Related Blogs :

Apache Kafka JAVA tutorial #1
Apache Kafka JAVA tutorial #2 
Apache Kafka JAVA tutorial #3 
Apache Kafka 0.8.2 New Producer API  

Friday, March 28, 2014

10 Tips for building low latency applications

In a previous blog on low latency, I described 5 tips for building low latency applications. Read that for the first 5 tips. Here are 5 more.

6. Co-locate services

Network hops add latency. A network call to another server on a different subnet or in a different datacenter can add a few milliseconds to your response and affect your SLA. Install dependent services on the same server or the same rack, and definitely in the same data center.

7. Geographically distribute customer facing services

This might sound contradictory to item 6, but it is not. A round trip over the internet from New York to San Francisco takes 80-90 milliseconds. If your servers are in San Francisco, a user in New York will see that latency even without the server doing any work. Users in New York should be served from servers near New York so their time is not wasted on the round trip. To ensure that rule 6 is not violated, this might mean replicating dependencies such as the database, so that the server in New York is able to serve from a database that is close to it.

As your user base grows, you may need to distribute the services to several locations - east coast US, west coast US, Europe, Asia Pacific and so on.

8. Reduce serialization / de-serialization

Network calls, cross process calls and cross JVM calls all involve serialization and de-serialization of data, which is expensive. Care should be taken to limit serialization/de-serialization to only the required data and to delay it until it is actually required. If you store your data as large blobs, then when you need a small piece of data you end up serializing and de-serializing the entire blob. A few years ago, when the XML bandwagon was in full swing, there were many products using XML for RPC. They soon realized that while XML is good for readable text, it adds way too much overhead for serialization/de-serialization.

9. Tolerate weak consistency with application design

A system with strong consistency (think ACID and RDBMS) requires locking data during updates. This means other writers and readers may need to wait at times. Waiting implies increased latency.

Weak consistency means a reader may not always read the latest updated data. In reality many systems can tolerate this. Weakly consistent systems generally do not involve locking, they allow more concurrent readers and writers, and they are easily partitioned and distributed. For these reasons, they have lower latency for reads and writes.

10. Measure and tune

Most systems of any complexity have multiple components. In today's agile development model, developers are continuously deploying new releases of their sub components. If latency suddenly goes up, how do you know what caused it ?

It is important to continuously measure and monitor not only the end to end latency but also the latency contributed by each sub component. Know the averages and what deviations are tolerable. Set up alerts for whenever there are deviations from the mean. If a new component is released and latency suddenly goes up, you know the likely culprit. As your user base grows, if you see gradual increases in latency, perhaps you need additional capacity. If users in a particular geographical location are complaining, then perhaps you need to replicate and deploy your service to that location.

In summary, ensuring low latency is a continuous and iterative process that has to be carried out throughout the life of a system.

Friday, February 28, 2014

Hadoop 2.x cluster setup tutorial

Recently I had the opportunity to set up a multi-node Hadoop cluster. The Apache documentation is a little thin, and I had to spend several hours troubleshooting issues and googling for help before I got it right. The purpose of this tutorial is to save time for those setting up a Hadoop cluster for the first time. If you are new to Hadoop, you may read my tutorial on single node setup at Hadoop 2.x tutorial. If you have never set up Hadoop before, it is a good idea to do a single node setup the first time and then try the multi node setup.

In this tutorial we will

(1) set up a multi node hadoop cluster with 4 servers
(2) To test, copy files to hdfs and cat the files.
(3) To test, run a simple map reduce job that we developed in the single node tutorial

Step 1: Download a stable hadoop release and untar it.

Download a stable 2.x.x release from http://hadoop.apache.org/releases.html#Download.
I downloaded hadoop-2.2.0.tar.gz.
Untar the file to a directory say ~/hadoop-2.2.0.

Step 2: Decide on the topology

For this tutorial , we shall setup a 4 node cluster.

Host : master
Host : slave1
Host : slave2
Host : slave3

On the master we will run the namenode, resourcemanager, datanode, nodemanager and historyserver. On the slaves , we run the datanode and nodemanager.

To make it more real world, we will bring up and test the cluster first with just master, slave1 and slave2. Typically you add more capacity as needed. So we will add slave3 after the fact.

Step 3: Ensure proper network connectivity

I am not going to cover networking details here. But it goes without saying that the master should be able to reach the slaves using their hostnames and the slaves should be able to reach the master. You may have to add hostname to IP address mappings in /etc/hosts.

Several startup scripts use ssh to connect to the hosts and start processes on them, so ssh must be set up for password-less login on all hosts.
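
A common way to do this, assuming the same user account exists on all hosts (shown here as user hadoop), is to generate a key pair on the master and copy the public key to each host:

ssh-keygen -t rsa
ssh-copy-id hadoop@master
ssh-copy-id hadoop@slave1
ssh-copy-id hadoop@slave2
ssh-copy-id hadoop@slave3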

Step 4: Set these environment variables
export HADOOP_HOME=path_to_hadoop_install_dir
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
To hadoop_install_dir/libexec/hadoop-config.sh, Add
export JAVA_HOME=path_to_jdk
Step 5: Update the config files
These files are located at hadoop_install_dir/etc/hadoop

 core-site.xml
 This applies to all servers.
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://master:9000</value>
  </property>
</configuration>


hdfs-site.xml
This applies to all servers.
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>2</value>
  </property>
  <property>
    <name>dfs.permissions</name>
    <value>false</value>
  </property>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>file:/mnt/hadoop/data/namenode</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>file:/mnt/hadoop/data/datanode</value>
  </property>
</configuration>

dfs.namenode.name.dir points to the location where the namenode stores its metadata. dfs.datanode.data.dir points to the location where the datanode stores data. It is important to put these directories on disks that have lots of free space (terabytes). Default block sizes are large, and if there is not enough space you will encounter errors that do not clearly point to the space issue.

yarn-site.xml
This applies to all servers.
<configuration>
<property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
</property>
<property>
    <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
<property>
    <name>yarn.resourcemanager.resource-tracker.address</name>
    <value>master:8025</value>
</property>
<property>
    <name>yarn.resourcemanager.scheduler.address</name>
    <value>master:8030</value>
</property>
<property>
    <name>yarn.resourcemanager.address</name>
    <value>master:8040</value>
</property>
</configuration>

The last 3 properties tell the nodemanager how to connect to the resourcemanager.

Only in the yarn-site.xml of the master, add
<property>
    <name>yarn.nodemanager.localizer.address</name>
    <value>master:8060</value>
</property>
Without this, the nodemanager will not start on the master: the localizer's default port is 8040, which clashes with the yarn.resourcemanager.address (master:8040) configured above.

mapred-site.xml
<configuration>
<property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
</property>
</configuration>


slaves
This is necessary on the master only and is used by the scripts to start datanodes and nodemanagers on the servers listed.

master
slave1
slave2

Step 6: Start the processes

Change to the directory where hadoop is installed.
cd hadoop_install_dir

If you are running hadoop for the first time, the following command will format HDFS. Do not run this every time, as it formats and thus deletes any existing data.

hadoop_install_dir$ bin/hadoop namenode -format

Start the namenode.
hadoop_install_dir$ sbin/hadoop-daemon.sh start namenode

Start the datanode.
hadoop_install_dir$ sbin/hadoop-daemons.sh start datanode

Note that the script name is hadoop-daemons.sh (plural). It starts the datanode on all the servers listed in the slaves file. If you use the hadoop-daemon.sh script, it will only start the datanode on the server on which you ran the script.

In hadoop 2.x , there is no jobtracker. Instead there is a resourcemanager and a nodemanager.
Start the resourcemanager.
hadoop_install_dir$ sbin/yarn-daemon.sh start resourcemanager

Start the nodemanager.
hadoop_install_dir$ sbin/yarn-daemons.sh start nodemanager
As mentioned in the case of the datanode, the -daemons script will start the nodemanager on all servers listed in the slaves file, whereas the -daemon script will start it only on the server on which the script is executed.

Start the history server.
hadoop_install_dir$ sbin/mr-jobhistory-daemon.sh start historyserver

On the master, type jps. It lists the running java processes. Check that all the processes have started.

hadoop_install_dir$ jps

1380 DataNode
1558 Jps
1433 ResourceManager
1536 JobHistoryServer
1335 NameNode
1849 NodeManager

Do the same on each of the slaves.

hadoop_install_dir$ jps

1380 DataNode
1558 Jps
1849 NodeManager


The jps command is a good check to ensure all necessary processes are started.

You can use the following urls to see the state of the cluster.

For HDFS
http://master:50070
For YARN/Mapreduce
http://master:8088

Step 7: Test HDFS

The HDFS commands are documented at
http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html

hadoop_install_dir$ bin/hdfs dfs -ls /

will list the root directory

hadoop_install_dir$ bin/hdfs dfs -mkdir /input
creates a directory input

In the local filesystem create a file app.log with the data

user01|1|2|3|4|5
user02|1|2|3|4|5
user03|1|2|3|4|5
user01|1|2|3|4|5
user02|1|2|3|4|5
user01|1|2|3|4|5
user03|1|2|3|4|5
user01|1|2|3|4|5
user04|1|2|3|4|5
user01|1|2|3|4|5

Let us pretend this is a log file from a web application where for each request we have logged userid and some additional data. We will later use this as input for a MapReduce program.
You can move it to hdfs using the command

hadoop_install_dir$ bin/hdfs dfs -moveFromLocal ~/projects/app.log /input/

To print the file just moved to hdfs
hadoop_install_dir$ bin/hdfs dfs -cat /input/app.log

Step 8: Run a map reduce program

Writing and running an MR program is no different from what we did in the single node tutorial. See Step 6 and Step 7 of the single node tutorial at Hadoop 2.x tutorial.
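
If you do not have the single node tutorial handy, here is a minimal sketch of a comparable job (the class name UserRequestCount and the jar name are my own, not from that tutorial) that counts the number of requests per user in the pipe delimited app.log we copied to /input:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class UserRequestCount {

    // Mapper: emit (userid, 1) for every line of the pipe delimited log
    public static class UserMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable ONE = new IntWritable(1);
        private final Text user = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\\|");
            if (fields.length > 0) {
                user.set(fields[0]);
                context.write(user, ONE);
            }
        }
    }

    // Reducer: sum up the counts per userid
    public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "user request count");
        job.setJarByClass(UserRequestCount.class);
        job.setMapperClass(UserMapper.class);
        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // e.g. /input
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // e.g. /output, must not exist yet
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Package it into a jar and submit it with something like

hadoop_install_dir$ bin/hadoop jar usercount.jar UserRequestCount /input /output

and then use bin/hdfs dfs -cat on the files under /output to see the per user counts.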

Step 9: Add additional capacity

Your hadoop cluster is working well, but you are running out of space and your MR jobs are backed up because not enough mappers or reducers are available. Let us increase capacity by adding an additional server, slave3.

On slave3, do
Step 1 -- untar the hadoop binaries
Step 3 -- ensure network connectivity
Step 4 -- set the environment variables
Step 5 -- set up the config files

Start the data node
hadoop_install_dir$ sbin/hadoop-daemon.sh start datanode

Start the nodemanager
hadoop_install_dir$ sbin/yarn-daemon.sh start nodemanager

Use jps to check that the processes have started. Use the web urls to verify that the node was added to the cluster.

Hoping these steps help jumpstart your hadoop cluster setup and save you time.

Wednesday, January 29, 2014

JAVA NIO Networking tutorial

In the NIO tutorial, I provided an overview of the nio package in JAVA that enables developers to build scalable applications using non blocking techniques. In this tutorial I will focus on the networking parts of the nio package. We will build a simple Echo server and Echo client using non blocking sockets.

The conventional way of writing networking applications is to use the Socket and ServerSocket classes in the java.net package. If you are new to network programming, you may read the tutorial from Oracle at All about Sockets. The Oracle tutorial covers sockets where the IO thread blocks. You might say - this works for me, so why should I care about non blocking sockets? When writing a server using blocking sockets, once the server accepts a connection, a thread is created for the peer client socket and handles the IO with that client. There will be times when this thread is doing nothing but waiting for IO. That is not a big issue if the server has just a few clients. But if the server needs to handle tens of thousands of concurrent clients, you will end up creating thousands of threads that may or may not be doing work, and this approach will exhaust the operating system's resources.
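
To make the contrast concrete, here is a minimal sketch of the blocking, thread-per-client approach described above (just an illustration of the conventional model, not part of the echo example we build below; the port number is arbitrary):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class BlockingEchoServer {
    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(8080);
        while (true) {
            final Socket client = server.accept(); // blocks until a client connects
            // one thread per client; the thread blocks on readLine while the client is idle
            new Thread(new Runnable() {
                public void run() {
                    try {
                        BufferedReader in = new BufferedReader(
                                new InputStreamReader(client.getInputStream()));
                        PrintWriter out = new PrintWriter(client.getOutputStream(), true);
                        String line;
                        while ((line = in.readLine()) != null) {
                            out.println(line); // echo the line back
                        }
                        client.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}

Every connected client pins down one thread even while it sits idle waiting for input, which is exactly the cost the selector based approach below avoids.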

In NIO networking:
  • The networking work, such as accepting connections, reading from a peer socket and writing to a socket, generally happens on a single thread.
  • The processing of data that has been read, and the preparation of data to write, is done in worker threads.
  • In the main thread, a ServerSocketChannel registers an interest in events with a selector.
  • The selector waits for events to occur. Events could be: a socket is requesting a connection, a socket is ready for writing, a socket is ready for reading.
  • When an event occurs, the select method returns and the main thread retrieves the event and the peer socket channel.
  • If the event is an accept event, the ServerSocketChannel might accept the connection and register an interest in a read event for the peer socket.
  • If the event is a read event, the thread can read the data from the peer socket and hand it to a worker thread for processing. It can further register an interest in the event that the socket can be written to.
  • If the event is a write event, the thread can take data intended for this peer that is queued somewhere and write it to the peer socket. It may then register an interest in the next read event from this peer.

Note that the reads and writes happen only when the sockets are ready for reading or writing. Let us use the above concepts and write a simple EchoServer and EchoClient. The EchoClient sends lines of text to the EchoServer, which echoes each line back.

EchoServer
The complete code is in the file EchoServer.java

Step 1: Create a ServerSocketChannel and bind the local address

        private ServerSocketChannel serverchannel ;
        private Selector selector ;
        // data queued for writing back, keyed by the peer socket channel
        private HashMap<SocketChannel, ByteBuffer> queuedWrites =
                   new HashMap<SocketChannel, ByteBuffer>();

        serverchannel = ServerSocketChannel.open() ;
        serverchannel.configureBlocking(false) ;
        serverchannel.socket().bind(new InetSocketAddress("localhost", 8081));

The ServerSocketChannel is the channel on which the server will accept stream oriented connections. Selector is what lets you multiplex several connections without blocking for IO. The hashmap is used to queue data to be written on individual connections. This is necessary since we do not want to block on writes (or reads).

Step 2 : Create a selector and register interest in accept event. Run a loop waiting for events.

       selector = Selector.open();
       serverchannel.register(selector, SelectionKey.OP_ACCEPT) ;

       while(true) {
                selector.select() ; // blocks until at least one channel has an event
                Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator() ;
                while(keysIterator.hasNext()) {
                    SelectionKey key = keysIterator.next();
                    keysIterator.remove() ; // remove the key so it is not processed twice

                    // handle the event
                    if (key.isAcceptable()) {
                    } else if (key.isReadable()) {
                    } else if (key.isWritable()) {

                    }
                }

           }

The outer while loop is there so that the server stays up and running, waiting for events to happen. In a nutshell, it outlines what a server program does. A Selector is created and we register an interest in the ACCEPT event. The select call is the only call in the program that blocks waiting for events to happen. We are now waiting for connections. The select call returns when there is an event on a channel. The SelectionKeys tell you which event occurred on which channel. The server program takes action based on the event, which could be read, write, accept or connect.

Step 3. Accept a connection. Register interest in a read event.

        if (key.isAcceptable()) {               
                ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
                SocketChannel sc = ssc.accept();
                sc.configureBlocking( false );
                sc.register( selector, SelectionKey.OP_READ );
       }

The SelectionKey gives us the peer channel. The code accepts the connection on that channel. Once the connection is accepted, the server will need to read the data sent by the client, hence we register an interest in the read event.

Step 4. If there is a read event, read the socket. Hand off the data to the worker queue. Register interest in a write event for the socket.

        if (key.isReadable()) {
                SocketChannel sc = (SocketChannel)key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(8192);
                int numread ;
                while (true) { // keep reading while data is available
                         numread = sc.read( readBuffer );
                         if (numread <= 0) {
                              break;
                         }
                }
                if (numread == -1) {
                        // Remote entity shut the socket down cleanly. Do the
                        // same from our end and cancel the channel.
                        key.channel().close();
                        key.cancel();
                        continue ; // move on to the next selection key
                }
                readBuffer.flip() ; // prepare the buffer to be drained when we write back
                queuedWrites.put(sc, readBuffer) ;
                key.interestOps(SelectionKey.OP_WRITE) ;
        }

The client has sent some data and the server reads it. Generally a server will want to send a response to the client. Since this is an EchoServer, the server just queues the data read for writing back when the channel is ready for writing.

Step 5. When socket is ready for write, get data from the queue and write it to the socket.

         if (key.isWritable()) {
                 SocketChannel sc = (SocketChannel)key.channel();
                 ByteBuffer towrite = queuedWrites.get(sc) ;
                 while (true) { // write until the buffer is drained or the socket buffer is full
                          int n = sc.write(towrite) ;
                          if (n == 0 || towrite.remaining() == 0)
                                break ;
                 }
                 key.interestOps(SelectionKey.OP_READ) ;
        }

After writing, be ready to read the next message from the client. To recap, the server is in a loop, accepting connections, reading and writing to channels.

EchoClient
The complete code is in the file EchoClient.java

Step 1: Create a Socket Channel and a Selector. Register interest in a Connect event.

        selector = Selector.open() ;
        clientChannel = SocketChannel.open();
        clientChannel.configureBlocking(false);
        clientChannel.register(selector, SelectionKey.OP_CONNECT) ;


Step 2 :  Initiate a connection to the EchoServer

       // Kick off connection establishment. Client initiates connection to server
        clientChannel.connect(new InetSocketAddress("localhost", 8081));


Step 3: Main client loop similar to the server loop

        while(true) {
                 // queue a write so it can be written when the channel is ready
                 // (i is a message counter defined in the complete EchoClient.java)
                writeQueue.add("This is line " + i) ;
                selector.select() ;  // wait for events

                Iterator<SelectionKey> skeys = selector.selectedKeys().iterator() ;
                while (skeys.hasNext()) {
                        SelectionKey key = skeys.next();
                        skeys.remove();
                        if (!key.isValid()) {
                             continue;
                        }

                       // Check what event is available and deal with it
                      if (key.isConnectable()) {  // server has accepted the connection
                           finishConnection(key);
                      } else if (key.isReadable()) { // socket is ready for reading
                          read(key);
                     } else if (key.isWritable()) {  // socket is ready for writing
                         write(key);
                    }
               }
        }


Step 4 : When a connect event is received from the server, finish the connection. Register interest in a Write event on the socket.

        private void finishConnection(SelectionKey key) throws IOException {
                clientChannel.finishConnect() ; // complete the tcp handshake
                key.interestOps(SelectionKey.OP_WRITE) ;
        }


Step 5 : When the socket is ready for write, write a queued message to the socket. Register interest in a read event from the socket.

        private void write(SelectionKey key) throws IOException {
                String toWrite = writeQueue.pollFirst() ;

                if (toWrite != null) {
                        ByteBuffer b = ByteBuffer.wrap(toWrite.getBytes()) ;
                        while (true) { // write until the buffer is drained or the socket buffer is full
                              int n = clientChannel.write(b) ;
                              if (n == 0 || b.remaining() == 0)
                                    break ;
                        }
                }
                key.interestOps(SelectionKey.OP_READ) ;
        }


Step 6: When the server echoes the message back, a read event occurs on the socket. Read the message and print out the echo.

        public void read(SelectionKey key) throws IOException {
                readBuf.clear() ;
                while (true) {
                       int numread = clientChannel.read( readBuf );
                       if (numread <= 0) {
                             break;
                       }
                }
                readBuf.flip() ; // limit the buffer to the bytes actually read
                System.out.println("Read Echo from server:" + new String(readBuf.array(), 0, readBuf.limit())) ;

                key.interestOps(SelectionKey.OP_WRITE) ;
        }

You will notice that in both the client and the server, the read and write operations are in a loop. This is because we are working with TCP, or stream oriented, sockets. Each read returns only as much data as there is in the socket buffer, which might be less than what you need, and more data might still be arriving. Similarly, each write writes only as much data as there is space for in the socket buffer, so you might need to call write multiple times for all the data in your buffer to be written. This is no different than traditional sockets programming.

In conclusion, writing NIO networking programs is a little more involved than traditional blocking sockets programming, but they scale much further and can handle many more concurrent connections.