Showing posts with label architecture. Show all posts

Saturday, July 20, 2024

Replication in modern data systems

Overview

Replication means keeping a copy of the data so that it can be used if the original fails, or to scale the system.

Why is it a big deal? We copy files for backup all the time. For static files that do not change, making a copy takes one copy command. But what if the data is being updated by users all the time? How often do you run the copy command? How do you keep the copy in sync with the source?

That is the problem of replication in databases and data systems. Most databases have replication built in, and you can set it up with a command or two. So why read about or discuss it? If you are building a distributed system that involves data, you will need to replicate data. The concepts from databases will be useful.

While replication is best known for its use with databases, it is also a critical part of distributed systems where the data is unstructured, such as distributed file systems (HDFS) or messaging systems (Apache Kafka).

This post covers replication in traditional single node systems as well as modern distributed systems.



Why do we need replication ?

There are several reasons why replication is needed. It is more than just taking a backup.

Redundancy

Make a copy of the data. When the main server becomes unavailable for any reason, switch to the copy. This ensures that the data is always available.

Scalability

Your data becomes really popular, and the database gets more read requests than it can keep up with. So you make copies of the database and have a load balancer distribute the requests across the copies (replicas).

Geo distribution of data

Bring the data close to the user. You have users in the Americas, Europe and Asia. Data from the Americas is replicated to Europe and Asia, so users there can read data locally without making a round trip to the Americas for every read.

Secondary use cases

These are lesser known and unconventional use cases. They might be done higher up in the stack, at the application layer or in middleware, rather than in the database.

Mirroring

Mirroring involves replicating the requests to the application to a copy of the entire application stack. You can think of this as application level replication.



For example, for a REST service, this involves sending the http request, not just to the production service but also to a mirror service.

The mirror service reads from and writes to the mirror database. The mirror database is a former replica that was in sync with the leader. Just before mirroring starts, it is detached from replication so that it does not receive each write twice.

Mirroring can be used for testing large complex changes against production traffic.

Data in the mirror database is then compared with data in the production database for accuracy.

Testing

A regular database replica is used as a test database. Various kinds of tests - feature tests, performance tests, concurrency tests, scalability tests can be run with services running with the replica. This is a different use case from mirroring.

Migration

Replication can be used to eliminate or reduce the downtime needed for a migration.

Create additional replicas.

Run the migration on them.

Roll over the application services to the new database replicas.

Replication strategies

Single leader

This is the most common pattern. It is shown in Figure 1.

One server is designated as the leader. The others are followers. All writes go to the leader. The leader replicates the writes to the followers.

The advantages are :

Setting up is fairly easy.

Reads become scalable. You can put a load balancer in front and distribute read requests to followers.

High availability: If the leader fails, you fail over to one of the followers and let it become the leader.

The disadvantages are :

All writes go to one server, the leader, so it can become a bottleneck. Writes do not scale.

If you read from a replica that is behind on replication, you might read stale data.
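The single leader pattern and its stale read problem can be sketched in a few lines. This is a minimal in-memory model, not how any particular database implements it; the class names `Node` and `SingleLeaderCluster` and the explicit `replicate()` step are assumptions made for illustration.

```python
class Node:
    """A server holding a key-value copy of the data."""
    def __init__(self, name):
        self.name = name
        self.data = {}


class SingleLeaderCluster:
    def __init__(self, leader, followers):
        self.leader = leader
        self.followers = followers
        self.pending = []  # writes accepted by the leader but not yet shipped

    def write(self, key, value):
        # All writes go to the leader; replication happens later (asynchronously).
        self.leader.data[key] = value
        self.pending.append((key, value))

    def replicate(self):
        # Ship pending writes, in order, to every follower.
        for key, value in self.pending:
            for follower in self.followers:
                follower.data[key] = value
        self.pending.clear()

    def read(self, follower_index, key):
        # Reads can be served by any follower.
        return self.followers[follower_index].data.get(key)


cluster = SingleLeaderCluster(Node("leader"), [Node("f1"), Node("f2")])
cluster.write("x", 1)
stale = cluster.read(0, "x")   # the follower has not caught up yet
cluster.replicate()
fresh = cluster.read(0, "x")   # now the follower has the write
print(stale, fresh)
```

The read before `replicate()` misses the write, which is the stale read described above.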

Multi leader

Writes can go to more than one server.

Multi leader replication is needed when

(1) Writes and replication need to happen across geographically distributed areas.

(2) Connectivity to a single leader is not guaranteed. This is usually the case with mobile devices or laptops, or when people want the ability to work offline and/or on multiple devices.

In the geo distributed case, the writes go to a local leader. The local leader replicates not only to its local replicas but also to the leaders in other regions (which in turn replicate to their own replicas).

In the mobile case, the writes are stored locally and then replicated periodically when connectivity is available.

Advantages:

Writes are also scaled.

Writes can be done locally or close to clients, which gives better write latency.

Disadvantages:

Since writes happen at multiple leaders, there can be conflicts. The conflicts need to be resolved.

Leaderless

In the leaderless model, all nodes are equal and no node is designated leader. Writes can go to any node and that node replicates the write to other nodes. This is the model made popular by Amazon's Dynamo and later adopted by Cassandra.


Consensus based replication

All the above methods have either write conflict or read consistency issues. Raft and Paxos are two well known protocols for replicating log entries. Data to be replicated is modeled as a list of entries in a log. The short story is that one server sends an entry or a sequence of entries to the others, and an entry is considered committed once a majority of servers acknowledge having received it. Raft is built around an elected leader, whereas basic Paxos does not require one. The Raft paper describes leader election, replication, server crashes, recovery and consistency checks in detail. It is a good read for anyone interested in distributed systems.
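The majority rule is the core of the idea and is easy to sketch. The code below is only an illustration of "committed once a majority has stored the entry"; it leaves out terms, leader election and log consistency checks, and the `Server` class and `replicate_entry` function are hypothetical names.

```python
class Server:
    def __init__(self, name, reachable=True):
        self.name = name
        self.reachable = reachable
        self.log = []

    def append(self, entry):
        # An unreachable server never stores or acknowledges the entry.
        if not self.reachable:
            return False
        self.log.append(entry)
        return True


def replicate_entry(leader, followers, entry):
    """Return True if the entry is stored on a majority of the cluster."""
    leader.log.append(entry)
    acks = 1  # the leader counts itself
    for follower in followers:
        if follower.append(entry):
            acks += 1
    cluster_size = len(followers) + 1
    return acks >= cluster_size // 2 + 1


# 5 servers, 2 unreachable: 3 of 5 store the entry, so it is committed.
leader = Server("s1")
followers = [Server("s2"), Server("s3"), Server("s4", False), Server("s5", False)]
committed = replicate_entry(leader, followers, "set v=8")

# 5 servers, 3 unreachable: only 2 of 5 store the entry, so it is not committed.
leader2 = Server("s1")
followers2 = [Server("s2"), Server("s3", False), Server("s4", False), Server("s5", False)]
not_committed = replicate_entry(leader2, followers2, "set v=8")
```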

Replication Implementations

The first three techniques apply to databases which deal with structured data and are a little more complicated.

Statement based replication

In this approach, SQL statements such as INSERT, UPDATE and DELETE are forwarded as they are from the leader to the followers. While this works in most cases, it breaks when a statement is not deterministic, for example when it reads the current timestamp or generates an id or a random number. Each follower would then compute a different value than the leader.

It is not efficient either. If you insert a record and then delete it, why replicate both commands ?
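The non-determinism problem can be shown with a toy example. Here a Python dict stands in for a table and a seeded random generator stands in for a SQL function such as a random number or timestamp call; both are assumptions for illustration only.

```python
import random


def apply_statement(table, rng):
    # Stand-in for replaying a statement like: INSERT INTO t (id, token) VALUES (1, <random>)
    # Every server that replays the statement evaluates the random call itself.
    table[1] = rng.random()


def apply_row_change(table, row_id, value):
    # Row based alternative: ship the value the leader actually wrote.
    table[row_id] = value


leader, follower_stmt, follower_row = {}, {}, {}
apply_statement(leader, random.Random(1))         # leader evaluates the function
apply_statement(follower_stmt, random.Random(2))  # follower re-evaluates it and diverges
apply_row_change(follower_row, 1, leader[1])      # follower copies the leader's result
```

After this runs, `follower_row` matches the leader while `follower_stmt` does not.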

Write ahead log (WAL) replication

Databases append every write to the WAL before doing anything else, that is, before writing it to the structured storage from which it will be read. The WAL is used for recovery. If the database crashes, its state is reconstructed from the WAL. A recent slogan has been "The WAL is the database". Replication here involves replicating the WAL.

A disadvantage is that WAL entries contain storage specific details, such as which bytes in which block are to be updated. This can create compatibility issues if the leader and followers are on different versions.
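The append-then-apply idea and recovery by replay can be sketched as below. This is a deliberately simplified model: the log holds JSON key/value records rather than the block level details a real WAL contains, and `TinyStore` is a hypothetical name. A follower could be kept in sync by shipping the same log file and replaying it.

```python
import json
import os
import tempfile


class TinyStore:
    def __init__(self, wal_path):
        self.wal_path = wal_path
        self.data = {}
        self._replay()

    def _replay(self):
        # Recovery: rebuild the in-memory state from the log, oldest entry first.
        if not os.path.exists(self.wal_path):
            return
        with open(self.wal_path) as f:
            for line in f:
                record = json.loads(line)
                self.data[record["key"]] = record["value"]

    def put(self, key, value):
        # Append to the log and force it to disk before applying the write.
        with open(self.wal_path, "a") as f:
            f.write(json.dumps({"key": key, "value": value}) + "\n")
            f.flush()
            os.fsync(f.fileno())
        self.data[key] = value


path = os.path.join(tempfile.mkdtemp(), "wal.log")
store = TinyStore(path)
store.put("a", 1)
store.put("a", 2)
recovered = TinyStore(path)  # simulates a restart after a crash
```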

Logical replication

A logical log, on the other hand, captures at the row level how the table was changed. You can view this as an approach somewhere between statement based and WAL replication.

Change data capture is a form of logical replication. It is used to replicate changes in a database to other third party systems. A popular use case is data warehousing where data from multiple sources is aggregated and summarized for analytics. 

Unstructured data replication

For unstructured data as in distributed file systems the unit for replication is a block of data. Data is first partitioned into blocks and each block is replicated independently.

Potential issues with replication

Replication Lag

Most of the time replication is asynchronous. The client's write to the leader returns before any acknowledgement that it has been replicated. Fully synchronous replication is often not viable due to both performance and availability issues: a single failed replica can hold up all writes.

Lost write

One problem asynchronous replication creates is that if you read immediately after a write, the replica you are reading from may not yet have your last write.

Inconsistent read

If you issue the same read multiple times in quick succession, each read may get a different result depending on which replica services it, as the replicas may be at different stages of replication.

Cassandra addresses this issue using quorums. CockroachDb uses the Raft consensus protocol.

Write Conflicts

Write conflicts are an issue in multi leader replication. They happen when multiple clients update the same data while talking to different leaders. The database does not know which update to accept or how they should be merged. This is similar to a merge conflict in git.

One approach to handling conflicts is to store both versions on write. Then on read, send both versions to the client and let the client resolve the conflict.
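A minimal sketch of the "keep both versions, resolve on read" approach follows. It assumes the two writes are concurrent; real systems use version vectors or similar metadata to tell concurrent writes from sequential ones. `SiblingStore` and the shopping cart merge by set union are illustrative choices, not a specific product's API.

```python
class SiblingStore:
    def __init__(self):
        self.versions = {}  # key -> list of (leader_name, value)

    def write(self, key, leader_name, value):
        # Keep every conflicting version instead of picking a winner.
        self.versions.setdefault(key, []).append((leader_name, value))

    def read(self, key):
        # Return all versions; the client decides how to merge them.
        return list(self.versions.get(key, []))

    def resolve(self, key, value):
        # The client writes back the merged value, replacing the siblings.
        self.versions[key] = [("resolved", value)]


store = SiblingStore()
store.write("cart", "leader_eu", {"milk"})
store.write("cart", "leader_us", {"eggs"})

siblings = store.read("cart")
merged = set().union(*(value for _, value in siblings))  # client side merge
store.resolve("cart", merged)
```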

Replication in real world systems

The product documentation for databases on replication can be quite confusing. It is best to follow a tutorial or blog on the internet.

Postgres

The documentation and blogs describe it in 2 ways.

You can set it up as synchronous, asynchronous, streaming, log file based, etc.

And it can be WAL based or logical replication. Statement based is rarely seen.

In snapshot replication, a snapshot of the database is taken and replicated to followers.

Instead of streaming, you can also setup the replication as file based, where the WAL files are periodically shipped to followers.

In WAL replication, replication slots let the leader track how much of the WAL has been replicated to each replica. This keeps the leader from discarding segments that have not yet been replicated. But retained WAL consumes disk space on the leader, so replication slots need to be managed and dropped when no longer needed.
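The retention rule behind slots can be modeled in a few lines. This is a conceptual sketch only: positions are plain integers rather than real WAL locations, the function names are hypothetical, and Postgres applies additional retention rules that are ignored here.

```python
def oldest_needed_position(slots):
    """slots maps a slot name to the last WAL position confirmed by that replica."""
    return min(slots.values()) if slots else None


def removable_segments(segment_ends, slots):
    """segment_ends lists the end position of each WAL segment, in ascending order."""
    keep_from = oldest_needed_position(slots)
    if keep_from is None:
        # No slots: in this simplified model nothing pins the WAL.
        return list(segment_ends)
    # A segment can go only if every replica has confirmed past its end.
    return [end for end in segment_ends if end <= keep_from]


segments = [100, 200, 300, 400]
slots = {"replica_a": 350, "replica_b": 120}  # replica_b is far behind
before = removable_segments(segments, slots)

del slots["replica_b"]  # drop the slot of the lagging (or abandoned) replica
after = removable_segments(segments, slots)
```

A single lagging or abandoned slot pins almost all of the WAL, which is why unused slots should be dropped.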

Mysql

The traditional way in Mysql is logical replication based on its binlog file, a binary format for logical changes.

The newer way is based on global transaction identifier (GTID) which is built on top of the binlog. It can be either statement based or row based.

Dynamo / Cassandra

In this architecture, replication is fundamental to the architecture. All you need to do is to set the replication factor to greater than 1. All servers are equal - no leader and no follower. Writes can go to any server. Partitioning is also fundamental to the architecture. The server that receives the write redirects the write to appropriate server. From here it is replicated to other servers based on the replication factor.

Consistency issues are addressed using quorum based tunable consistency. Quorum means that a majority of replicas, floor(RF/2) + 1, agree on something. If you have a replication factor (RF) of 3, the quorum is 2. So on a write, at least 2 nodes need to acknowledge that the write was saved. On a read, at least 2 nodes need to agree on the return value. In general, to avoid inconsistencies, you want read quorum (R) + write quorum (W) > RF.
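The quorum arithmetic can be checked with a short script. It computes the majority and verifies by brute force that R + W > RF is exactly the condition under which every read set overlaps every write set, so at least one node in any read has the latest write. The function names are illustrative.

```python
from itertools import combinations


def quorum(rf):
    # Majority of rf replicas: floor(rf / 2) + 1.
    return rf // 2 + 1


def always_intersect(r, w, rf):
    # True if every possible set of w written nodes shares at least one
    # node with every possible set of r nodes that are read.
    nodes = range(rf)
    return all(
        set(write_set) & set(read_set)
        for write_set in combinations(nodes, w)
        for read_set in combinations(nodes, r)
    )


print(quorum(3), quorum(5))
print(always_intersect(2, 2, 3))  # quorum reads and writes with RF = 3
print(always_intersect(1, 1, 3))  # R = 1, W = 1 can miss the latest write
```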

CockroachDb

CockroachDB uses the Raft distributed consensus protocol to ensure that a majority of replicas are in consensus before any change is committed. This is the safest approach to ensure consistency but comes at a cost.

Apache Kafka

In Kafka, messages are sent to and received from topics. Topics are split into partitions. Each partition has one leader and a configurable number of replicas. Writes go to the leader, which replicates to the followers. By default reads are also served by the leader, although newer versions (2.4 and later) allow consumers to fetch from follower replicas. Each broker is a leader for some partitions and a follower for others. As with Cassandra and CockroachDb, replication is core to the architecture and easy to set up.

Apache Hadoop (HDFS)

This applies to any distributed file system. The file is a sequence of blocks of data. HDFS has a name node and data nodes. Name node maintains a map of which data nodes have the blocks of a file. Each block is replicated to a configurable number of data nodes.
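Block level replication can be sketched as follows. The round-robin placement is an assumption chosen to keep the example short; HDFS uses its own placement policy, and the tiny block size and node names are made up for illustration. The placement dict plays the role of the name node's map.

```python
def split_into_blocks(data, block_size):
    # A file is a sequence of fixed size blocks; the last one may be shorter.
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]


def place_blocks(num_blocks, data_nodes, replication):
    """Return a map of block index -> data nodes holding a copy (round-robin)."""
    if replication > len(data_nodes):
        raise ValueError("replication factor exceeds number of data nodes")
    return {
        block: [data_nodes[(block + i) % len(data_nodes)] for i in range(replication)]
        for block in range(num_blocks)
    }


blocks = split_into_blocks(b"abcdefghij", 4)
placement = place_blocks(len(blocks), ["dn1", "dn2", "dn3", "dn4"], 3)
```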

Conclusion

Replication is a critical piece of any distributed data system. It has to be part of the core architecture. It cannot be added after the fact as it was in the past. While redundancy and HA are well known benefits, there are other benefits as well, such as geo distribution of data. It can cause side effects such as inconsistent reads. Care should be taken to address those. Different products use different strategies. You should be familiar with the replication strategies, configuration and side effects for your data product. If you are building a new system with data, understanding how existing systems replicate and the issues they face can help you design your replication.

Sunday, June 30, 2024

CockroachDb Review: Should I use CockroachDb ?

Overview

CockroachDb is a modern distributed database that promises linear scalability with strict serializability.

Server side sharding is automatic. Nodes can be added easily as needed. It claims to provide the SERIALIZABLE isolation level.

Most distributed databases such as Cassandra, MongoDb, HBase etc sacrifice consistency to achieve high availability. CockroachDb distinguishes itself by claiming to be distributed and at the same time offer strong consistency that even single node databases do not offer.

This falls into a database category called NewSql or DistributedSQL as opposed to NoSql (Cassandra, MongoDb)

When to choose CockroachDb ?

You should choose CockroachDb if

    Your data is of a global scale.

    As data size increases, you need to scale horizontally to several nodes.

    You need data to be distributed and localized in specific geographical regions. For
    example EU data resides in Europe while US data resides in US.

    You need strong consistency. Serializable isolation level.

    You need to keep the SQL / relational data model.

    You need distributed transactions.

You may want to pass on it if

    Your data size can easily fit on a node for the foreseeable future.
    Your organization is more comfortable with a stable proven database. (CockroachDb is
    still maturing).
    Your data model is heavily normalized and you do a lot of joins in your queries. While this
    database can support joins, they are still not recommended in a highly distributed
    environment.

Architecture

Architecture is based on Google's Spanner paper.

It is a key value store with a SQL interface on top of it.

Database is a cluster of nodes. All nodes are equal. Nodes may join and leave the cluster at any time.

The data is a sorted map of key value pairs in a fully ordered, monolithic key space. All tables/indexes go into the same key space by encoding tablename/indexname/key together.

Sharding

Key value pairs are broken up into contiguous ranges. 

When a range reaches 512 MiB (2 to the power 29 bytes), it is split into 2 ranges.

Each range is assigned to a node and replicated. 

If you have 1 node all the shards are in that node. To scale, you add more nodes and the shards get distributed across nodes. A minimum of 3 nodes is recommended. 

You can easily spin up nodes and add them to the cluster anywhere.

A B-tree like index structure is used to locate the shard that holds a key.
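Locating the range for a key in an ordered key space can be sketched with a sorted list of range start keys and a binary search. This is only a flat stand-in for the real index structure; the `RangeIndex` class, the "table/key" encoding and the node names are assumptions for illustration.

```python
import bisect


class RangeIndex:
    def __init__(self):
        self.starts = [""]       # sorted range start keys; "" covers the whole key space
        self.nodes = ["node1"]   # node holding each range, parallel to self.starts

    def lookup(self, key):
        # The owning range is the last one whose start key is <= key.
        i = bisect.bisect_right(self.starts, key) - 1
        return self.starts[i], self.nodes[i]

    def split(self, split_key, new_node):
        # Split an existing range at split_key; the upper half moves to new_node.
        if split_key in self.starts:
            raise ValueError("a range already starts at this key")
        i = bisect.bisect_left(self.starts, split_key)
        self.starts.insert(i, split_key)
        self.nodes.insert(i, new_node)


index = RangeIndex()
index.split("orders/", "node2")
index.split("users/", "node3")

r1 = index.lookup("inventory/7")
r2 = index.lookup("orders/100")
r3 = index.lookup("users/1")
```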

Replication

Data in each range is replicated using the Raft consensus algorithm.

A minimum replication factor of 3 is needed.

This provides high availability. The data in a range is available as long as a majority of that range's replicas are available.

Geo-partitioning

By adding a country or region to the primary key, you can limit storage of keys to a particular region. So European data can be made to reside in Europe, US data in the US and so on. This has 2 benefits:

There is a performance benefit, as data is local to its users.

It can satisfy legal requirements where data is not allowed to leave a country or region.

Read/Write

Reads

Any node can receive a request to read a key/value.

Request is forwarded to the node that is the raft leader for that table/range.

Leader returns the data to the node that requested it. Since leader returns the data, no consensus is required.

Node returns it to the client.

Writes

Any node can receive a request to write a key/value.

Request is forwarded to the node that is the raft leader for that table/range.

Leader writes the value to its log and initiates consensus with the replicas for the range. When a majority acknowledges, the key/value is considered committed and the leader notifies the requesting node, which notifies the client.

Transactions

Supports transactions that span multiple tables and rows.

Transactions can be distributed, that is span multiple nodes.

The supported isolation level is strict serializability, which is the highest isolation level. Strict serializability means that not only are transactions ordered, but they are ordered as per wall clock time.

The transaction protocol is an improvement over two phase commit. In parallel, participants acquire locks and create write intents. The transaction is marked staged. When the client commits, if all locks are acquired and writes are replicated, the coordinator immediately returns success to the client. In the background the transaction is marked committed. This is one round trip between the transaction coordinator and each participant, unlike two phase commit, which requires two round trips.

Hybrid logical clocks are used to timestamp each transaction. Timestamp is the version for MVCC.
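A hybrid logical clock pairs a physical time with a logical counter, so timestamps stay ordered even when a node's wall clock lags. The sketch below follows the commonly published HLC update rules; it is not CockroachDb's actual implementation, and the class name and the integer physical clocks are assumptions for illustration.

```python
class HybridLogicalClock:
    def __init__(self, physical_clock):
        self.physical_clock = physical_clock  # callable returning an integer time
        self.l = 0  # highest physical time seen so far
        self.c = 0  # logical counter to order events sharing the same l

    def now(self):
        """Timestamp a local event or an outgoing message."""
        pt = self.physical_clock()
        if pt > self.l:
            self.l, self.c = pt, 0
        else:
            self.c += 1
        return (self.l, self.c)

    def update(self, remote):
        """Merge a timestamp received from another node."""
        rl, rc = remote
        pt = self.physical_clock()
        new_l = max(self.l, rl, pt)
        if new_l == self.l and new_l == rl:
            self.c = max(self.c, rc) + 1
        elif new_l == self.l:
            self.c += 1
        elif new_l == rl:
            self.c = rc + 1
        else:
            self.c = 0
        self.l = new_l
        return (self.l, self.c)


node_a = HybridLogicalClock(lambda: 100)
node_b = HybridLogicalClock(lambda: 90)   # node_b's wall clock is behind

t1 = node_a.now()        # event on A
t2 = node_b.update(t1)   # B receives A's message
t3 = node_b.now()        # a later event on B
```

Even though B's wall clock is behind A's, the timestamps compare as t1 < t2 < t3, matching the order in which the events happened.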

Data Model

Clients see only the SQL row column relation model
Wire protocol is same as Postgresql wire protocol.

Performance

Efficient range scan.
Geo partitioning improves performance by locality.
Distributed SQL execution.
Distributed transactions will be slow.
Generally you do not want distributed transactions over large distances. If you build a 3 node CockroachDb cluster with 1 node in New York, 1 in London and 1 in San Francisco, the write latencies are going to be very high due to the round trips for Raft and distributed transactions. The cluster topology needs to be designed appropriately to give you the lowest latency at the desired level of high availability.

Administration

Good command line tools and a UI console make administration easy.
Since all nodes are equal, the number of moving parts that need to be administered is low.

Summary

If you need a globally distributed database with strict serializability, this is definitely a database to look at. It has good pedigree. However, remember that distributed databases are not drop in replacements for your traditional RDBMSs. Distributed queries, especially joins, and distributed transactions can be slow. So some application redesign and some denormalization are usually required.

Note: Moved from heavydutysoftware.com


Sunday, November 1, 2020

Building Globally Distributed Applications

A globally distributed application is one where the services and data for the application are partitioned and replicated across multiple regions over the globe. Popular distributed applications that everyone is familiar with are Facebook, Amazon.com, Gmail, Twitter, Instagram. However, more and more enterprise applications are finding the need to become distributed because their user base is increasingly distributed around the globe. But not every company has the expertise of a Facebook or Amazon or Google. When going distributed, it is not enough to just spin up instances of your service on AWS or Google cloud in various regions. There are issues related to data that must be addressed for the application to work correctly. While consumer centric social media applications can tolerate some correctness issues or lags in data, the same might not be true for enterprise applications. This blog discusses the data and database issues related to a globally distributed application. Lastly, we discuss 2 research papers that have been around since the early 2010s, but whose relevance is increasing in recent times.

Building globally distributed applications that are scalable, highly available and consistent can be challenging. Sharding has to be managed by the application. Keeping it highly available requires tools outside the database. When you have been on a single node database, whether it is Mysql or Postgresql etc, it is tempting to scale by manual sharding or one of the clustering solutions available for those databases. It might appear easy at the beginning but the cost of managing the system increases exponentially with scale. Additionally, sharding and replication lead to consistency issues and bugs that need to be addressed. Scaling with single node databases like Mysql beyond a certain point has extremely high operational overhead.

NoSql databases such as Cassandra, Riak, MongoDB etc offer scalability and high availability but at the expense of data consistency. That might be ok for some social media or consumer applications where the dollar value of an individual transaction is very small. But not in enterprise applications where the correctness of each transaction is worth several thousands of dollars. In enterprise applications, we need distributed data to behave the same way that we are used to with single node databases.

Let us look at some common correctness issues that crop up with distributed data.

Example 1: A distributed online store with servers in San Francisco, New York and Paris.

Each server has 2 tables, products and inventory, with the following data.

Products (product):
widget1
widget2

Inventory (product, count):
widget1, 6
widget2, 1

Customer Jose connects to server in San Francisco and buys widget2 at time t1. At time t2, Customer Pierre connects to a server in Paris and also buys widget2. Assume t2 > t1 but t2-t1 is small.

Expected Behavior : Jose successfully completes transaction and gets the product. Since inventory of widget2 is now zero, Pierre’s transaction is aborted.
Actual Behavior (in an eventually consistent system): Both transactions complete. But only one of the customers gets the product. The other customer is later sent an apologetic email that widget2 is out of stock.
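The oversell in Example 1 can be reproduced with a toy model. Each dict below stands in for one server's copy of the inventory row; the `buy` function and variable names are made up for illustration.

```python
def buy(replica, product):
    # Check and decrement against whatever copy of the data this server has.
    if replica[product] > 0:
        replica[product] -= 1
        return True
    return False


# Eventually consistent: Paris has not yet seen the sale made in San Francisco.
san_francisco = {"widget2": 1}
paris = {"widget2": 1}
jose_ok = buy(san_francisco, "widget2")
pierre_ok = buy(paris, "widget2")        # succeeds too, so the item is oversold

# Single copy with transactions applied in order: the second purchase fails.
single_node = {"widget2": 1}
jose_serial = buy(single_node, "widget2")
pierre_serial = buy(single_node, "widget2")
```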

Example 2: A distributed document sharing system with servers in New York, London, Tokyo

Operation1: In London, User X creates a new empty document marked private.
Operation2: User X makes update 1 to the document.
Operation3: User X deletes update 1.
Operation4: User X makes update 2.
Operation5: User X changes the document from private to public.
Due to network issues, only operations 1, 2 and 5 reach Tokyo. Operations 3 and 4 do not.
In Tokyo, User Y tries to read the shared document.

Expected behavior: The document status is private and Y cannot read the document.
Actual behavior: Y is able to read the document, but sees an incorrect version. The document has update 1, which was deleted, and is missing update 2, which should be there.

The problems above are known as consistency issues. Different clients are seeing different views of the data. What is the correct view ?

Consistency here refers to C in the CAP theorem, not the C in ACID. Here Consistency means every thread in a concurrent application correctly reads the most recent write at that point in time.

How do you fix the above issues? In a single node database, Example 1 can be fixed by locking the row in the inventory table during the update, and Example 2 is not even an issue because all the data is in one node. But in a distributed application, data might be split across shards and the shards replicated for high availability. Users of the system might connect to any shard/server and read/write data. With NoSql databases, the application has to handle any inconsistencies.

In traditional RDBMSs, database developers are given a knob called isolation level to control what concurrent threads can read. In this old blog I explain what isolation levels are. The safest isolation level is SERIALIZABLE, where the database behaves as if the transactions were executing in a serial order with no overlap, even though in reality they are executing concurrently. Most developers use the default isolation level, which is generally READ COMMITTED or REPEATABLE READ. In reality, these isolation levels are poorly documented and implemented differently by different vendors. The result is that in highly concurrent applications, there are consistency bugs even in traditional single node RDBMSs. In a distributed database with data spread across shards and replicated for read scalability, the problem is compounded further. Most NoSql vendors punt the problem by claiming eventual consistency, meaning if there are no writes for a while, eventually all reads on all nodes will read the last write.

Consistency is often confused with isolation, which describes how the database behaves under concurrent execution of transactions. At the safest isolation level, the database behaves as if the transactions were executing in serial order, even though in reality they are executing concurrently. At the safest consistency level, every thread in a concurrent application correctly reads the most recent write. But most database documentation is not clear on how to achieve this in an application.

The problems in examples 1 and 2 would not occur if those applications/databases had the notion of a global transaction order with respect to real time. In example 1, Pierre’s transaction at t2 should see the inventory as 0 because a transaction at t1 < t2 set it to zero. In example 2, Y should only be able to read up to operation 2. Y should not be able to read operation 5 without operations 3 and 4, which occurred before 5.

In database literature, the term for this requirement is “Strict Serializability” or sometimes “external consistency”. Since these technical definitions can be confusing, it is often referred to as strong consistency.

Two research papers that have been around for a while provide answers on how these problems might be fixed. The papers are the Spanner paper and the Calvin paper.

Their approach to solving the problem can be summarized as follows:
1. Timestamp transactions with something that reflects their occurrence in real time.
2. Order transactions based on timestamp.
3. Commit transactions in the above order.

But the details of how they do it are significantly different. Let us look at how they do it.

Spanner paper from Google

Spanner is a database built at Google, and the paper describes the motivation and design of Spanner. Spanner's approach involves
1. The use of atomic clocks and GPS to synchronize clocks across hosts in different regions, and the TrueTime API to give accurate time across nodes, regions or continents.
2. For a read/write transaction, Spanner calls the TrueTime API to get a timestamp. To address overlaps between transactions that are close to each other, the timestamp is assigned after locks are acquired and before they are released.
3. The commit order equals timestamp order.
4. A read for a particular timestamp is sent to any shard/replica that has the data at that timestamp.
5. Reads without a timestamp (latest reads) are serviced by assigning a timestamp.
6. Writes that cross multiple shards use two phase commit.
And of course,
7. It can scale horizontally to 1000s of nodes by sharding.
8. Each shard is replicated.
And most importantly,
9. Even though it is a key value store, it provides SQL support to make it easy for application programmers.
CockroachDb and Yugabyte are 2 commercial databases based on Spanner.

Calvin Paper


The Calvin paper addresses the above problem using distributed consensus protocols like Raft or Paxos. 
1. Every transaction has to first go through distributed consensus and secure a spot in a linear replication log. 
2. One can view the index in the log as the timestamp. 
3. The committed entries in the replication log are then executed in the exact same serial order by every node in the distributed database. 
4. Since the transaction log is replicated to every shard, it does not need or use two phase commit. In a transaction involving multiple shards, if a shard dies before committing a particular transaction, then on restart it just has to execute the uncommitted transaction from its replication log.
5. No dependency on wall clocks or time API.
6. No two phase commit.
7. No mention of SQL support.
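The key property in the list above is that every node executes the same agreed log in the same order and therefore reaches the same state. The sketch below assumes the consensus step has already produced the ordered log and reuses the inventory from Example 1; the `apply_log` function and the transaction format are made up for illustration.

```python
def apply_log(log):
    """Execute transactions strictly in log order, starting from the same initial state."""
    state = {"widget2": 1}
    results = []
    for txn_id, product in log:
        if state[product] > 0:
            state[product] -= 1
            results.append((txn_id, "committed"))
        else:
            results.append((txn_id, "aborted"))
    return state, results


# The log index acts as the timestamp: jose is at index 0, pierre at index 1.
agreed_log = [("jose", "widget2"), ("pierre", "widget2")]

# Every replica replays the same log independently.
san_francisco = apply_log(agreed_log)
paris = apply_log(agreed_log)
```

Both replicas commit jose's purchase and abort pierre's, without exchanging any further messages.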

FaunaDb is an example of a database based on Calvin.

This class of databases that offer horizontal scalability on a global scale without sacrificing consistency is also called NewSql. 

In summary, if you are building a globally distributed application that needs strong consistency, doing it on your own with a SQL or NoSql database can be non trivial. Consistency is hard enough in a single node database. But on a distributed database, consistency bugs are harder to troubleshoot and even harder to fix. You might want to consider one of the NewSql databases to make life easier. Review the Spanner and Calvin papers to understand the architectural choices that are available. This will help you pick a database that is right for you. The Spanner and Calvin papers have been around for almost a decade. But they have become more relevant now as real databases based on them become more popular. Most importantly, understand what consistency is and apply it, because the lack of it can cause severe correctness bugs in your application.

References:

The Spanner paper

The Calvin paper

Consistency and Isolation

Saturday, July 26, 2014

Distributed Systems : Consensus Protocols

Modern software systems scale by partitioning the data and distributing data across several machines. Systems are made highly available by replicating data across multiple machines. When multiple systems are involved in managing state, they need to agree when a particular piece of data needs to change.

You are familiar with the concept of a transaction in a relational database. A transaction is a unit of work (like an insert or update or some combination of multiple statements) that as a whole can be committed or aborted. What if the work involves updating multiple databases that are on different machines? To ensure consistent state across the system, all the databases should agree on what to do, whether to commit or abort the state change.

Modern distributed NoSql databases have a similar but slightly different problem. If you had a single server and set a value v=8 in the server, there is no doubt what the value of v is. Any client that connects to the server reads the value as 8. What if you had a cluster of 3 servers? Would a client connecting to one of the servers see the value as 8? Consensus is required to ensure all servers agree on what the value of v is.

Consider systems like Apache Zookeeper or Apache Cassandra. To ensure high availability, clients can connect to any node in the cluster and read  or write data. To ensure consistency in the cluster, some consensus is required among the nodes in the cluster when state changes.

In the rest of this blog we briefly cover some distributed protocols, starting with two phase commit, which users of relational databases are very familiar with. We will then talk about Paxos, ZAB and Raft. Paxos became popular because it was used by Google for its distributed systems. ZAB is used by Zookeeper, which is an important component of the Hadoop ecosystem. These protocols are hard to understand and no attempt is made to go into detail. The purpose is to introduce readers to some of the consensus concepts that are important in distributed systems.

1. Two phase commit

Used in databases to ensure all participants in distributed updates either commit or abort the changes.
One node called the co-ordinator originates the transaction.

1.1 Co-ordinator sends a prepare message to all participants.
1.2 Each participant replies with a yes if it can commit its part of the transaction or No otherwise.
1.3 If the co-ordinator receives a yes from all participants, it sends a commit message to the participants. Otherwise it sends an abort message.
1.4 If the participant receives a commit message, it commits its change. If it receives an abort message, it aborts the change. In both cases, it sends an acknowledgement back to the co-ordinator.
1.5 Transaction is complete when the coordinator receives all acknowledgments.
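Steps 1.1 to 1.5 can be sketched as a small in-process simulation. It models only the happy path with no crashes or timeouts, and the `Participant` class and `two_phase_commit` function are hypothetical names.

```python
class Participant:
    def __init__(self, name, can_commit=True):
        self.name = name
        self.can_commit = can_commit
        self.state = "init"

    def prepare(self):
        # Step 1.2: vote yes only if this participant can commit its part.
        self.state = "prepared" if self.can_commit else "refused"
        return self.can_commit

    def commit(self):
        self.state = "committed"
        return True  # acknowledgement back to the co-ordinator

    def abort(self):
        self.state = "aborted"
        return True  # acknowledgement back to the co-ordinator


def two_phase_commit(participants):
    # Phase 1 (steps 1.1 and 1.2): send prepare and collect the votes.
    votes = [p.prepare() for p in participants]
    # Step 1.3: commit only if every participant voted yes.
    decision = "commit" if all(votes) else "abort"
    # Phase 2 (step 1.4): deliver the decision and collect acknowledgements.
    acks = [p.commit() if decision == "commit" else p.abort() for p in participants]
    # Step 1.5: the transaction is complete once all acknowledgements are in.
    return decision if all(acks) else "incomplete"


happy = [Participant("db1"), Participant("db2")]
outcome_happy = two_phase_commit(happy)

mixed = [Participant("db1"), Participant("db2", can_commit=False)]
outcome_mixed = two_phase_commit(mixed)
```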

One limitation of this protocol is that if the co-ordinator crashes, the participants do not know whether to commit or abort the transaction, as they do not know how the other participants responded.
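
The steps above can be sketched in a few lines of Python. This is a minimal in-memory illustration, not how any particular database implements it: the Participant class, its can_commit flag and the function name are hypothetical, and network messages are modelled as plain method calls.

```python
from dataclasses import dataclass


@dataclass
class Participant:
    name: str
    can_commit: bool = True   # hypothetical flag: how this node will vote
    state: str = "init"       # init -> prepared -> committed / aborted

    def prepare(self) -> bool:
        # Step 1.2: vote yes only if the local part of the transaction can commit.
        if self.can_commit:
            self.state = "prepared"
        return self.can_commit

    def finish(self, decision: str) -> bool:
        # Step 1.4: apply the co-ordinator's decision and acknowledge it.
        self.state = "committed" if decision == "commit" else "aborted"
        return True


def two_phase_commit(participants):
    # Steps 1.1 and 1.2: send prepare to every participant and collect the votes.
    votes = [p.prepare() for p in participants]
    # Step 1.3: commit only on a unanimous yes.
    decision = "commit" if all(votes) else "abort"
    # Steps 1.4 and 1.5: broadcast the decision and wait for all acknowledgements.
    acks = [p.finish(decision) for p in participants]
    assert all(acks)
    return decision
```

Note that a participant in the "prepared" state has nothing to go on except the co-ordinator's decision, which is exactly where the blocking limitation described above comes from.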

2. Three phase commit

The protocol attempts to let the participants make progress even if the co-ordinator fails.

2.1 Co-ordinator sends a prepare message to all participants.
2.2 Each participant replies with a yes if it can commit its part of the transaction or a no otherwise.
2.3 If the co-ordinator receives a yes from all participants, it sends a pre-commit message to all participants.
2.4 When the co-ordinator receives an acknowledgment from a majority of participants, it sends a commit message to all participants.

If the co-ordinator fails, the participants can communicate with each other and determine whether to commit or abort.

3. PAXOS

Paxos was first published in the nineties but it became more popular after Google implemented and used it in its distributed infrastructure. The protocol is notorious for being difficult to understand. Below is a very brief description. See references for more details.

Nodes that propose values are called proposers, and nodes that accept values are called acceptors.

3.1 A proposer with a value to propose submits a proposal (v,n) with value v and sequence number n.

3.2 When an acceptor receives a proposal (v,n), it compares it with the highest version proposal it has accepted for that value. If this proposal has a higher version than any accepted proposal, the acceptor replies agree and sends the value of any previously accepted proposal. If the acceptor has already accepted a higher version, it rejects the current proposal.

3.3 If the proposer receives agree from a majority of acceptors, it can pick one of the values sent by the acceptors. If the acceptors have not sent any value, it can pick its own value. It then sends a commit message with the chosen value to the acceptors. If a majority reject or do not respond, it aborts this proposal and tries another one.

3.4 When the acceptor receives a commit message, it agrees to commit if the sequence number is the highest it has agreed to or if the value is the same as the last accepted proposal. Otherwise it rejects the commit.

3.5 If a majority accept the commit, the proposal is complete. Otherwise abort and try again.

The key takeaway is that majorities are used to accept proposals. If there are multiple proposers competing for a value, it is possible that no progress is made in accepting values. The solution is to elect a leader that proposes values. Other players in the system could be learners, who learn about accepted values from either the leader or other participants.
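
To make the majority rule concrete, here is a small single-value sketch in Python. It follows the common textbook form of Paxos rather than the exact wording of the steps above: the first message carries only the sequence number n, and the proposer reuses the value of the highest numbered proposal already accepted. The class and function names are made up for illustration, and all messages are synchronous method calls.

```python
class Acceptor:
    def __init__(self):
        self.promised_n = -1     # highest sequence number agreed to so far
        self.accepted_n = -1     # sequence number of the accepted proposal, if any
        self.accepted_v = None   # value of the accepted proposal, if any

    def on_prepare(self, n):
        # Step 3.2: agree only to proposals newer than anything seen so far,
        # and report any value that was already accepted.
        if n > self.promised_n:
            self.promised_n = n
            return ("agree", self.accepted_n, self.accepted_v)
        return ("reject", None, None)

    def on_commit(self, n, v):
        # Step 3.4: accept unless a higher numbered proposal was agreed to meanwhile.
        if n >= self.promised_n:
            self.promised_n = n
            self.accepted_n = n
            self.accepted_v = v
            return True
        return False


def propose(acceptors, n, value):
    majority = len(acceptors) // 2 + 1
    replies = [a.on_prepare(n) for a in acceptors]
    agreed = [r for r in replies if r[0] == "agree"]
    if len(agreed) < majority:
        return None              # step 3.3: abort, the caller retries with a higher n
    # Step 3.3: reuse the value of the highest numbered accepted proposal, if any.
    prior = [(an, av) for _, an, av in agreed if an >= 0]
    chosen = max(prior, key=lambda p: p[0])[1] if prior else value
    accepted = sum(a.on_commit(n, chosen) for a in acceptors)
    # Step 3.5: the value is chosen once a majority accepts it.
    return chosen if accepted >= majority else None
```

With three acceptors, a first call such as propose(acceptors, 1, "v=8") chooses "v=8". A later proposer using n=2 with a different value still ends up with "v=8", because a majority already accepted it. That is how the protocol keeps the cluster from changing its mind.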

4. ZAB (Zookeeper Atomic Broadcast)

ZAB was developed for use in Apache ZooKeeper due to limitations in PAXOS. In ZooKeeper, the order in which changes are applied is important. In PAXOS, it is possible that updates get applied by acceptors out of order.

ZAB is similar to PAXOS in that a leader proposes values and values are accepted based on majority vote. The key difference is that strict order of updates is maintained. If the leader crashes and a new leader is elected, the updates are applied in the original order.

5. RAFT

RAFT is another distributed consensus protocol that claims to be simpler than PAXOS or ZAB.

A node can either be a leader, follower or candidate.

5.1 By default all nodes are followers. When there is no leader, a node can make itself a candidate for leadership and solicit votes.

5.2 The candidate that gets majority votes is elected leader.

5.3 A client submits its updates to the leader. Leader updates a log (uncommitted) and sends the update to followers.

5.4 When the leader hears from a majority of followers that they have made the update, the leader commits the change and informs the followers of the commit.

5.5 Followers commit the update.

5.6 If a leader terminates for some reason, one of the followers turns itself into a candidate and gets elected as the leader.
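
Steps 5.3 to 5.5 can be sketched as follows. This is a simplified illustration with several assumptions: the Node class and its up flag are hypothetical, the majority is counted over the whole cluster with the leader counting itself, and a follower simply copies the leader's log instead of performing Raft's incremental consistency check. Leader election and terms are left out.

```python
class Node:
    def __init__(self, name, up=True):
        self.name = name
        self.up = up             # hypothetical flag to simulate a crashed node
        self.log = []            # entries received, committed or not
        self.commit_index = 0    # number of log entries known to be committed

    def receive(self, leader_log):
        # A follower that is down never acknowledges.
        if not self.up:
            return False
        # Simplification: copy the leader's whole log.
        self.log = list(leader_log)
        return True


def replicate(leader, followers, entry):
    majority = (len(followers) + 1) // 2 + 1
    leader.log.append(entry)                     # step 5.3: uncommitted entry
    acks = 1 + sum(f.receive(leader.log) for f in followers)
    if acks < majority:
        return False                             # entry stays uncommitted
    leader.commit_index = len(leader.log)        # step 5.4: leader commits
    for f in followers:
        if f.up:
            f.commit_index = leader.commit_index # step 5.5: followers commit
    return True
```

In a 3 node cluster this keeps accepting writes with one follower down, but an entry stays uncommitted once the leader is the only node left.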

We have given a brief description of some consensus protocols. If you use Hadoop, Cassandra, Kafka or similar distributed systems, you will run into these protocols. For more details, some references are provided below.

References:

1. Database Management Systems by Ramakrishnan and Gehrke
2. PAXOS made simple
3. PAXOS by example
4. The secret lives of data
5. Apache Zookeeper
6. Paxos paper trail
7. Raft Consensus

Friday, April 18, 2014

Apache Kafka Introduction : Should I use Kafka as a message broker ?

Asynchronous messaging is an important component of any distributed application. Producers and consumers of messages are de-coupled. Producers send messages to a queue or topic. Consumers consume messages from the queue or topic. The consumers do not have to be running when the message is sent. New consumers can be added on the fly. For Java programmers, JMS was and is the popular API for programming messaging applications. ActiveMQ, RabbitMQ and MQSeries (henceforth referred to as traditional brokers) are some of the popular message brokers that are widely used. While these brokers are very popular, they do have some limitations when it comes to internet scale applications. Generally their throughput will max out at a few tens of thousands of messages per second. Also, in many cases, the broker is a single point of failure.

A message broker is a little bit like a database. It takes a message from a producer and stores it. Later, a consumer reads the message. The concepts involved in scaling a message broker are the same concepts as in scaling databases. Databases are scaled by partitioning the data storage, and we have seen that applied in Hadoop, HBase, Cassandra and many other popular open source projects. Replication adds redundancy and failure tolerance.

A common use case in internet companies is that log messages from thousands of servers need to be sent to other servers that do number crunching and analytics. The rate at which messages are produced and consumed is several thousand per second, much higher than in a typical enterprise application. This needs message brokers that can handle internet scale traffic.

Apache Kafka is an open source message broker that claims to support internet scale traffic. Some key highlights of Kafka are
  • Message broker is a cluster of brokers. So there is partitioning and no single point of failure.
  • Producers send messages to Topics.
  • Messages in a Topic are partitioned among brokers so that you are not limited by machine size.
  • For redundancy, partitions can be replicated.
    • For each topic partition, one broker is the leader.
    • The leader handles reads and writes.
    • Followers replicate the leader.
  • A topic is like a log file with new messages appended to the end.
  • Messages are deleted after a configurable period of time, unlike other messaging systems where a message is deleted after it is consumed. A consumer can re-consume messages if necessary.
  • Each consumer maintains the position in the log file where it last read.
  • Point to point messaging is implemented using consumer groups. A consumer group is a set of consumers with the same groupid. Within a group, each message is delivered to only one member of the group.
  • Every message is delivered at least once to every consumer group. You can get publish subscribe using multiple consumer groups.
  • Ordering of messages is preserved per partition. Partition is assigned to consumer within a consumer group. If you have same number of partitions and consumers in a group, then each consumer is assigned one partition and will get messages from that partition in order.
  • Message delivery: For a producer, once a message is committed, it will be available as long as at least one replica is available. For the consumer, by default, Kafka provides at least once delivery, which means that in case of a crash, the message could be delivered multiple times. However, with each consume, Kafka returns the offset in the log file. The offset can be stored with the message consumed, and in the event of a consumer crash, the consumer that takes over can start reading from the stored offset. For both producer and consumer, acknowledgement from the broker is configurable.
  • Kafka uses ZooKeeper to store metadata.
  • The producer API is easy to use. There are 2 consumer APIs.
  • The high level API is the simple API to use when you don't want to manage the read offset within the topic. ConsumerConnector is the consumer class in this API and it stores offsets in ZooKeeper.
  • What they call the Simple API is the harder to use API, meant for when you want low level control of read offsets.
  • Relies on filesystem for storage and caching. Caching is file system page cache.
  • O(1) reads and writes, since messages are written to the end of the log and read sequentially. Reads and writes are batched for further efficiency.
  • Developed in Scala programming language
Apache Kafka can be downloaded at http://kafka.apache.org/downloads.html.

They have a good starter tutorial at http://kafka.apache.org/documentation.html#quickstart. So I will not repeat it. I will however write a future tutorial for JAVA producers and consumers.
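
To make the log, offset and consumer group ideas from the highlights above more concrete, here is a toy in-memory model in Python. It does not use or imitate the Kafka client API: the Topic class, its method names and the partitioning rule are all invented for illustration, and offsets are kept inside the topic object for brevity rather than by each consumer.

```python
from collections import defaultdict


class Topic:
    def __init__(self, num_partitions):
        # Each partition is an append-only list acting as the log file.
        self.partitions = [[] for _ in range(num_partitions)]
        # Last read position per (group id, partition).
        self.offsets = defaultdict(int)

    def send(self, key, message):
        # Hypothetical partitioning rule: sum of the key bytes modulo partition count.
        p = sum(key.encode()) % len(self.partitions)
        self.partitions[p].append(message)
        return p

    def poll(self, group_id, partition):
        # Return the unread messages for this group and advance its offset.
        log = self.partitions[partition]
        start = self.offsets[(group_id, partition)]
        self.offsets[(group_id, partition)] = len(log)
        return log[start:]

    def seek(self, group_id, partition, offset):
        # Messages are not deleted on consumption, so a group can rewind.
        self.offsets[(group_id, partition)] = offset
```

Two groups polling the same partition each see every message, which gives publish subscribe. Within one group, a message is returned only once unless the group rewinds with seek. Messages with the same key land in the same partition and keep their order.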

Apache Kafka is a suitable choice for a messaging engine when
  • You have a very high volume of messages - several billion per day
  • You need high throughput
  • You need the broker to be highly available
  • You need cross data center replication
  • Your messages are logs from web servers
  • Some loss of messages is tolerable
Some concerns that you need to be aware of are
  • Compared to JMS, the APIs are low level and hard to use
  • APIs are not well documented. Documentation does not have javadocs
  • APIs are changing and the product is evolving
  • Default delivery is at least once delivery. Once and only once delivery requires additional work for the application developer
  • Application developer needs to understand lower level storage details like partitions and consumer read offsets within the partition
It is useful to remember history and draw an analogy with NoSQL databases. 3 or 4 years ago NoSQL databases were hot and people wanted to use them everywhere. Today we know that traditional RDBMSs are not going anywhere and that NoSQL databases are suitable for some specialized use cases. In fact NoSQL databases are going in the direction of adding features that are available in RDBMSs. Kafka today is where NoSQL databases were a few years ago. Don't throw away your traditional message broker yet. While Kafka will be great for the cases mentioned above, a lot of the simpler messaging use cases can be handled a lot more easily with a traditional message broker.


Related Blogs :

Apache Kafka JAVA tutorial #1
Apache Kafka JAVA tutorial #2 
Apache Kafka JAVA tutorial #3 
Apache Kafka 0.8.2 New Producer API  

Friday, March 28, 2014

10 Tips for building low latency applications

In this previous blog on low latency I described 5 tips for building low latency applications. Read that for the first 5 tips. Here are 5 more tips.

6. Co-locate services

Network hops add latency. A network call to another server on a different subnet or datacenter can add a few milliseconds to your response and affect your SLA. Install dependent services on the same server or the same rack, and definitely in the same data center.

7. Geographically distribute customer facing services

This might sound contradictory to item 6, but it is not. A round trip over the internet from New York to San Francisco takes 80-90 milliseconds. If your servers are in San Francisco, a user in New York will see some latency even without the server doing any work. Users in New York should be served from servers near New York so their time is not wasted on the round trip. To ensure that rule 6 is not violated, this might mean replicating dependencies such as the database, so that the server in New York is able to serve from a database that is close to it.

As your user base grows, you may need to distribute the services to several locations: east coast US, west coast US, Europe, Asia Pacific and so on.

8. Reduce serialization / de-serialization

Network calls, cross process calls and cross JVM calls all involve serialization and de-serialization of data, which is expensive. Care should be taken to limit serialization/de-serialization to only the required data and to delay it until it is required. If you store your data as large blobs, then when you need a small piece of data, you end up serializing and de-serializing the entire blob. A few years ago, when the XML bandwagon was in full swing, there were many products using XML for RPC. They soon realized that while XML is good for reading text, it adds way too much overhead for serialization/de-serialization.

9. Tolerate weak consistency with application design

A system with strong consistency (think ACID and RDBMS) requires locking data during updates. This means other writers and readers may need to wait at times. Waiting implies increased latency.

Weak consistency means a reader may not always read the latest updated data. In reality many systems can tolerate this. Weak consistency systems generally do not involve locking. They allow more concurrent readers and writers. They can be easily partitioned and distributed. For these reasons, they have lower latency for reads and writes.

10. Measure and tune

Most systems of any complexity have multiple components. In today's agile development model, developers are continuously deploying new releases of their sub components. If latency suddenly goes up, how do you know what caused it?

It is important to continuously measure and monitor not only the end to end latency but also the latency contributed by the sub components. Know the averages and what deviations are tolerable. Set up alerts for whenever there are deviations from the mean. If a new component is released and suddenly the latency goes up, you know the likely culprit. As your user base grows, if you see gradual increases in latency, perhaps you need additional capacity. If users in a particular geographical location are complaining, then perhaps you need to replicate and deploy your service to that location.
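
As a rough illustration of alerting on deviations from the mean, the Python sketch below flags components whose latest latency is too far from their historical average. The function name, the input layout and the threshold of 3 standard deviations are assumptions made for this example. A real monitoring setup would more likely rely on percentiles and a metrics system.

```python
import statistics


def find_latency_outliers(samples_ms, latest_ms, max_sigma=3.0):
    """Return the components whose latest latency deviates too far from their mean.

    samples_ms: component name -> list of past latencies in milliseconds
    latest_ms:  component name -> most recent latency in milliseconds
    max_sigma:  hypothetical alert threshold, in standard deviations
    """
    alerts = []
    for component, history in samples_ms.items():
        mean = statistics.mean(history)
        stdev = statistics.pstdev(history)
        # Alert when the newest sample is outside the tolerated band around the mean.
        if abs(latest_ms[component] - mean) > max_sigma * stdev:
            alerts.append(component)
    return alerts
```

Measuring each sub component separately is what makes the result useful: if only the database shows up in the list after a release, you know where to look first.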

In summary, ensuring low latency is a continuous and iterative process that is to be done throughout the life of a system.

Friday, October 25, 2013

Apache Cassandra Data Model

This is an introduction to the Apache Cassandra data model. For the benefit of those not familiar with Cassandra,  it is an open source, highly scalable, highly available NoSQL database. Some key architectural highlights of  Cassandra are :

No Single point of failure.
No Master - All servers in cluster are equal.
Peer to peer communication between nodes to exchange data and configuration.
Data is partitioned across nodes based on consistent hash.
Data is automatically replicated.
(and recently added) SQL like data access model.

Cassandra has moved to a simple model that is described by a SQL like language called CQL. Lower level constructs like column family are no longer mentioned. Note that earlier column family models were without much of a schema. You needed to define column family upfront. But the column name in each family could be added as needed. The new CQL model is more schema oriented.

1.0 Tables, row keys and columns

Data is stored in tables, which have rows and columns.

A table is partitioned by the primary key, which is the row key.

For columns, CQL supports various data types like int, varchar, text, float, set, list and many more.

The CQL create statement below creates the users table with userid as the primary key.

create Table Users (
    userid varchar,
    name varchar,
    email varchar,
    address varchar,
    PRIMARY KEY(userid)
) ;

You insert rows into this table using the insert statement. Rows of table are partitioned across nodes based on the primary key.

insert into Users(userid,name,email) values('user1', 'user1 name', 'user1@gmail.com') ;

2.0 No Joins but wide columns

Let us say you want groups of users. In an RDBMS, you might have a table with the columns groupid and userid, with userid being a foreign key into the Users table. In a distributed database like Cassandra, joins are expensive. Hence the data needs to be de-normalized. You can create a table GroupsOfUsers with groupid as the first column of the primary key. As de-normalization, in addition to having the userid column, repeat some useful columns like user name and user email that you might need when looking at members of the group.

create Table GroupsOfUsers (
    groupid varchar,
    groupname varchar,
    userid varchar,
    user_name varchar,
    user_email varchar,
    PRIMARY KEY(groupid,userid)
) ;

When you have a compound primary key, the first column, in this case groupid, is used as the partition key. The remaining primary key columns, in this case userid, are the clustering columns: within a partition, the rows are grouped and sorted by userid.

If you do

select * from GroupsOfUsers where groupid = 'group1' ;

The result might be

group1     user1   name1   email1
group1     user2   name2   email2
group1     user3   name3   email3

Think of the above as logical rows.

Under the hood, the columns might be stored physically as 1 row with one or more columns for each user.

key      column1       column2        column3       column4        column5       column6
group1   user1:name1   user1:email1   user2:name2   user2:email2   user3:name3   user3:email3

Each row can have as many as 2 billion columns if necessary. This is very useful in other use cases such as creating indexes or storing collections.
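
The mapping from logical rows to one wide physical row can be sketched in Python. This is only a mental model, not Cassandra's actual storage format. The function name and the "clustering value:column name" naming scheme are assumptions for illustration, and the sort is a plain string sort.

```python
def to_wide_rows(logical_rows, partition_key, clustering_key):
    """Group logical rows into one physical row per partition key."""
    physical = {}
    for row in logical_rows:
        wide = physical.setdefault(row[partition_key], {})
        for column, value in row.items():
            if column in (partition_key, clustering_key):
                continue
            # The column name is prefixed with the clustering value, e.g. "user1:name".
            wide[f"{row[clustering_key]}:{column}"] = value
    # Keep the columns sorted by name, which orders them by clustering value.
    return {key: dict(sorted(cols.items())) for key, cols in physical.items()}
```

Feeding it the logical rows for group1 produces a single entry keyed by "group1" whose columns are ordered by userid, much like the physical row shown above.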

3.0 Collection column types

If each user had a number of friends, in an RDBMS this would be modeled by joining with a Friends table. In Cassandra you can do that by adding a column of a collection type. The collections supported are List, Map and Set.

alter table Users add friends set<varchar> ;

update Users set friends = friends + {'friend6'} where userid = 'user1' ;

4.0 Indexes using wide columns

An index is a data structure that enables fast look up based on a key. In Cassandra the table is partitioned across nodes based on the primary key. By default, each node in Cassandra maintains an index for the primary keys that it hosts.

You can create additional indexes on other columns. For such indexed columns, Cassandra under the hood creates another table whose primary key is the indexed column.

For example, if you frequently had to do a query such as

select groupid from GroupsOfUsers where userid = 'user1' ;

it could be worthwhile to create an index on the userid column to speed up the query.

create index userid_index on GroupsOfUsers(userid) ;

Logically this would be like creating a table

create table userid_idx (
     userid varchar,
     groupid varchar,
     primary key(userid,groupid)
) ;

The partition key will be userid and the columns in the row will be the groups to which the user belongs. This makes use of the wide column feature of Cassandra mentioned above.

5.0 Note on consistency

The original Dynamo paper, on which Cassandra is based, talks about being eventually consistent. Eventual consistency scares people even though we see it in life all the time. For example, the ATM may let you take more cash than you have in your account. When the bank reconciles ATM withdrawals with your account and realizes that you have overdrawn, it takes appropriate action.

Cassandra extends eventual consistency by offering a model of tunable consistency.

A write consistency of ANY means that it is enough for the write to be written to any one node. This gives low consistency but high availability. A write consistency of ONE, TWO, THREE or QUORUM means that the write must be written to that many replicas. The more replicas a write must reach, the stronger the consistency and the lower the availability.

A read consistency of ONE, TWO, THREE, QUORUM indicates the number of replicas to be consulted before returning the most recent data from the replicas.

Note that unlike what is described in the Dynamo paper, when there is a conflict between data in replicas, Cassandra returns the most recent data and not vector clocks with different versions that clients need to resolve.
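
The effect of the different levels can be illustrated with a small Python sketch. It is a simplified model with made up function names, not Cassandra's implementation: replicas are a list of (timestamp, value) pairs, the first replicas in the list stand in for whichever replicas respond first, and the ANY level is left out. The overlap check uses the commonly quoted rule that a read is guaranteed to see the latest write when the number of replicas read plus the number written exceeds the replication factor.

```python
def replicas_required(level, replication_factor):
    """Map a consistency level name to the number of replicas that must respond."""
    if level == "QUORUM":
        return replication_factor // 2 + 1
    return {"ONE": 1, "TWO": 2, "THREE": 3}[level]


def read(replica_values, level):
    """replica_values: list of (timestamp, value) pairs, one per replica.

    Consults only as many replicas as the level requires and returns the
    value with the most recent timestamp among them.
    """
    needed = replicas_required(level, len(replica_values))
    consulted = replica_values[:needed]
    return max(consulted, key=lambda tv: tv[0])[1]


def overlaps(write_level, read_level, replication_factor):
    # The read set and the write set must share at least one replica.
    w = replicas_required(write_level, replication_factor)
    r = replicas_required(read_level, replication_factor)
    return r + w > replication_factor
```

With 3 replicas, writing and reading at QUORUM overlaps (2 + 2 > 3), while writing and reading at ONE does not, so a read at ONE may return a stale value.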

In summary, with CQL Cassandra provides a simple data model that makes it easier to model and develop applications. With CQL, Cassandra can be looked at as a viable alternative to a relational database when scalability and high availability are important. For additional details, the Cassandra documentation is at http://www.datastax.com/documentation/cassandra/2.0/webhelp/index.html





          

Friday, September 20, 2013

Cassandra Vs HBase : Which NoSql store do I need ?

There are many NoSQL databases out there and it can be confusing to determine which one is suitable for a particular use case. In this blog, we discuss two of the more popular ones, Cassandra and HBase. If you are new to NoSQL, you may review these earlier posts:

What is NoSql ?
HBase
HBase architecture

1. History

To understand the goals and motivation behind any product, it is a good idea to trace its origin.

HBase is based on Google's Bigtable as described in the paper "Bigtable: A Distributed Storage System for Structured Data". You only need to read the first line of the abstract to understand what BigTable attempts to do. It is a distributed storage system for managing structured data that can scale to very large size of the order of petabytes using thousands of commodity servers.

Cassandra derives its motivation from Amazon's Dynamo as described in the paper "Dynamo: Amazon's highly available key value store". Reading the first page of this paper it is clear that the primary goals were reliability at scale and high availability for amazon's online store.

While both papers talk about scale, reliability and availability, the primary problem BigTable (and HBase) addresses is random access to data on the scale of hundreds of terabytes or petabytes, and the primary problem that Dynamo (and Cassandra) addresses is high availability.

2. Stack

HBase is based on Hadoop and depends on HDFS, namenodes, datanodes, ZooKeeper etc. If you are already familiar with and invested in Hadoop, this is not a big issue. Cassandra has no such dependency. If you are not invested in Hadoop, then Cassandra has fewer moving parts and could be easier to go with.

3. Data Model

HBase data model has table, rows, column families and columns. Table has rows and column families. Each row has a key. A column family has columns. When a table is created, the column families have to be defined. But the columns in a column family do not have to be defined and can be added ad hoc.

Cassandra data model used to be described in a similar manner. More recently Cassandra has adopted CQL which is like SQL, as the client interface to data. The data model consists of  tables, rows and columns. You have to define Tables and their columns. Additional columns can be added ad hoc. The data is partitioned based on row key.

Neither supports joins on tables. This is all about denormalization.

HBase does not support secondary indexes. Cassandra supports indexes on columns by denormalization and creating additional tables under the hood.

4. Consistency

HBase supports strong consistency for reads and writes at a row level. Out of the box Cassandra is eventually consistent, though it can be configured for strong consistency. Eventual consistency means that a read may not see the latest update by a write. When that happens , the application is expected to deal with it in an appropriate manner.

In Cassandra, a client might connect and write to any node. The write is also replicated to multiple nodes. At any time, not all nodes might have the latest value replicated. Multiple writes that conflict can happen at the same time. These might need to be reconciled.

5. Range queries on row keys  

Partitioning of rows in HBase is ordered. This implies one can do range queries by row key in HBase. In Cassandra the default partitioner does not keep ordering and you cannot do range queries. There is an ordered partitioner, but the recommendation is to never use it in production because it can cause the cluster to become unbalanced. This is a pretty important difference. If your use cases require range queries, as would be the case in data warehousing type applications, then you will need to go with HBase.

5. High Availability and Partition tolerance

Cassandra is built ground up for availability and partition tolerance while sacrificing on consistency. All nodes are equal. Data is replicated across nodes. Client can connect to any node and read or write any key.

Early versions of HBase had very obvious single points of failure in the HDFS name node and the HBase master. Early versions required external HA solutions (e.g. the Linux HA project) in an active-passive configuration. For example, you would have a standby name node, shared storage and monitoring of the active name node. When monitoring detects that the active node is down, the HA solution would fail over to the standby node. Recently Hadoop 2 has added native support for hot failover of the namenode, using an active namenode and a hot standby. HBase has added hot failover support for the HBase master as well. But clearly these came after the fact.

If high availability is a primary use case, then Cassandra clearly wins.

6. Storage

HBase stores data on HDFS. Cassandra stores data locally on each node.

The primary storage data structure for both is based on Google's Bigtable and is the log structured merge (LSM) tree. Your typical RDBMS uses the B+ tree as the primary data structure for creating and storing indexes. In a B+ tree, each node is a disk block, and to find a key or to write a key, the code has to read and write a few blocks from disk. The log structured merge tree needs fewer disk accesses than a B+ tree. An LSM tree has 2 tree like structures, one in memory and one on disk. Initially reads and writes happen to the tree in memory. When the in-memory tree reaches a certain size, it is written to disk, where it is merged with the larger tree.

In both HBase and Cassandra, users have complained that the merge process mentioned above ( also known as compaction) slows things down.
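
The two level structure can be sketched in Python as a toy model. The class name and the flush threshold are made up, the "disk" level is just a sorted list in memory, and real implementations keep several on-disk files plus a commit log. The sketch only shows the flow of writes into memory, a flush that merges into the sorted on-disk run, and reads that check memory first.

```python
import bisect


class TinyLSM:
    def __init__(self, memtable_limit=2):
        self.memtable = {}                    # in-memory structure, absorbs all writes
        self.sstable = []                     # "on-disk" run: sorted list of (key, value)
        self.memtable_limit = memtable_limit  # hypothetical flush threshold

    def put(self, key, value):
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            self.flush()

    def flush(self):
        # Merge the memtable into the sorted run; newer values win.
        merged = dict(self.sstable)
        merged.update(self.memtable)
        self.sstable = sorted(merged.items())
        self.memtable = {}

    def get(self, key):
        if key in self.memtable:              # newest data first
            return self.memtable[key]
        # Binary search in the sorted run; (key,) sorts just before (key, value).
        i = bisect.bisect_left(self.sstable, (key,))
        if i < len(self.sstable) and self.sstable[i][0] == key:
            return self.sstable[i][1]
        return None
```

The flush method is the merge step: in this toy version it rewrites the whole sorted run every time, which hints at why compaction on large data sets is expensive.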

7. Summary

HBase and Cassandra are suitable for different use cases.

If you have data warehousing type use cases and large amounts of data that will continue to grow, HBase is a more suitable choice. If you are thinking of data from large application log files, or several hundred terabytes or petabytes of time series or monitoring type data that needs to be aggregated or mined, then HBase might be appropriate. Recently I spoke to a cloud based application monitoring company that was storing large amounts of monitoring data generated every minute in MySQL and was complaining about not being able to scale. They need HBase.

Cassandra on the other hand is more suitable if you need to serve traffic from a highly available data store. If you are already on an RDBMS but have availability or scale issues, then Cassandra might be something to look at. With CQL, which is like SQL, Cassandra is positioning itself as an alternative to your typical RDBMS. A company that I know uses a key value store that is fast, but does not scale, nor does it have built-in support for high availability. Very often, their shards go down and they lose data. They should consider Cassandra.

Related Blogs:

Cassandra data model
Choosing Cassandra 
Cassandra Compaction

Tuesday, June 12, 2012

5 Tips for building low latency web applications

Low latency applications are those that need to service requests in a few milliseconds or microseconds. Examples of a low latency application are
  • Servers that serve ads to be shown on web pages. If you don't serve the ad as per the SLA dictated by publishers, you will not be given the opportunity to serve the ad. Typically the server has a few milliseconds to respond.
  • Servers that participate in real time bidding. Again, taking internet advertising as an example, if you are bidding for the opportunity to show an ad, you have just a few milliseconds to make a bid.
  • Applications that provide real time quotes such as a travel portal. The portal makes requests to its partner agents. For the portal to be usable, the quotes need to be displayed before the user loses interest and moves on to another travel site. Typically the portal app gives each partner a few milliseconds to respond. Otherwise the response is ignored.
Here are 5 simple guidelines to remember when building low latency applications.

1. Read from Memory: Keep the data required to serve the request in memory. Keep as much as possible in local memory and then use external caches like memcached, Ehcache and the more recent NoSql key value stores. Having to read data from a database or filesystem is slow and should be avoided.

2. Write asynchronously: The thread that services the request should never be involved in writing to any external storage such as a database or disk. The main thread should hand off the write task to worker threads that do the writing asynchronously.

3. Avoid locks and contention: Design the application in a way that multiple threads are not contending for the same data and requiring locks. This is generally not an issue if the application mostly reads data. But multiple threads trying to write requires acquiring locks, which slows down the application. You should consider a design where write operations are delegated to a single thread. You might need to relax the requirement of ACID properties on data. In many cases, your users will be able to tolerate data that becomes eventually consistent.

4. Architect with loosely coupled components: When tuning for low latency, you might need to co locate certain components. Co location reduces hops and hence latency. In other cases, you might need to geographically distribute components, so that they are closer to the end user. An application built with loosely coupled components can accommodate such physical changes without requiring a rewrite of the application.

5. Horizontal scalability: Architect the application so that you can scale horizontally as the load on the application goes up. Even if you read all data from a cache, as the number of requests goes up and the size of data increases, the latency will go up because of things like garbage collection (in the case of Java). To handle more requests without increasing latency, you will need to add more servers. If you are using a key-value store, you might need to shard the data across multiple servers to keep sub millisecond response time.
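
Guidelines 2 and 3 can be combined in one small pattern: request threads put writes on a queue and a single background thread drains it. The Python sketch below is a minimal illustration. The AsyncWriter class and its sink parameter are hypothetical names, and a production version would also need bounded queues, error handling and batching.

```python
import queue
import threading


class AsyncWriter:
    """Hands writes to one background thread so request threads never block on I/O."""

    def __init__(self, sink):
        self.sink = sink                  # hypothetical callable doing the slow write
        self.pending = queue.Queue()
        self.worker = threading.Thread(target=self._drain, daemon=True)
        self.worker.start()

    def write(self, record):
        # Called on the request thread: enqueue and return immediately.
        self.pending.put(record)

    def _drain(self):
        # Single writer thread, so no locks are needed around the sink.
        while True:
            record = self.pending.get()
            if record is None:            # sentinel: stop the worker
                break
            self.sink(record)

    def close(self):
        # Flush everything queued so far, then stop the worker.
        self.pending.put(None)
        self.worker.join()
```

A request handler would call write() and return its response right away. The trade-off is that a crash can lose records still sitting in the queue, which is the kind of relaxed guarantee that guideline 3 asks you to consider.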

Most importantly, building and maintaining low latency is an iterative process that involves designing correctly, building, measuring performance and then tuning by looking at the design and code for improvements.