Tuesday, July 4, 2017

Distributed Consensus: Raft


In the Paxos blog, we discussed the distributed consensus problem and how the Paxos algorithm describes a method for a cluster of servers to achieve consensus on a decision.

However, Paxos as a protocol is hard to understand and even harder to implement.

Examples of problems that need consensus are:

- Servers needing to agree on a value, such as whether a distributed lock is acquired or not.
- Servers needing to agree on the order of events.
- Servers needing to agree on the state of a configuration value.
- Any problem where you need 100% consistency in a distributed environment that is also highly available.

The Raft algorithm, described at https://raft.github.io/, solves the same problem but is easier to understand and implement.

Overview

The key elements of Raft are:

Leader Election
Log Replication
Consistency (in the spec this is referred to as safety)

One server in the cluster is the leader. All other servers are followers.

The leader is elected by majority vote.

Every consensus decision begins with the leader sending a value to followers.

If a majority of followers respond having received the value, the leader commits the value and then tells all servers to commit the value.

Clients communicate with the leader only.

If a leader crashes, another leader is elected. Messages between the leader and followers enable the followers to determine whether the leader is still alive.

If a follower does not receive messages from the leader for a certain period, it can try to become a leader by soliciting votes.

If multiple candidates try to get elected at the same time, it is possible that none gets a majority. In such situations, the candidates try to get elected again after a random delay.

In Raft, time is divided into sequential terms of varying length. Each term has at most one leader, and leadership lasts for that term.

Two main RPC messages are exchanged between servers:

RequestVote: sent by a candidate to solicit votes.
AppendEntries: sent by the leader to replicate a log entry (and, when empty, as a heartbeat).
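
To make these concrete, here is a minimal sketch (in Java) of what the two RPC payloads might look like. The field names follow the Raft paper; the classes themselves are illustrative and not from any library.

// Illustrative payloads for the two Raft RPCs, following the fields
// described in the Raft paper. Plain data holders, not a working implementation.
public class RaftMessages {

    // Sent by a candidate to solicit votes.
    public static class RequestVote {
        long term;           // candidate's current term
        String candidateId;  // candidate requesting the vote
        long lastLogIndex;   // index of the candidate's last log entry
        long lastLogTerm;    // term of the candidate's last log entry
    }

    // Sent by the leader to replicate log entries; an empty entries list
    // doubles as a heartbeat.
    public static class AppendEntries {
        long term;                       // leader's current term
        String leaderId;                 // so followers can redirect clients
        long prevLogIndex;               // index of the entry preceding the new ones
        long prevLogTerm;                // term of that preceding entry
        java.util.List<String> entries;  // new log entries (commands)
        long leaderCommit;               // leader's commit index
    }
}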

Scenario 1: Leader election at cluster start-up

Let us say we have 5 servers, s1 to s5.

Every server starts as a follower.

No server is receiving messages from a leader.

s1 and s3 decide to become candidates and send messages to the other servers to solicit votes.
Candidates always vote for themselves.
s2 and s4 vote for s1.
s1 has 3 votes, a majority, and is elected leader.

Scenario 2: Log replication


A client connects to a leader s1 to set x = 3.

s1 writes x=3 to its log, but its state is unchanged.

At this point the change is uncommitted.

s1 sends an AppendEntries message with x=3 to all followers. Each follower writes the entry to its log.

Followers respond to s1 that the change is written to their logs.

When a majority of the servers have written the entry, s1 commits the change. It applies the change to its state, so x is now 3.

Followers are told to commit via the index of the last committed entry, piggybacked on the next AppendEntries message. A follower commits the entry by applying the change to its own state.

When a change is committed, all previous changes are considered committed.

The next AppendEntries message from the leader to the followers includes the last committed entry. The servers can then commit any previous entries they have not yet committed.

The cluster of servers has consensus.
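
The commit rule can be sketched in a few lines of Java. This is a simplified illustration, assuming a hypothetical matchIndex map that records, for each server, the highest log index known to be replicated on it; it is not part of a real Raft implementation.

import java.util.Map;

// Simplified sketch of the leader's commit rule: an entry is committed once
// a majority of the servers (the leader included) have it in their logs.
public class CommitCheck {

    // matchIndex: for each server, the highest log index known to be
    // replicated on that server; the leader counts itself here as well.
    static boolean isCommitted(long entryIndex, Map<String, Long> matchIndex, int clusterSize) {
        long replicas = matchIndex.values().stream()
                .filter(idx -> idx >= entryIndex)
                .count();
        return replicas > clusterSize / 2;   // strict majority
    }
}

For a 5-server cluster, the entry for x=3 at, say, index 7 is committed once matchIndex reports at least 3 servers at index 7 or beyond.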


Scenario 3: Leader goes down


When a leader goes down, one or more of the followers detect that there are no messages from the leader and decide to become candidates by soliciting votes.

But the leader that just went down had the authoritative record of committed entries, which some of the followers might not have.

If a follower that was behind on committed entries became a leader, it could force other servers with later committed entries to overwrite their entries. That should not be allowed. Committed entries should never change.

Raft prevents this situation by requiring each candidate to send, with the RequestVote message, the term and index of the latest entry it accepted from the previous leader. A follower rejects a RequestVote whose term/index is lower than its own highest term/index.

Since a leader only commits entries accepted by a majority of servers, and a majority of servers is required to get elected, it follows that at least half of the remaining servers have accepted the highest committed entry of the leader that went down. Thus a follower that does not have the highest committed entry from the previous leader can never get elected.

Scenario 4: Catch-up for servers that have been down


It is possible for a follower to miss committed entries either because it went down or did not receive the message.

To ensure followers catch up and stay consistent with the leader, Raft has a consistency check.

Every AppendEntries message also includes the term and index of the preceding entry in the leader's log. If the follower does not have a matching entry, it rejects the new entry. When an AppendEntries message is accepted by a follower, it means the leader and that follower have identical logs up to that point.


When a follower rejects an AppendEntries message, the leader retries that follower with an earlier entry. This continues until the follower accepts an entry. Once an entry is accepted, the leader again sends the subsequent entries, which will now be accepted.

The leader maintains a nextIndex for each follower: the index of the next log entry the leader will send to that follower.
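
A minimal sketch of how a leader might walk nextIndex backwards until the consistency check passes is shown below. The rpc parameter stands in for the real AppendEntries call and is purely illustrative.

import java.util.function.LongPredicate;

// Sketch of the leader's catch-up loop for one follower. The rpc parameter
// stands in for the AppendEntries RPC: it returns true when the follower
// accepts the entries starting at the given index (i.e. the consistency
// check on the preceding entry passed).
public class FollowerCatchUp {

    // Returns the index at which the follower's log matches the leader's.
    static long catchUp(long leaderLastLogIndex, LongPredicate rpc) {
        long nextIndex = leaderLastLogIndex + 1;
        // Walk backwards one entry at a time until the follower accepts.
        while (nextIndex > 1 && !rpc.test(nextIndex)) {
            nextIndex--;
        }
        return nextIndex;
    }
}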

Scenario 5: Cluster membership changes


Cluster membership change refers to servers being added to or removed from the cluster.
The current leader may not even be part of the new configuration.

This needs care because during the transition it is possible to end up with two disjoint majorities, and hence two leaders.

Raft takes a two-phase approach.

First, switch to joint consensus:
   -- log entries are replicated to servers in both configurations
   -- a server from either configuration may serve as leader
   -- agreement (for elections and commits) requires separate majorities from both configurations

Second, switch to the new configuration.

The leader receives a request to change the configuration.
The leader uses AppendEntries to send the (old, new) configuration pair to the followers.
Once (old, new) is committed, we are in the joint consensus period.
The leader then sends an AppendEntries for the new configuration alone.
Once that is committed, the new configuration is in effect.

Summary

Consensus is required when you need 100% consistency in a distributed environment that is also highly available. Raft simplifies distributed consensus by breaking the problem into leader election and log replication. Easier to understand means easier to implement and use to solve real world problems.

A future blog will go into an implementation.

References:

1. Diego Ongaro and John Ousterhout, "In Search of an Understandable Consensus Algorithm", Stanford University. https://raft.github.io/raft.pdf


Saturday, November 12, 2016

Distributed Systems : Basic Paxos


1.0 Introduction

How do you build reliable, highly available distributed systems that are also consistent? Paxos is a protocol that addresses this problem.

Paxos was authored by Leslie Lamport in his paper "The Part-Time Parliament" and explained better in his paper "Paxos Made Simple". It has been implemented and used in many of the modern distributed systems built by Google, Amazon, Microsoft and others.

Consider a banking system with clients c1,c2 and server s.

c1 can issue a command to s: add $200 to account A.
c2 can issue a command: add 2% interest to A.

A single server can easily determine the order c1,c2 and execute the commands.

But if s crashes, no client can do any work. The traditional way to solve this problem is to fail over to a standby. Another server s', identical to s, is standing around doing nothing. When s crashes, the system detects that s is no longer servicing requests and starts sending requests to s'. For reasons that merit a blog of their own, failover using a standby is hard to implement and test, and more expensive. That will not be discussed here.

A second limitation is that when the number of client requests increase, the server may not be able to keep up and respond in a reasonable time.

The way to solve the scalability and failover issues is to have multiple servers, say s1 and s2, servicing clients with replication between s1 and s2, so that the clients are presented with a single system view. Commands that execute on s1 due to its clients are also made to execute on s2 and vice versa, so that both s1 and s2 are in the same state.

In Figure 1, both s1 and s2 are active and servicing clients.

Figure 1: Multiple server cluster with replication



However a difference in the order of execution can lead to a consistency issue.

Assume account A has $1000.
Say c1 connects to s1 and invokes the command: add $200 to account A.
Say c2 connects to s2 and invokes the command: add 5% interest to account A.

If the order of execution is c1, c2 then c1 increases A to 1200 and c2 increases it to 1260. If the order of execution is c2, c1, then c2 increases A to 1050 and c1 increases it to 1250. One server may have the value 1260, while the other may have the value 1250.

To ensure consistent results, both servers need to agree on the order of execution. In other words, there needs to be consensus among the servers. The same holds true with more than two servers.

Paxos is a protocol for achieving consensus among a group of servers.

2.0 Paxos assumptions

A group of distributed servers communicating with each other.

Asynchronous communication

Non-Byzantine failures: servers may crash or be slow, but they do not lie or behave maliciously.

Messages can be lost.

A majority of servers is always available. So your system should have an odd number of servers (3, 5, 7, ...) so that a majority can be established. Any two majorities have at least one overlapping member. This observation is critical to the correctness of the protocol.

Server can join or leave the system at any time.

3.0 What Paxos achieves

Reliable system with unreliable components.

Only one value may be chosen.

A value is chosen when it has been accepted by a majority of the servers.

Once a value is chosen, it cannot be changed.

This "one value" concept can be hard to understand for a first-time reader. How is choosing just one value useful in solving any real-world problem? In reality, the value is likely to be a command that needs to be executed on the server. It could be a command, data, or both. "Value" is a simplification.

For this to be useful, the servers probably need consensus on not just one value but several values. That can be achieved by a minor extension to basic Paxos and will be discussed in a subsequent blog.

4.0 Actors in Paxos

Proposers propose a value to be chosen. Proposers are generally the ones handling client requests.

Acceptors respond to proposers and can be part of the majority that leads to a value being chosen.

Learners learn the chosen value and may put it to some use.

In reality, a single server may function as all three, and that is what we will assume here.

5.0 The protocol

(1) Proposer proposes a value (n,v) where n is a proposal number and v is a value.

(2) If an acceptor has not received any other proposal, it sends a response agreeing not to accept any other proposal with a number less than n.

If the proposal number is less than one it has already accepted or agreed to accept, it can ignore the proposal.

If it has already accepted a lower-numbered proposal with some value v', it responds with that accepted proposal number and value v'.

The acceptor continues to do this with any subsequent proposal it receives. It must remember the highest proposal number it has seen. This is important because, as described in step 5, it should never accept any lower-numbered proposal.

(3) The proposer examines the responses to its proposal.

If any acceptor in the majority responded with an already-accepted value v', it means that v' is either chosen or has a good chance of being chosen. The proposer must take v' as its value. If no acceptor reported a value, the proposer can stay with its original v.

(4) The proposer sends an accept message (n, v') to the acceptors.

(5) When an acceptor receives an accept message (n, v) or (n, v'), it must accept the value if n is still the highest proposal number it has seen.

Between steps 2 and 5, other proposers could have sent proposals with numbers higher than n. If the acceptor has any such proposal, it cannot accept n.

In either case, it returns to the proposer the highest proposal number it has seen.

(6) The proposer can use the proposal number in the response from step 5 to determine whether its accept message was accepted. If the proposal number is the same as n, then it knows that n is accepted.

Otherwise the returned number belongs to a larger proposal that is in progress. The proposer has to go back to step 1 and start over with a new proposal number greater than this one.
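
To tie steps 1 through 6 together, here is a rough Java sketch of an acceptor's state and how it handles the two messages. It follows the description above; the class and method names are my own and purely illustrative.

// Rough sketch of a Paxos acceptor, following the steps described above.
// Not a production implementation.
public class Acceptor {

    private long promisedN = -1;   // highest proposal number it has agreed to honor
    private long acceptedN = -1;   // number of the proposal it has accepted, if any
    private String acceptedV;      // value of the accepted proposal, if any

    // Step 2: respond to a proposal numbered n. The response carries any
    // previously accepted (number, value) so the proposer can adopt v'.
    public synchronized PrepareResponse onPrepare(long n) {
        if (n > promisedN) {
            promisedN = n;   // agree not to accept anything numbered lower than n
            return new PrepareResponse(true, acceptedN, acceptedV);
        }
        // lower-numbered proposal: ignore/reject, report the highest number seen
        return new PrepareResponse(false, promisedN, acceptedV);
    }

    // Step 5: accept (n, v) only if n is still the highest proposal seen.
    // In either case, return the highest proposal number known (used in step 6).
    public synchronized long onAccept(long n, String v) {
        if (n >= promisedN) {
            promisedN = n;
            acceptedN = n;
            acceptedV = v;
        }
        return promisedN;
    }

    public static class PrepareResponse {
        public final boolean promised;
        public final long acceptedN;
        public final String acceptedV;

        PrepareResponse(boolean promised, long acceptedN, String acceptedV) {
            this.promised = promised;
            this.acceptedN = acceptedN;
            this.acceptedV = acceptedV;
        }
    }
}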

6.0 Notes

Proposals have an order as indicated by n. Newer proposals override older proposals. If an acceptor has received proposal n, it can ignore all proposals numbered less than n.

Proposal numbers need to be unique across proposers. 

Multiple rounds of propose and accept may be necessary before a majority for a chosen value is reached.

Once a value is chosen, future proposers will also have to choose that value. That is the only way we can get to one and only one value chosen.

Proposals are ordered. Older ones are ignored or rejected.

7.0 Examples

In this section we go through some scenarios of how the protocol works.

7.1 Case 1: Value chosen for the first time

Figure 2: Value chosen for the first time

This is the most basic case: no value has yet been chosen and a value is proposed for the first time.

There are 3 servers: s1, s2, s3. 2 is a majority.

1. s1 sends proposal 1 with value X to s2.
2. s2 has no previous proposal or value, so it responds agreeing not to accept any proposal with a number less than 1.
3. s1 now has agreement from a majority (itself and s2). So it sends an accept message to s2, which accepts, and X is the chosen value.



7.2 Case 2: Value proposed after one already chosen

Figure 3: Value proposed after one chosen

s1 and s2 have agreed on value X.

s3 does not know of this. s3 sends proposal 2 with value Y to s1.

s1 responds that it has accepted proposal 1 with value X.

s3 has to update its value to X. s3 sends an accept message with value X, which is accepted.

s1, s2, s3 all have value X.

7.3 Case 3: No value yet chosen, two competing proposals, one wins

Figure 4: Competing proposals

There are 5 servers: s1, s2, s3, s4, s5. 3 is a majority.

s1 sends proposal 1 with value X to s2, s3.
s2 agrees not to accept any proposal numbered less than 1.
Before s3 receives the accept message for (1,X), s5 sends (2,Y) to s3, s4.
Now s3 cannot accept (1,X) because its highest proposal is 2.
s3, s4 respond to s5 agreeing to (2,Y).
s3 ignores proposal (1,X).
s5 sends accept (2,Y) to s3, s4, which accept it.
s1 sends a new proposal (3,X).
s3 responds that it has accepted (2,Y).
s1 must adopt Y and sends accept (3,Y).
s1 and s2 also end up with Y.


7.4 Case 4: Not making progress (liveness)

Figure 5: Not making progress

s1 proposes values to s2, s3. s5 proposes values to s3, s4.

s1 proposes (1,X). s3 agrees not to accept any proposal numbered less than 1.
Before (1,X) can be accepted, s5 proposes (2,Y). s3 now agrees not to accept anything numbered less than 2.
When s1 tries to get (1,X) accepted, it will not be accepted because there is a proposal 2. s1 sends out a proposal (3,X).
s5 will not be able to get (2,Y) accepted because there is a (3,X). It sends out a (4,Y).
s1 will not be able to get (3,X) accepted because there is a (4,Y). It sends out a (5,X).

This may go on and on. One way to avoid this is for each server to introduce a random delay before issuing the next proposal, thereby giving the others a chance to get their proposals accepted.

Another solution is to have a leader among the servers and have the leader be the only one that issues proposals.

8.0 Paxos usage in real world 

The basic protocol enables servers to arrive at consensus on one value. How does one value apply to a real-world system? To solve real-world problems like the one described in the introduction, you have to run multiple instances of Paxos. For a group of servers to agree on the order of a set of commands, think of a list of commands 0..n. A separate Paxos instance picks the command at each index. This is Multi-Paxos and merits a blog of its own.

Some real-world uses of Paxos have been to arrive at consensus on locks, configuration changes and counters.

9.0 References

"The Part-Time Parliament" by Leslie Lamport
"Paxos Made Simple" by Leslie Lamport
"Time, Clocks, and the Ordering of Events in a Distributed System" by Leslie Lamport

Tuesday, October 20, 2015

JAVA 8 : Lambdas tutorial

Lambdas are the biggest addition to JAVA in not just release 8 but in several releases. But when you look at the cryptic lambda syntax, like most regular programmers, you are left wondering why one should write code this way. The purpose of this tutorial is to introduce lambdas, so that you can start using them in real code.

Overview

Lambdas make it easy to define blocks of code, store them, and pass them around as parameters. They may be stored in variables for later use or passed as parameters to methods that may invoke the code. This style of programming is known as functional programming.

You might argue that JAVA already supported functional programming using anonymous classes. But that approach is considered verbose.

Example

Listing 1 shows the old way to pass executable code to a thread.

  public void Listing1_oldWayRunnable() {
        Runnable r = new Runnable() {
            @Override
            public void run() {
                System.out.println("Hello Anonymous") ;
            }
        } ;
        Thread t = new Thread(r) ;
        t.start() ;
    }

Listing 2 shows the new way using lambdas.

public void Listing2() {

        Thread t = new Thread(()->System.out.println("Hello Lambdas")) ;
        t.start() ;
    }

Listing 2 has no anonymous class. It is much more compact.

()->System.out.println("Hello Lambdas") is the lambda.

Syntax

The syntax is

(parameter) -> statement

where parameter is the parameter passed in. In our example there was no parameter, hence the syntax was () -> statement.

If you have multiple parameters, the syntax is
(param1, param2) -> statement

If you have multiple statements, the syntax is
(param) -> { statement1; statement2; }
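
As an example of the multi-parameter and multi-statement forms, here is a Comparator written as a lambda (Comparator is itself a functional interface, so this compiles as-is; the class name is just for illustration).

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SyntaxExamples {
    public static void main(String[] args) {
        // Two parameters, single expression.
        Comparator<String> byLength = (a, b) -> a.length() - b.length();

        // Explicit parameter types and a block with multiple statements.
        Comparator<String> byLengthThenAlpha = (String a, String b) -> {
            int diff = a.length() - b.length();
            return diff != 0 ? diff : a.compareTo(b);
        };

        List<String> words = Arrays.asList("lambda", "java", "functional");
        words.sort(byLength);
        System.out.println(words);   // prints [java, lambda, functional]

        words.sort(byLengthThenAlpha);
        System.out.println(words);   // same order here; ties would be broken alphabetically
    }
}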

Storing in a variable

The lambda expression can also be stored in variable and passed around as shown in listing 3.

 public void Listing3() {
        Runnable r = ()->System.out.println("Hello functional interface") ;
        Thread t = new Thread(r) ;
        t.start() ;
    }

Functional interface

JAVA 8 introduces a new term functional interface. It is an interface with just one abstract method that needs to be implemented. The lambda expression provides the implementation for the method. For that reason, lambda expressions can be assigned to variables that are functional interfaces. In the example above Runnable is the functional interface.

You can create new functional interfaces. They are ordinary interfaces but with only one abstract method. @FunctionalInterface is an annotation that may be used to document the fact that an interface is functional.

Listing 5 shows the definition and usage of a functional interface.

@FunctionalInterface
    public interface Greeting {
        public void sayGreeting() ;
    }

    public static void greet(Greeting s) {
        s.sayGreeting();
    }

    @Test
    public void Listing5() {
        // old way
        greet(new Greeting() {
            @Override
            public void sayGreeting() {
                System.out.println("Hello old way") ;
            }
        }) ;

        // lambda new way
        greet(()->System.out.println("Hello lambdas")) ;
    }

Once again you can see that the code with lambdas is much more compact. Within an anonymous class, the "this" variable refers to the anonymous class instance, but within a lambda, "this" refers to the enclosing class instance.

java.util.function

The java.util.function package in JDK 8 has several ready-to-use functional interfaces. For example, the Consumer interface takes a single argument and returns no result. It is widely used by new methods in the collections framework. Listing 6 shows one such use with the forEach method added to the Iterable interface, which can be used to process all elements in a collection.

@Test
    public void Listing6() {
         List<Integer> l = Arrays.asList(1,2,3,4,5,6,7,8,9) ;
         l.forEach((i)->System.out.println(i*i)) ;
    }
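
Beyond Consumer, the java.util.function package also has general-purpose interfaces such as Function and Predicate. Here is a small illustrative example (not from the tutorial's source code) combining all three with the streams API.

import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

public class FunctionPackageDemo {
    public static void main(String[] args) {
        Function<Integer, Integer> square = x -> x * x;        // one argument, returns a result
        Predicate<Integer> isEven = x -> x % 2 == 0;           // one argument, returns a boolean
        Consumer<Integer> print = x -> System.out.println(x);  // one argument, returns nothing

        List<Integer> nums = Arrays.asList(1, 2, 3, 4, 5);
        nums.stream()
            .filter(isEven)   // keep 2 and 4
            .map(square)      // square them
            .forEach(print);  // prints 4 and 16
    }
}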

 

In summary, Java 8 lambdas introduce a new programming style to JAVA. They attempt to bring JAVA up to par with other languages that claim to be superior because they support functional programming. It is not all just programming style, though: lambdas do provide some performance advantages. I will examine them more in future blogs.


Monday, July 20, 2015

ConcurrentHashMap vs ConcurrentSkipListMap

In the blog Map classes, we discussed the map classes in the java.util package. In the blog ConcurrentHashMap, we ventured into concurrent collections and discussed the features of ConcurrentHashMap, which offers much better concurrency than a conventional HashMap.

In this blog we discuss another concurrent map, the ConcurrentSkipListMap and compare it with ConcurrentHashMap. Package java.util has a HashMap and TreeMap. Have you ever wondered why java.util.concurrent has a ConcurrentHashMap, but no ConcurrentTreeMap and why there is a ConcurrentSkipListMap ?

In the non-concurrent collections, there are HashMap and TreeMap: HashMap for O(1) time complexity and TreeMap for maintaining a sorted order at O(log n) complexity. The implementation of TreeMap is not an ordinary binary search tree (BST), because a BST that is not balanced degrades to O(n) performance for input that is already sorted. TreeMap is implemented as a red-black tree, whose implementation is complex and involves balancing the tree (moving nodes around) when nodes are added or removed. The complexity is even greater when you try to make the implementation concurrent (safe for concurrent use). For that reason there is no ConcurrentTreeMap in java.util.concurrent.

A concurrent implementation of a skip list is simpler. Hence, for a Map that is ordered and concurrent, the implementors chose a skip list.

What is a Skip List ?

A skip list is an ordered linked list with O(log n) expected search time. An ordinary linked list has O(n) worst-case search time. A skip list provides faster search by maintaining layers of links, allowing the search to skip nodes. As shown in the figure, the lowest layer is an ordinary linked list, but each higher layer skips more nodes.

level4 10-------------------------------------100-null
level3 10-----------------50-----------------100-null
level2 10-------30------ 50-----70---------100-null
level1 10 -20 -30 -40 -50 -60-70-80-90-100-null

Let us say you need to find 80 in the list.
Start at the highest level, level 4. Search linearly for the node that equals 80 or whose next node is greater than 80. At level 4, the node after 10 is 100, which is greater than 80, so at node 10 move down to level 3.

At level 3, from node 10, the next node 50 is less than 80, so move to node 50. The next node 100 is greater than 80, so move down to level 2 at node 50.

At level 2, from node 50, the next node 70 is less than 80, so move to node 70. The next node 100 is greater than 80, so move down to level 1 at node 70.

At level 1, this is the last level. Keep going forward from 70 until you find 80 or reach the end of the list.

Adding more levels can lead to faster searches.

A skip list has O(log n) expected performance for search, insert and delete. Depending on the number of levels, it does use some extra space for the additional links; the expected space is still O(n), since each higher level holds progressively fewer nodes.
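
The search walkthrough above can be captured in a short sketch. This is a simplified, single-threaded illustration of the search logic only; the concurrent version in the JDK is considerably more involved.

// Simplified, single-threaded sketch of skip list search.
// Each node has a value and an array of forward links, one per level.
// The head is a sentinel node whose value is never compared.
public class SkipListSearch {

    static class Node {
        final int value;
        final Node[] next;   // next[i] = the next node at level i
        Node(int value, int levels) {
            this.value = value;
            this.next = new Node[levels];
        }
    }

    // Returns true if target is present. Starts at the top level of the head
    // node and drops down a level whenever the next node overshoots the target.
    static boolean contains(Node head, int topLevel, int target) {
        Node current = head;
        for (int level = topLevel; level >= 0; level--) {
            while (current.next[level] != null && current.next[level].value < target) {
                current = current.next[level];   // skip forward at this level
            }
            // next node at this level is >= target (or null): drop down one level
        }
        Node candidate = current.next[0];
        return candidate != null && candidate.value == target;
    }
}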

In general, you will use a ConcurrentHashMap if you need O(1) get and put operations but do not care about the ordering of the collection. You will use a ConcurrentSkipListMap if you need an ordered (sorted) collection and can tolerate O(log n) performance for get and put.
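
As a quick illustration of the difference (nothing beyond the JDK is assumed):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class ConcurrentMapDemo {
    public static void main(String[] args) {
        ConcurrentMap<Integer, String> hashMap = new ConcurrentHashMap<>();
        ConcurrentNavigableMap<Integer, String> skipListMap = new ConcurrentSkipListMap<>();

        for (int key : new int[] {50, 10, 30, 20, 40}) {
            hashMap.put(key, "v" + key);
            skipListMap.put(key, "v" + key);
        }

        // Iteration order of a ConcurrentHashMap is unspecified.
        System.out.println(hashMap.keySet());

        // ConcurrentSkipListMap keeps keys sorted and supports range queries.
        System.out.println(skipListMap.keySet());              // [10, 20, 30, 40, 50]
        System.out.println(skipListMap.headMap(30).keySet());  // [10, 20]
        System.out.println(skipListMap.firstKey());            // 10
    }
}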

Lastly, a skip list is easier to implement than a balanced tree and has become the data structure of choice for an ordered concurrent map.

Tuesday, May 19, 2015

Apache Cassandra: Compaction

In Cassandra vs HBase, I provided an overview of Cassandra. In Cassandra data model, I covered data modeling in Cassandra. In this blog, I go a little bit into Cassandra internals and discuss compaction, a topic that is a source of grief for many users. Very often you hear that performance degrades during compaction. We will discuss what compaction is, why it is necessary and the different types of compaction.

Compaction is the process of merging multiple SSTables into larger SSTables. It removes data that has been marked for deletion and reduces fragmentation. Generally it happens automatically in the background, but it can be started manually as well.

Why is compaction necessary ?

Cassandra is optimized for writes. A write is first written in memory to a table called the Memtable. When the Memtable reaches a certain size, it is written in its entirety to disk as a new SSTable. An SSTable has an index of sorted keys, which point to the locations in the file that hold the columns. SSTables are immutable. They are never updated.

The high throughput for writes is achieved by always appending and never seeking before writing. Updates to existing keys are also written to the current Memtable and eventually written to a new SSTable. There are no disk seeks while writing.

Obviously, over time there are going to be several SSTables on disk. Not only that, but the latest column values for a single key might be spread over several SSTables.

How does this affect reads ?

Reading from one SSTable is easy. Find the key in the index. Keys are sorted. So a binary search would find the key. After that it is one disk seek to the location of the columns.

But as pointed out earlier, the updates for a single key might be spread over several SSTables. So for the latest values, Cassandra would need to read several SSTables and merge updates based on timestamps before returning columns.

Rather than do this for every read, it is worthwhile to merge SSTables in the background, so that when a read request arrives, Cassandra needs to read from only a few SSTables (one would be ideal).
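
To see why fewer SSTables means cheaper reads, here is a toy Java sketch of the merge the read path conceptually performs: for each column of a partition key, the value with the latest timestamp wins. This is purely illustrative and is not Cassandra's actual code.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy illustration of merging the column values for one partition key that
// are spread across several SSTables: the newest timestamp wins per column.
public class ReadMerge {

    static class Cell {
        final String value;
        final long timestamp;
        Cell(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Each map in sstableRows is "column name -> cell" as read from one SSTable.
    static Map<String, Cell> merge(List<Map<String, Cell>> sstableRows) {
        Map<String, Cell> result = new HashMap<>();
        for (Map<String, Cell> row : sstableRows) {
            for (Map.Entry<String, Cell> e : row.entrySet()) {
                Cell existing = result.get(e.getKey());
                if (existing == null || e.getValue().timestamp > existing.timestamp) {
                    result.put(e.getKey(), e.getValue());   // newer timestamp wins
                }
            }
        }
        return result;
    }
}

The more SSTables contain fragments of a key, the more work this merge has to do on every read; compaction keeps that number small.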

Compaction

Compaction is the process of merging SSTables in order to
  • read the columns for a partition key from as few SSTables as possible
  • remove deleted data
  • reduce fragmentation
We did not talk about deletes earlier. When Cassandra receives a request to delete a partition key, it merely marks the key for deletion but does not actually remove the data associated with it. The term used in Cassandra is "tombstone": a tombstone is created, and during compaction, tombstones are supposed to be removed.

Types of Compaction

Size tiered compaction:

This is based on the number of SSTables and their size. A compaction is triggered when the number of tables and their size reach a certain threshold. Tables of similar size are grouped into buckets for compaction, and smaller tables are merged into a larger table.

Some disadvantages of size tiered compaction are that read performance can vary, because the columns for a partition key can be spread over several SSTables, and that a lot of free space (up to double the current storage) is required during compaction, since the merge process makes a copy.

Leveled compaction:

There are multiple levels of SSTables. SSTables within a level are of the same size and non-overlapping (within each level, a partition key will be in one SSTable only). SSTables in the higher levels are larger. Data from the lower levels is merged into SSTables of the higher levels.
Leveled compaction tries to ensure that most reads happen from one SSTable. The worst-case read performance is bounded by the number of levels. This works well for read-heavy workloads because Cassandra knows which SSTable within each level to check for the key. But more work needs to be done during compaction, especially for write (insert) heavy workloads. Due to the extra work to maintain a fixed number of non-overlapping SSTables per level, there is a lot more IO.

Date tiered compaction:

Data written within a certain period of time, say 1 hour, is merged into one SSTable. This works well when you are writing time series data and querying based on timestamp. A query such as "give me the columns written in the last 1 hour" can be serviced by reading just one SSTable. This also makes it easy to remove tombstones that are based on TTL: data with the same TTL is likely to be in the same SSTable, and the entire SSTable can be dropped.

Manual compaction:

This is compaction started manually using the nodetool compact command. A keyspace and table are specified. If you do not specify the table, the compaction will run on all tables. This is called a major compaction. It involves a lot of IO and is generally not done.

In summary, compaction is fundamental to distributed databases like Cassandra. Without the append-only architecture, write throughput would be much lower, and high write throughput is necessary for highly scalable systems. Stated another way: writes are much harder to scale and are generally the bottleneck, while reads can be scaled easily by de-normalization, replication and caching.

Even with relational databases, applications do not go to Oracle or MySQL for every read. Typically there is a cache like Memcached or Redis that holds frequently read data. For predictable read performance, consider fronting Cassandra with a fast cache. Another strategy is to use different Cassandra clusters for different workloads, so that read requests can be sent to clusters optimized for reads.

Lastly, leveled compaction works better for read-intensive loads, whereas date tiered compaction is suited for time series data with a steady write rate, and size tiered compaction is used with write-intensive workloads. But there is no silver bullet. You have to try, measure and tune for optimal performance with your workload.


Related Blogs:

Cassandra vs HBase
Cassandra data model
Choosing Cassandra







Saturday, March 28, 2015

Apache Kafka : New producer API in 0.8.2

In Kafka version 0.8.2, there is a newer, better and faster version of the Producer API. You might recall from earlier blogs that the Producer is used to send messages to a topic. If you are new to Kafka, please read the following blogs first.

Apache Kafka Introduction
Apache Kafka JAVA tutorial #1 

Some features of the new producer are:
  • Asynchronously send messages to a topic.
  • Send returns immediately. The producer buffers messages and sends them to the broker in the background.
  • Thanks to buffering, many messages are sent to the broker at one time, without waiting for individual responses.
  • The send method returns a Future<RecordMetadata>. RecordMetadata has information on the record, like which partition it is stored in and what the offset is.
  • The caller may optionally provide a callback, which gets called when the message is acknowledged.
  • The buffer can at times fill up. The buffer size is configurable via the total.memory.bytes configuration property.
  • If the buffer fills up, the producer can either block or throw an exception. The behavior is controlled by the block.on.buffer.full configuration property.
In the rest of the blog we will use the new producer API to rewrite the producer we wrote in tutorial #1.

For this tutorial you will need:

(1) Apache Kafka 0.8.2
(2) JDK 7 or higher. An IDE of your choice is optional
(3) Apache Maven
(4) Source code for this sample from https://github.com/mdkhanga/my-blog-code so you can look at working code.

In this tutorial we take the producer we wrote in Step 5 of Kafka tutorial #1 and rewrite it using the new API. We will send messages to a topic on a Kafka cluster and consume them with the consumer we wrote in that tutorial.

Step 1: Step up a Kafka cluster and create a topic

If you are new to Kafka, you can read and follow the instructions in my tutorial 1 to setup a cluster and create a topic.

Step 2: Get the source code for tutorial 1,2,3 from https://github.com/mdkhanga/my-blog-code

Copy KafkaProducer.java to KafkaProducer082.java. We will port KafkaProducer082 to the new producer API.

Step 3: Write the new Producer

Update the maven dependencies in pom.xml.

For the new producer you will need

<dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>0.8.2.0</version>
 </dependency>

The rest of the client code also needs to be updated to 0.8.2.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.0</version>
</dependency>

The new producer will not work if the rest of the client code uses version 0.8.1 or lower.

Step 3.1: Imports

Remove the old imports and add these.


import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

Note the packages.

Step 3.2: Create the producer

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "1");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

As in the past, you provide some configuration like which broker to connect to as Properties. The key and value serializers have to be provided. There are no default values.

Step 3.3: Send Messages

String date = "04092014" ;
String topic = "mjtopic" ;

for (int i = 1 ; i <= 1000000 ; i++) {

    String msg = date + " This is message " + i ;
    ProducerRecord<String, String> data =
            new ProducerRecord<>(topic, String.valueOf(i), msg);

    Future<RecordMetadata> rs = producer.send(data, new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            System.out.println("Received ack for partition=" + recordMetadata.partition() +
                    " offset = " + recordMetadata.offset()) ;
        }
    });

    try {
        RecordMetadata rm = rs.get();
        msg = msg + "  partition = " + rm.partition() + " offset =" + rm.offset() ;
        System.out.println(msg) ;
    } catch (Exception e) {
        System.out.println(e) ;
    }
}

As mentioned earlier, send is asynchronous and the producer will batch messages before sending them to the broker. The send method immediately returns a Future that will hold the partition and the offset within that partition for the sent message. We provide a callback to the send method whose onCompletion method is called when an acknowledgement for the message is received.

Step 4: Start the Consumer

mvn exec:java -Dexec.mainClass="com.mj.KafkaConsumer"

Step 5: Start the Producer

mvn exec:java -Dexec.mainClass="com.mj.KafkaProducer082" 


You should start seeing messages in the consumer.

In summary, the new producer API is asynchronous, scalable and returns useful metadata on the message sent.


Related Blogs:
Apache Kafka Introduction
Apache Kafka JAVA tutorial #1
Apache Kafka JAVA tutorial #2 
Apache Kafka JAVA tutorial #3


Friday, January 23, 2015

MongoDB tutorial #1 : Introduction

In the blog NoSQL, I provided an introduction to NoSql databases. We have discussed some NoSql databases such as HBase, Cassandra and Redis. In this blog, we discuss MongoDB, a document-oriented database, in contrast to the key-value stores we discussed earlier. MongoDB is currently one of the more popular NoSql databases, primarily due to its ease of use and simpler programming model. But there have been reports that it lags in scalability or performance compared to other NoSql databases, and it has more moving parts. Still, its ease of use and low learning curve make it an attractive choice in many scenarios.

The key features of MongoDB are:
  • The unit of storage, like a record in relational databases or a key-value pair in key value stores, is a document, or more precisely a JSON document. For example:
    { "employee_id":"12345",
      "name":"John doe",
      "department": "database team",
      "title":"architect",
      "start_date":"1/1/2015" }
  • Documents are stored in collections.
  • Collections can be indexed by field.
  • Indexing support for faster queries.
  • No schema is required for the collection.
  • MongoDB is highly available using replication and automatic failover. Write happens to a primary server but can be replicated to multiple replicas. If the primary goes down, one of the replicas takes over as the primary.
  • Read operations can be scaled by sending the reads to the replicas as well.
  • Write operations are scaled by sharding.
  • Sharding is automatic, but it has a few moving parts:
    • Sharding is based on a key, which is an indexed field or an indexed compound field.
    • Sharding can be range based or hash based. With range based sharding, partitioning is based on key ranges, so that values close to each other are stored together. With hash based sharding, the partitioning is based on a hash of the key.
    • The data set is divided into chunks. Each shard manages some chunks.
    • Query routers send each request to the right shard.
    • Config servers hold metadata on which chunks are with which shard.
    • If a chunk grows too large, it is broken up. If some shards own more chunks than others, the cluster is automatically rebalanced by redistributing the chunks.
In the rest of the blog, let us fire up a mongodb instance, create some data and learn how to query it.

Step 1: Download Mongo

You can download the server from www.mongodb.org/downloads.
I like to download the generic linux version and untar it.

Untar/unzip it to a directory of your choice.

Step 2 : Start the server

Decide on a directory to store the data. Say ~/mongodata. Create the directory.

Change to the directory where you installed mongo. To start the server, type the command.

bin/mongod -dbpath ~/mongodata

Step 3: Start the mongo client

bin/mongo

Step 4: Create and insert some data into a collection

Create and use a database.
> use testDb ;

Create a employee document and insert into the employees collection.
> emp1 = { "employee_id":"12345", "name":"John doe", "department": "database team", "title":"architect", "start_date":"1/1/2015" }
> db.employees.insert(emp1)

Retrieve the document.
> db.employees.find()
{ "_id" : ObjectId("54c2de34426d3d4ea1226498"), "employee_id" : "12345", "name" : "John doe", "department" : "database team", "title" : "architect", "start_date" : "1/1/2015" }

Step 5 : Insert a few more employees

> emp2 = { "employee_id":"12346", "name":"Ste Curr", "department": "database team", "title":"developer1", "start_date":"12/1/2013" }
> db.employees.insert(emp2)

> emp3 = { "employee_id":"12347", "name":"Dre Grin", "department": "QA team", "title":"developer2", "start_date":"12/1/2011" }
> db.employees.insert(emp3)

> emp4 = { "employee_id":"12348", "name":"Daev Eel", "department": "Build team", "title":"developer3", "start_date":"12/1/2010" }
> db.employees.insert(emp4)

Step 6: Queries

Query by attribute equality
> db.employees.find({"name" : "Ste Curr"} )
{ "_id" : ObjectId("54c2e0de426d3d4ea1226499"), "employee_id" : "12346", "name" : "Ste Curr", "department" : "database team", "title" : "developer1", "start_date" : "12/1/2013"  }

Query by attribute with regex condition
> db.employees.find({"department":{$regex : "data*"}})
{ "_id" : ObjectId("54c2de34426d3d4ea1226498"), "employee_id" : "12345", "name" : "John doe", "department" : "database team", "title" : "architect", "start_date" : "1/1/2015" }
{ "_id" : ObjectId("54c2e0de426d3d4ea1226499"), "employee_id" : "12346", "name" : "Ste Curr", "department" : "database team", "title" : "developer1", "start_date" : "12/1/2013" }

Query using less than , greater than conditions
> db.employees.find({"employee_id":{$gte : "12347"}})
{ "_id" : ObjectId("54c2e382426d3d4ea122649a"), "employee_id" : "12347", "name" : "Dre Grin", "department" : "QA team", "title" : "developer2", "start_date" : "12/1/2011" }
{ "_id" : ObjectId("54c2e3af426d3d4ea122649b"), "employee_id" : "12348", "name" : "Daev Eel", "department" : "Build team", "title" : "developer3", "start_date" : "12/1/2010" }

> db.employees.find({"employee_id":{$lte : "12346"}})
{ "_id" : ObjectId("54c2de34426d3d4ea1226498"), "employee_id" : "12345", "name" : "John doe", "department" : "database team", "title" : "architect", "start_date" : "1/1/2015" }
{ "_id" : ObjectId("54c2e0de426d3d4ea1226499"), "employee_id" : "12346", "name" : "Ste Curr", "department" : "database team", "title" : "developer1", "start_date" : "12/1/2013" }

Step 7: Cursors

Iterate through results.
> var techguys = db.employees.find()
> while ( techguys.hasNext() ) printjson( techguys.next() )
{
    "_id" : ObjectId("54c2de34426d3d4ea1226498"),
    "employee_id" : "12345",
    "name" : "John doe",
    "department" : "database team",
    "title" : "architect",
    "start_date" : "1/1/2015"
}
.
.
.

Step 8: Delete records

Delete one record
> db.employees.remove({"employee_id" : "12345"})
WriteResult({ "nRemoved" : 1 })

Delete all records
> db.employees.remove({})
WriteResult({ "nRemoved" : 3 })

As you can see, MongoDB is pretty easy to use. Download it and give it a try.