Showing posts with label architecture. Show all posts
Showing posts with label architecture. Show all posts

Saturday, March 1, 2025

Multi Version Concurrency Control (MVCC) in databases

Introduction

Multi version concurrency control (MVCC) is a popular optimistic technique used in modern databases for concurrency control.

MVCC does not use locking. In that regard it is an optimistic technique but distinct from what is know as the optimistic concurrency control (OCC).

MVCC is timestamp based.

MVCC does its concurrency control by keeping multiple copies or versions of each data item. Every transaction sees data only of a specific version , also known as snapshot. Changes made by a transaction will not be seen by others, until the changes are committed.

Concepts

Versioning

Versioning tuples is at the core of how MVCC does its concurrency control.

Consider the table 

id, balance

A, 500

B, 1000

Now logically assume that under the hood, the database adds 2 hidden columns - start time (st) and end time (et)

id, balance, st. , et

A, 500 , 1 , -

B, 1000, 2, -

Let us assume that increasing numbers 1,2 ..... represent time and a - implies infinity or no end time. The version number can be a timestamp or something analogous to a timestamp.

In the above example, we are saying that the first tuple (A, 500) is valid from time 1 to infinity. The second tuple (B, 1000) is valid from 2 to infinity.

With versioning, when a tuple is updated, the original one is unchanged. Instead a new row is added.

Let us say at time 3, A is updated to 550. Logically the table would look like

A, 500 , 1 , 3

A, 550 , 3 , -

B, 1000, 2, -

The latest committed version has an end time of infinity. When a new update for a tuple is committed, the end timestamp of the previous latest is changed to the start timestamp of the new latest.

A transaction reads the latest committed version at the time it starts. A transaction that starts at time 2 and reads A will read 500. But a transaction that starts at 4 and reads A will read 550.

Snapshot isolation

Every transaction only reads committed values at the time it starts. It is a snapshot of the database at that time. This referred to as Snapshot isolation.

However it will read any changes that it makes within the scope of its transaction.

Use cases

It is easier to understand MVCC with some examples,

Lost update problem

Let us start with the same table again.

id, balance, st. , et

A, 500 , 1 , -

At time 2, transaction T1 reads A. The latest version with timestamp (1, -) has value 500. It reads 500.

At time 3, transaction T2 reads the same row. The latest version is still (1, -) . It too reads 500.

At time 3, transaction T2 updates the value to 550 and commits. We now have an addition version

A, 500 , 1 , 3
A, 550, 3 , - 

At time 5, T1 updates the value to 600. 
At time 6, T1 tries to commit. 

T1s commit is disallowed and it is forced to abort. 
The reason is that T1's snapshot is of time 1. After T1 started, another transaction came along , performed a WRITE on the same record and committed. If we allowed T1 to also commit, it would overwrite T2's update. This is the lost update problem.

The rule we can deduce here is : The write set of the committing transaction T1 should not intersect with the write set of any transactions that committed after T1 started and before T1 commits.

Write Skew

Let us start this time with the table having 2 tuples.

id, balance, st. , et

A, 500 , 1 , -

B, 1000, 2, -


At time 1, transaction T1 reads A. The latest version with timestamp (1, -) has value 500. It reads 500.
At time 3 , transaction T1 writes A to 600. But it is still uncommitted.

A, 500 , 1 , -, 

A, 600 , 3 , - , uncommitted

B, 1000, 2, -

At time 4, transaction T2 reads A. The latest committed record is still at timestamp 1. It will read 500 ( not 600 ).

At time 5, T1 commits. The write set to T1 does not intersect with the write set of any other transaction. So it allowed to commit. So we have

A, 500 , 1 , 5, 

A, 600 , 5 , - , 

B, 1000, 2, -, 

At time 6, T2 updates B to 1200

B, 1200, 6, -, uncommitted

At time 7, T2 tries to commit. It is disallowed and T2 is forced to abort. The reason is that T2 read A which was committed after T2 started. Even though T2 does not update A. It might be using A (that is read earlier) to update say B and that update could be incorrect.This is the write skew problem.

We can thus deduce rule 2: The read set of a committing transaction T should not intersect with the write set of any transaction that has committed after T started.

Phantom reads

Let us look at a case where a new record is insert into the table.

id, balance, st. , et

A, 500 , 1 , -

B, 1000, 2, -

At time 3 , transaction T1 runs the query "select sum(balance) from table where balance >=500 ". It gets a result 1500.

At time 4, transaction T2 inserts a new row, (C, 600) and commits

id, balance, st. , et

A, 500 , 1 , -

B, 1000, 2, -

C, 600, 4 , -

At time 5, transaction T1  inserts the 1500 value it read into another table.

At time 6, T1 tries to commit. The transaction is not allowed to proceed and aborts.

The reason is that the 1500 value is no longer accurate. At time 4 , another transaction committed a new row and the value should be 2100. 

Rule 3 : If the committing transaction T has predicate queries  (where clause ) that depend to number of rows and affected by inserts/deletes, then just before committing the queries need to be run again. The commit is allowed only if the results are the same as before.

Obviously running the queries a second time can be expensive. But that is implementation detail and databases do various optimization.

Garbage collection

Maintaining versions for each and every record can take up a lot of disk space. Databases have background process or other method to remove old versions that have no active transaction depending on them.

Algorithm

Every tuple is versioned using timestamp or something analogous to it.

Every transaction only reads rows that are committed as of the start time of the transaction. This holds for the entire duration of the transaction. It cannot read any commits that happened after it starts. In other words it sees a "Snapshot" of the data, as of its start time. This is Snapshot isolation.

When a transaction tries to commit, we check if any other transaction has committed a new version for the rows that we read or write. If there are any such transactions, then we must abort.

Even if our read/write set does not intersect with any other transaction, if our read is impacted by inserts or deletes, we much abort.

To make this happen, the database needs to maintain start time, end time for every tuple. Not just the latest version, but older versions as well. 

For every tuple that is read or writen, the database needs to know which transactions are active and which committed.

Older versions need to cleaned up when there are no active transactions depending on them.

For a transaction to be allowed to commit, there rules we discussed apply:

#1 : Write set of a committing transaction T should not intersect with Write set of any committed transaction that committed after T started.

#2: The Read set of a committing transaction T should not intersect with the write set of any transaction that has committed after T started.

#3: When the committing transaction T has predicate queries (WHERE clause, depend on number of rows meeting condition ), then before committing, the queries need to be rerun to ensure that they return the same result.

Some additional rules:

#4 Write set of a committing transaction T should not intersect with Read set of any transaction that committed after T started.

#5 If a transaction T updates a row, T should use its own updated version, not the one in older snapshot it started with.

Commercial Databases supporting MVCC

Almost all the modern databases we know off support MVCC.

PostgreSql
Mysql
Oracle
CockroachDB
SqlServer
.... and more

Advantages of MVCC

  • Readers do not block readers. Writers do not block readers.
  • Each transaction works under a consistent snapshot of data.
  • High concurrency is possible.
  • Databases can allow querying of old versions, which are known as time travel queries.

Disadvantages of MVCC

  • Increased storage requirement because we are storing older version.
  • Need to do garbage collection.
  • When conflicts are detect and transactions aborted, applications need to retry.

Conclusion

MVCC is a popular concurrency control technique used in many currently popular databases like Postgresql.

It does not use two phase locking. Unlike OCC, it does not use a staging area. The staging area is built in with versioning.

Like OCC, it works best for workloads where there are few conflicts.

In conclusion, Multi-Version Concurrency Control (MVCC) offers a robust approach to handling concurrent transactions in modern databases by maintaining multiple versions of data

While it provides significant benefits such as high read throughput, reduced contention, and snapshot isolation, it also introduces complexity in garbage collection and storage management.

Modern distributed databases combine versioning with clock time to provide strict serializability even in  distributed databases without the use of locking. But that is a topic for another blog.

References

Prof. Jen Dittrich Youtube videos
Andy Pavlov CMU Database Youtube video
Database Internals by Alex Petrov
Database Management Systems by Ramakrishnan & Gehrke

Saturday, July 20, 2024

Replication in modern data systems

Overview

Replication means making a copy of the data for future use in the case of failures or may be to scale.

Why is it a big deal ? We copy files for backup all the time. For static files, that do not change, making a copy is 1 copy command. But if the data is being updated by users all the time. How often do you run the command to copy. How do you keep the copy in sync with the source ?

That is the problem of replication in databases and data systems. All databases have replication built in that you can setup with a command or two. So why read or discuss it? If you are building a distributed systems that involves data, you will need to replicate data. The concepts from databases will be useful. 

While replication is most well known for its use with databases, it is also a critical part of distributed systems where the data is unstructured such as distributed file systems (HDFS) or messaging systems (Apache Kafka) 

This post covers replication in traditional single node systems as well as modern distributed systems.



Why do we need replication ?

There are several reasons why replication is needed. It is more than just taking a backup.

Redundancy

Make a copy of the data. When the main server becomes unavailable for any reason, switch to the copy. This is ensure that the data is always available.

Scalability

Your data becomes really popular and the database gets a lot of read requests and cannot keep up. So you make copies of the database and have a load balancer distribute the request across to the copies (replicas).

Geo distribution of data

Bring the data close of user. You have users in Americas, Europe and Asia. Data from americas is replicated to Europe and Asia, so users there can read data locally without making a round trip to the americas for every read.

Secondary use cases

These are lesser known and unconventional use cases. They might be done higher up in the stack at the application layer or middleware than in the database. 

Mirroring

Mirroring involves replicating the requests to the application to a copy of the entire application stack. You can think of this as application level replication.



For example, for a REST service, this involves sending the http request, not just to the production service but also to a mirror service.

The mirror service reads and writes from the mirror database. Mirror database is a previous replica that was in sync with the leader. Just before starting mirroring, it is discontinued as a replica so it does not get duplicates.

Mirroring can be used for testing large complex changes against production traffic.

Data in the mirror database is then compared with data in the production database for accuracy.

Testing

A regular database replica is used as a test database. Various kinds of tests - feature tests, performance tests, concurrency tests, scalability tests can be run with services running with the replica. This is a different use case from mirroring.

Migration

This can be used to eliminate or reduce downtimes needed for migration.

Create additional replicas.

Run migration on them.

Rollover the application services to the new database replicas.

Replication strategies

Single leader

This is the most common pattern. It shown in Figure 1.

One server is designated as the leader. The others are followers. All writes go to the leader. The leader replicates the writes to the followers.

The advantages are :

Setting up is fairly easy.

Reads become scalable. You can put a load balancer in front and distribute read requests to followers.

High availability: If the leader fails, you fail over to one of the followers and let it become the leader.

The disadvantages are :

All the writes go to one server , the leader. So this can become a bottleneck. Writes are not scaled. 

If you read from a replica that is behind on replication, you might read stale data.

Multi leader

Writes can go to more than one server.

Multi leader replication is needed when

(1) Writes and replication needs to happen across geographically distributed areas.

(2) Connectivity to single leader is not guaranteed. The is usually the case with mobile devices or laptops or when people want the ability to work offline and/or multiple devices.

In the geo distributed case, the writes go to a local local leader. The local leader not only replicates to local replicas but also to the distributed leader (who replicate to their replicas).

In the mobile case, the writes are store locally and the replicated periodically when connectivity is available.

Advantages:

Writes are also scaled.

Writes can done locally or close to clients. Better latency for writes.

Disadvantages:

Since writes happen at multiple leaders. There can be conflict. The conflicts need to be resolved.

Leaderless

In the leaderless model, all nodes are equal and no node is designated leader. Writes can go to any node and that node replicates the write to other nodes. This is the model made popular by AWS Dynamo and later adopted by Cassandra.


Consensus based replication

All the above methods have either write conflict or read consistency issues. Raft and Paxos are two well protocols for replicating log entries. Data to be replicated is modeled as a list of entries in log. The short story is that one server sends one entry or a sequence of entries to others and it is considered committed if a majority of servers acknowledge having received them. Raft has leader election but Paxos is leaderless. Raft protocol describes in detail leader election, replication, server crashes, recovery and consistency checks. The paper is a good read for anyone interested in distributed systems.

Replication Implementations

The first three techniques apply to databases which deal with structured data and are a little more complicated.

Statement based replication

In this approach, the SQL statements such as INSERT/UPDATE/DELETE etc are forwarded as they are from the leaders to the followers. While this can work in most cases, it does not work in certain cases such as timestamps or when you generate an id or a random number.

It is not efficient either. If you insert a record and then delete it, why replicate both commands ?

Write ahead log (WAL) replication

Databases first append every write to the WAL before doing anything else, before writing it to structured storage from where it will be read. WAL is used for recovery. If the database crashed, it state is reconstructed from the WAL. A recent slogan has been "The WAL is the database". Replication here involves replicating the WAL.

A disadvantage is that WAL entries contain where specific storage details like which byte in which block is to be updated. This can create compatibility issues if the leader and followers are on different versions.

Logical replication

A logical log on the other hand captures at a row, how the table was changed. You can view this as an approach somewhere between statement based and WAL replication.

Change data capture is a form of logical replication. It is used to replicate changes in a database to other third party systems. A popular use case is data warehousing where data from multiple sources is aggregated and summarized for analytics. 

Unstructured data replication

For unstructured data as in distributed file systems the unit for replication is a block of data. Data is first partitioned into blocks and each block is replicated independently.

Potential issues with replication

Replication Lag

Most of the time replication is asynchronous. Client writes to the leader and returns before any acknowledgement that it has been replicated. Synchronous replication is not viable due both performance and availability issues. A single failure can hold up all replications.

Lost write

However, one problem this creates is that if you read immediately after a write, the replica you are reading from may not yet have your last write.

Inconsistent read

If you read multiple times in quick succession ( same read) , each read may get a different result depending on which replica services the read ( as the replicas may be in different stages of replication)

Cassandra addressed this issue using quorum. CockroachDb uses a consensus protocol like Raft.

Write Conflicts

Write conflict is an issue in multi leader replication. This happens when multiple clients update the same data while talking to a different master. The database does not know which update to accept and how they should be merged. This is similar to a merge conflict in git.

An approach to handle conflicts is to store both versions on write. But on read, send both versions to the client and let the the client resolve the conflict

Replication is real world systems

The product documentation for database on replication can be quite confusing. It best to follow a tutorial or blog in the internet.

Postgres

The documentation and blogs describe it in 2 ways.

You can set it up as synchronous, asynchronous , streaming , log file based etc

And it can be WAL based or logical replication. Statement based is rarely seen.

In snapshot replication, a snapshot of the database is taken and replicated to followers.

Instead of streaming, you can also setup the replication as file based, where the WAL files are periodically shipped to followers.

In WAL replication, replication slots lets the leader track how much of the WAL is replicated to each replica. This helps the leader not discard segments not yet replicated. But this consumes resources on the leader. Replication slots need to be managed and deleted when not needed.

Mysql

The traditional way in mysql was a logical replication based on their binlog file - a binary format for logical changes.

The newer way is based on global transaction identifier (GTID) which is built on top of the binlog. It can be either statement based or row based.

Dynamo / Cassandra

In this architecture, replication is fundamental to the architecture. All you need to do is to set the replication factor to greater than 1. All servers are equal - no leader and no follower. Writes can go to any server. Partitioning is also fundamental to the architecture. The server that receives the write redirects the write to appropriate server. From here it is replicated to other servers based on the replication factor.

Consistency issues are addressed using quorum based tunable consistency. Quorum mean a majority which is (RF/2+1) agree on something. If you have replication factor (RF) 3, quorum is 2. So on a write, at least 2 nodes need to acknowledge that the write was saved. On read, at least 2 nodes need to agree on the return value. In general, to avoid inconsistencies, you want Read quorum (R)+ Write quorum (W) > RF .

CockroachDb

CockroachDB uses the Raft distributed consensus protocol to ensure that a majority of replicas are in consensus before any change is committed. This is the safest approach to ensure consistency but comes at a cost.

Apache Kafka

In Kafka, messages are sent and received from topics. Topics are split into partition. Each partition has one leader and a configurable number of replicas. Writes go to the leader which replicates to the replicas. Reads can go to the replicas. Each broker is a leader for some partitions but a follower for other partitions. Like Cassandra and CockroachDb, replication is core to the architecture and easy to setup.

Apache Hadoop (HDFS)

This applies to any distributed file system. The file is a sequence of blocks of data. HDFS has a name node and data nodes. Name node maintains a map of which data nodes have the blocks of a file. Each block is replicated to a configurable number of data nodes.

Conclusion

Replication is a critical piece of any distributed data system. It has to be part of the core architecture. It cannot come after the fact like it did in the past. While redundancy and HA are well known benefits, there are other benefits such geo distribution of data as well. It can cause some effects such as read consistency. Care should be taken to address those. Different products use different strategies. You should be familiar with the replication strategies, configuration and side effects for your data product. If you are building a new system with data, understanding how existing systems replicate and the issues they face, can help you design your replication.

Sunday, June 30, 2024

CockroachDb Review: Should I use CockroachDb ?

 Overview

CockroachDb is a modern distributed database that promises linear scalability with strict serializability.

Server side sharding is automatic. Nodes can be added easily as needed. Claims to provide the SERIALIZABLE isolation level.

Most distributed databases such as Cassandra, MongoDb, HBase etc sacrifice consistency to achieve high availability. CockroachDb distinguishes itself by claiming to be distributed and the same time offer strong consistency that even single node databases do not offer.

This falls into a database category called NewSql or DistributedSQL as opposed to NoSql (Cassandra, MongoDb)

When to choose CockroachDb ?

You should choose CockroachDb if

    Your data is of a global scale.

    As data size increases, you need to scale horizontally to several nodes.

    You need data to be distributed and localized in specific geographical regions. For
    example EU data resides in Europe while US data resides in US.

    You need strong consistency. Serializable isolation level.

    You need to keep the SQL / relational data model.

    You need distributed transactions.

You may want to pass on it if

    You data size can easily fit on a node for the foreseeable future.
    You organization is more comfortable with a stable proven database. (CockroachDb is
    still maturing).
    You data model is heavily normalized and you do a lot of joins in your queries. While this
    database can support joins, it is still not recommended in a highly distributed
    environment.

Architecture

Architecture is based on Google's Spanner paper.

It is a key value store with a SQL interface on top of it.

Database is a cluster of nodes. All nodes are equal. Nodes may join and leave the cluster at any time.

Sorted map of key values pairs. Fully ordered monolithic key space. All tables/indexes go into the same key space by encoding tablename/indexname/key together.

Sharding

Key value pairs are broken up into contiguous ranges. 

When range size reaches 512 Mib (2 power 20) It is split into 2 ranges.

Each range is assigned to a node and replicated. 

If you have 1 node all the shards are in that node. To scale, you add more nodes and the shards get distributed across nodes. A minimum of 3 nodes is recommended. 

Very easily spin up node(s) and add to cluster anywhere. 

Btree like index structure used to locate shard that has a key.

Replication

Data in each range is replicated using the Raft consensus algorithm.

A minimum replication factor of 3 is needed.

This provides the high availability. Data is available as long as a majority of the nodes in the cluster are available.

Geo-partitioning

By adding a country or region to the primary key, you can limit storage to keys to a particular region. So European data can be make to reside in Europe, US data in US and so. This has 2 benefits
There is a performance benefit and data is local to its users.

It can satisfy legal requirements where data is not allowed to leave a country or region.

Read/Write

Reads

Any node can receive a request to read a key/value.

Request is forwarded to the node that is the raft leader for that table/range.

Leader returns the data to the node that requested it. Since leader returns the data, no consensus is required.

Node returns it to the client.

Writes

Any node can receive a request to write a key/value.

Request is forwarded to the node that is the raft leader for that table/range.

Leader writes the value to its log and initiates consensus with replicas for the range. When majority acknowledges, the key/value is considered committed and leader notifies the requesting node which notifies the client

Transactions

Supports transactions that spans multiple tables and rows.

Transactions can be distributed, that is span multiple nodes.

The supported isolation level is strict serializability which is the highest isolation level. Strict serializability means that not only are transactions ordered, but they are ordered as per wall clock time.
Transaction protocol is an improvement over two phase commit. In parallel, participants acquire locks and create write intents. The transaction is marked staged. When the client commits, if all locks are acquired and writes are replicated, the coordinator immediately returns success to client. In background the transaction is marked committed. This is one round trip between transaction coordinator and each participant - unlike two phase commit - which requires two round trips.

Hybrid logical clocks are used to timestamp each transaction. Timestamp is the version for MVCC.

Data Model

Clients see only the SQL row column relation model
Wire protocol is same as Postgresql wire protocol.

Performance

Efficient range scan.
Geo partitioning improves performance by locality.
Distributed SQL execution.
Distributed transactions will be slow.
Generally you do not want distributed transactions over large distances. If you build a 3 node CockroachDb cluster with 1 node in NewYork, 1 in London and 1 in San Francisco, the write latencies are going to be very high due to the round trips for RAFT and distributed transactions. The cluster topology needs to be designed appropriately to give you the lowest latency at the desired level of high availability.

Administration

Good command line tools and UI console make the the administration easy.
Since all nodes are equals, number of moving parts that need to be administered is low.

Summary

If you need a globally distributed database with strict serializability, this is definitely a database to look at. It has good pedigree. However remember that distributed databases are not drop in replacement for your traditional RDBMSs. Distributed queries especially joins and distributed transaction can be slow. So some application redesign, some denormalization is always required.

Note: Moved from heavydutysoftware.com


Sunday, November 1, 2020

Building Globally Distributed Applications

A globally distributed application is one where the services and data for the application are partitioned and replicated across multiple regions over the globe. Popular distributed applications that everyone is familiar with are Facebook, Amazon.com, Gmail, Twitter, Instagram. However more and more enterprise applications are finding the need to become distributed because their user base is increasingly distributed around the globe. But not every company has the expertise of a Facebook or Amazon or Google. When going distributed, it is not enough to just spin up instances of your service on AWS or Google cloud on various regions. There are issues related to data that must be addressed for the application to work correctly. While consumer centric social media applications can tolerate some correctness issues or lags in data, the same might not be true for enterprise applications. This blog discusses the data and database issues related to a globally distributed application. Lastly, we discuss 2 research papers that been around since early part of this decade, but whose relevance is increasing in recent times.

Building globally distributed applications that are scalable, highly available and consistent can be challenging. Sharding has to be managed by the application. Keep it highly available requires non database tools. When you have been on a single node database whether it is Mysql or Postgresql etc, it is tempting to scale by manual sharding or one of the clustering solutions available for those databases. It might appear easy at the beginning but the cost of managing the system increases exponentially with scale. Additionally, sharding and replication lead to consistency issues and bugs that need to be addressed. Scaling with single node databases like Mysql beyond a certain point has extremely high operational overhead.

NoSql databases such as Cassandra, Riak, MongoDB etc offer scalability and high availability but at the expense of data consistency. That might be ok for some social media or consumer applications where the dollar value of individual transaction is very small. But not in enterprise applications where the correctness of each transaction is worth several thousands of dollars. In enterprise applications, we need distributed data to behave the same way that we are used to with single node databases.

Let us look at some common correctness issues that crop up with distributed data.

Example 1 : A distributed on line store with servers in San Francisco, New York and Paris.

Each server has 2 tables products and inventory with the following data.
Products:(product)
 widget1
 widget2
Inventory: (product, count):
widget1,6
widget2,1

Customer Jose connects to server in San Francisco and buys widget2 at time t1. At time t2, Customer Pierre connects to a server in Paris and also buys widget2. Assume t2 > t1 but t2-t1 is small.

Expected Behavior : Jose successfully completes transaction and gets the product. Since inventory of widget2 is now zero, Pierre’s transaction is aborted.
Actual Behavior (in an eventually consistent system): Both transactions complete. But only one of the customers gets the product. The other customer is later sent an apologetic email that widget2 is out of stock.

Example 2: A distributed document sharing system with servers in New York, London, Tokyo

Operation1: In London, User X creates a new empty document marked private.
Operation2. User X makes update 1 to document.
Operation3: User X deletes update 1.
Operation4: User X makes update 2.
Operation5: User X changes the document from private to public.
Due to network issues, only operations 1,2, 5 reach Tokyo. 3 and 4 do not.
In Tokyo, User Y tries to read the shared document.

Expected behavior: The document status is private and Y cannot read the document.
Actual behavior: Y is able to read the document but an incorrect version. The document has update1 which is deleted and is missing update2 which needs to be there.

The problems above are known as consistency issues. Different clients are seeing different views of the data. What is the correct view ?

Consistency here refers to C in the CAP theorem, not the C in ACID. Here Consistency means every thread in a concurrent application correctly reads the most recent write at that point in time.

How do you fix the above issues ? In a single node database, Example1 can be fixed by locking the row in the inventory table during update and Example2 is not even an issue because all the data is in one node. But in a distributed application data might be split across shards and shards replicated for high availability. User of the system might connect to any shard/server and read/write data. With NoSql databases, the application has to handle any in consistencies.

In traditional RDBMSs , database developers are given a knob called isolation level to control what concurrent threads can read. In this old blog I explain what isolation levels are. The safest isolation level is the SERIALIZABLE where the database behaves as if the transactions were executing in a serial order with no overlap, even though in reality they are executing concurrently. Most developers use the default isolation level which is generally READ_COMMITTED OR READ_REPEATABLE. In reality, these isolation levels are poorly documented and implemented differently by different vendors. The result is that in highly concurrent applications, there are consistency bugs even in traditional single node RDBMs. In a distributed database with data spread across shards and replicated for read scalability, the problem is compounded further. Most NoSql vendors punt the problem by claiming eventual consistency, meaning if there are no writes for a while, eventually all reads on all nodes will read the last write.

Consistency is often confused with isolation, which describes how the database behave under concurrent execution of the transactions. At the safest isolation level, the database behaves as if the transactions were executing in serial order, even though in reality they are executing concurrently. At the safest consistency level, every thread in a concurrent application correctly reads the most recent write. But most database documentations are not clear on how to achieve this in an application.

The problems in examples 1 and 2 would not occur if those applications/databases had the notion of a global transaction order with respect to real time. In example 1, Pierre’s transaction at t2 should see the inventory as 0 because a transaction at t1 <t2 set it to zero. In example 2, Y should only be able to read upto operation2 . It should not be able to read operation5 without operations 3,4 which occured before 5.

In database literature, the term for this requirement is called “Strict Serializability” or sometimes “external consistency”. Since this technical definitions can be confusing, it is often referred to as strong consistency.

2 research papers that have been around for a while provide answers on how this problems might be fixed. The papers are the Spanner paper and the Calvin paper.

Their approach is solving the problem can summarized as follows:
1. timestamp transactions with something that reflect their occurrence in real time
2. Order transactions based on timestamp
3. Commit transactions in the above order.

But the details of how they do it are significantly different. Let us look at how they do it.

Spanner paper from Google

Spanner is database built at Google and the paper describes the motivation and design of Spanner. Spanners approach involves
1. The use of atomic clocks and GPS to synchronize clocks across hosts in different regions and the true time API to give accurate time across nodes, regions or continent.
2. For a read/write transaction, spanner calls the true time API to get a timestamp. To address overlaps between transactions that are close to each other, the timestamp is assigned after locks are acquired and before they are released. 
3. The commit order equals timestamp order.
4. Read for particular timestamp is sent to any shard/replica that has the data at that timestamp.
5. Read without timestamp (latest read) are serviced by assigning a timestamp.
6. Writes that cross multiple shards use two phase commit.
And of course,
7. It can scale horizontally to 1000s of nodes by sharding.
8. Each shard is replicated.
And most importantly, 
9. Even though, it is a key value store, it provide SQL support to make it easy for application programmers.
CockroachDb and Yugabyte are 2 commercial databases based on spanner.

Calvin Paper


The Calvin paper addresses the above problem using distributed consensus protocols like Raft or Paxos. 
1. Every transaction has to first go through distributed consensus and secure a spot in a linear replication log. 
2. One can view the index in the log as the timestamp. 
3. The committed entries in the replication log are then executed in the exact same serial order by every node in the distributed database. 
4. Since the transaction log is replicated to every shard, it does not need or use two phase commit. In a transaction involving multiple shards, if a shard dies before committing a particular transaction, then on restart it just has to execute the uncommitted transaction from it replication log.
5. No dependency on wall clocks or time API.
6. No two phase commit.
7. No mention of SQL support.

 FaunaDb is an example of a database based on Calvin.

This class of databases that offer horizontal scalability on a global scale without sacrificing consistency is also called NewSql. 

In summary, if you are a building a globally distributed application that needs strong consistency, doing it on your own with SQL or NoSql database can be non trivial. Consistency is hard enough in a single node database. But on a distributed database, consistency bugs are harder to troubleshoot and even harder to fix. You might want to consider one of the NewSql databases to make life easier. Review the Spanner and Calvin papers to understand the architectural choices that are available. This will help you pick a database that is right for you. Spanner and Calvin papers have been around for almost a decade. But they have become more relevant now as real databases based on them become more popular. Most importantly understand what is consistency is and apply it, for lack of which can cause severe correctness bugs in your application. 

References:

The Spanner paper

The Calvin paper

Consistency and Isolation

Saturday, July 26, 2014

Distributed Systems : Consensus Protocols

Modern software systems scale by partitioning the data and distributing data across several machines. Systems are made highly available by replicating data across multiple machines. When multiple systems are involved in managing state, they need to agree when a particular piece data needs to change.

You are familiar with the concept of a transaction in a relational database. A transaction is a unit of work (like a insert or update or some combination of multiple statements) that as a whole can be committed and aborted. What if the work involves updating multiple databases that are on different machines ? To ensure consistent state across the system, all the databases should agree on what to do, whether to commit or abort the state change.

Modern distributed NoSql databases have a similar but slightly different problem. If you had a single server and set a value v=8 in the server. There is no doubt what the value of v is. Any client that connects to the server reads the value as 8.  What if you had a cluster of 3 servers ? Would a client connecting to one the servers see the value as 8 ? Consensus is required to ensure all servers agree on what the value of v is.

Consider systems like Apache Zookeeper or Apache Cassandra. To ensure high availability, clients can connect to any node in the cluster and read  or write data. To ensure consistency in the cluster, some consensus is required among the nodes in the cluster when state changes.

In the rest of this blog we briefly cover some distributed protocols starting with two phase commit, which users of relational databases are very familiar with. We will then talk about Paxos , ZAB and Raft. Paxos became popular because it was used by google for its distributed systems. ZAB is used by Zookeeper which is an important component of the Hadoop echosystem. These protocols are hard to understand and no attempt is made to go into detail. The purpose is to introduce readers to some of the consensus concepts that are important in distributed systems.

1. Two phase commit

Used in databases to ensure all participants in distributed updates either commit or abort the changes.
One node called the co-ordinator originates the transaction.

1.1 Co-ordinator sends a prepare message to all participants.
1.2 Each participant replies with a yes if it can commit its part of the transaction or No otherwise.
1.3 If the co-ordinator receives a yes from all participants, it sends a commit message to the participants. Otherwise it sends an abort message.
1.4 If the participant receives a commit message, it commits its change. If it receives an abort message, it aborts the change. In both cases, it sends an acknowledgement back to the co-ordinator.
1.5 Transaction is complete when the coordinator receives all acknowledgments.

One limitation of this protocol is that if the co-ordinator crashes, the participants do not know whether to commit or abort the transaction, as they do not know how the other participants responded.

2. Three phase commit

The protocol attempts to let the participants make progress even if the co-ordinator fails.

2.1 Co-ordinator sends a prepare message to all participants.
2.2  Each participant replies with a yes if it can commit its part of the transaction or No otherwise.
2.3. If the co-ordinator receives yes from all of participants, it send a pre-commit  message to all participants.
2.4 When the co-ordinator receives an acknowledgment from a majority of participants, it sends a commit message to all participants.

If the co-ordinator fails, the participants can communicate with each other and determine whether to commit or abort.

3. PAXOS

Paxos was first published in that nineties but it became more popular after Google implemented and used it in its distributed infrastructure. The protocol is notorious for being difficult to understand. Below is a very brief description. See references for more details.

There are nodes that propose values called proposers and that accept values called acceptor.

3.1 A proposer with a value to propose submits a proposal (v,n) with value v and sequence number n.

3.2.  When an acceptor receives a proposal (v,.n), it compares it with the highest version proposal accepted for that value. If this proposal is higher version that any accepted proposal, the acceptor replies agree and sends the value of any previously accepted proposal. If the acceptor has already accepted a higher version, it rejects the current proposal.

3.3 If the proposer receives agree from majority of acceptors, it can pick one of the values sent by the acceptors. If they acceptors have not sent any value, it can pick its own value. It then sends a commit message with the chosen value to acceptors. If majority reject or do not respond, abort this proposal and try another one.

3.4  When the acceptor receives a commit message, it agrees to commit if the sequence number is the highest it has agreed to or if the value is the same as the last accepted proposal. Otherwise it rejects the commit.

3.5 If a majority accept the commit, the proposal is complete. Otherwise abort and try again.

Key takeaway is that majorities are used to accept proposal. If there are multiple proposers competing for a value, it is possible that no progress is made in accepting values. The solution is to elect a leader that proposes values. Other players in the system could be learners who learn about accepted values from either the leader or other participants.

4. ZAB (Zookeeper Atomic Broadcast)

ZAB was developed for use in Apache Zookeeper due to limitations in PAXOS. In Zookeeper , the order in which changes are applied in important. In PAXOS, it is possible that updates get applied by acceptors out of order.

ZAB is similar to PAXOS in that a leader proposes values and values are accepted based on majority vote. The key difference is that strict order of updates is maintained. If the leader crashes and a new leader is elected, the updates are applied in the original order.

5. RAFT

RAFT is another distributed consensus protocol that claims to be simpler that PAXOS or ZAB

A node can either be a leader, follower or candidate.

5.1 By default all nodes are followers. When there is no leader, a node can make itself a candidate for leadership and solicit votes.

5.2 The candidate that gets majority votes is elected leader.

5.3 A client submits its updates to the leader. Leader updates a log (uncommitted) and sends the update to followers.

5.4 When leaders hears from a majority of followers that they have made the update, leader commits the change and informs the followers of the commit

5.5 Followers commit the update.

5.6 If a leader terminates for some reason, one of the followers turns itself into a candidate and gets elected as the leader.

We have a given a brief description of some consensus protocols. If you use Hadoop, Cassandra, Kafka or similar distributed systems, you will run into these protocols. For more details, some references are provided below.

References:

1.  Database Management Systems by RamaKrishnan and Gehrke
2. PAXOS made simple
PAXOS by example
4. The secret lives of data
5. Apache Zookeeper
6. Paxos paper trail
7. Raft Consensus

Friday, April 18, 2014

Apache Kafka Introduction : Should I use Kafka as a message broker ?

Asynchronous messaging is an important component of any distributed application. Producers and consumers of messages are de-coupled. Producers send messages to a queue or topic. Consumers consume messages from the queue or topic. The consumers do not have to be running when the message is sent. New consumers can be added on the fly. For Java programmers, JMS was and is the popular API for programming messaging applications. ActiveMQ, RabbitMQ , MQSeries (henceforth referred to as traditional brokers) are some of the popular message brokers that are widely used. While these brokers are very popular, they do have some limitations when it comes to internet scale applications. Generally their throughput will max out at few ten thousands of messages per second. Also, in many cases, the broker is a single point of failure.

A message broker is little bit like a database. It takes a message from a producer, stores it. Later a consumer reads the messages. The concepts involved in scaling a message broker are the same concepts as in scaling databases.  Databases are scaled by partitioning the data storage and we have seen that applied in Hadoop, HBASE, Cassandra and many other popular open source projects. Replication adds redundancy and failure tolerance.

A common use case in internet companies is that log messages from thousands of servers need to sent to other servers that do number crunching and analytics. The rate at which messages are produced and consumed is several thousands per sec, much higher than a typical enterprise application. This needs message brokers that can handle internet scale traffic.

Apache Kafka is a open source message broker that claims to support internet scale traffic. Some key highlights of Kafka are
  • Message broker is a cluster of brokers. So there is partitioning and no single point of failure.
  • Producers send messages to Topics.
  • Messages in a Topic are partitioned among brokers so that you are not limited by machine size.
    • For each topic partition 1 broker is a leader
    • leader handles reads and writes
    • followers replicate
  • For redundancy, partitions can be replicated.
  • A topic is like a log file with new messages appended to the end.
  • Messages are deleted after a configurable period of time. Unlike other messaging systems where message is deleted after it is consumed. Consumer can re-consume messages if necessary.
  • Each consumer maintains the position in the log file where it last read.
  • Point to point messaging is implemented using Consumer groups. Consumer groups is a set of consumers with the same groupid. Within a group, each message is delivered to only one member of the group.
  • Every message is delivered at least once to every consumer group. You can get publish subscribe using multiple consumer groups.
  • Ordering of messages is preserved per partition. Partition is assigned to consumer within a consumer group. If you have same number of partitions and consumers in a group, then each consumer is assigned one partition and will get messages from that partition in order.
  • Message delivery: For a producer , once a message is committed, it will be available as long as at least one replica is available. For the consumer, by default, Kafka provides at least once delivery, which means, in case of a crash, the message could be delivered multiple times. However with each consume, Kafka returns the offset in the logfile. The offset can be stored with the message consumed and in the event of a consumer crash, the consumer that takes over can start reading from the stored offset. For both producer and consumer, acknowledgement from broker is configurable.
  • Kafka uses zookeeper to store metadata.
  • Producer API is easy to use. There 2 consumer APIs.
  • High level API is the simple API to use when you don'nt want to manage read offset within the topic. ConsumerConnector is the consumer class in this API and it stores offsets in zookeeper.
  • What they call the Simple API is the hard to use API to be used when you want low level control of read offsets.
  • Relies on filesystem for storage and caching. Caching is file system page cache.
  • O(1) reads and writes since message and written to end of log and read sequentially. Reads and writes are batched for further efficiency.
  • Developed in Scala programming language
Apache Kafka can be downloaded at http://kafka.apache.org/downloads.html.

They have a good starter tutorial at http://kafka.apache.org/documentation.html#quickstart. So I will not repeat it. I will however write a future tutorial for JAVA producers and consumers.

Apache Kafka is a suitable choice for a messaging engine when
  • You have a very high volume of messages - several billion per day
  • You need high through put
  • You need the broker to be highly available
  • You need cross data center replication
  • You messages are logs from web servers
  • Some loss of messages is tolerable
Some concerns that you need to be aware of are
  • Compared to JMS, the APIs are low level and hard to use
  • APIs are not well documented. Documentation does not have javadocs
  • APIs are changing and the product is evolving
  • Default delivery is at least once delivery. Once and only once delivery requires additional work for the application developer
  • Application developer needs to understand lower level storage details like partitions and consumer read offsets within the partition
It is useful to remember history and draw an analogy with NoSQL databases. 3 or 4 years ago Nosql database were hot and people wanted to use them everywhere. Today we know that traditional RDBMS are not going anywhere and the NoSQL databases are suitable for some specialized use cases. In fact NoSQL database are going in the direction of additing features that are available in RDBMSs. Kafka today is where NoSql databases were a few years ago. Don'nt throw away your traditional message broker yet. While Kafka will be great for the cases mentioned above, lot of the simpler messaging use cases can be done lot more easily  with a traditional message broker.


Related Blogs :

Apache Kafka JAVA tutorial #1
Apache Kafka JAVA tutorial #2 
Apache Kafka JAVA tutorial #3 
Apache Kafka 0.8.2 New Producer API  

Friday, March 28, 2014

10 Tips for building low latency applications

In this previous blog on low latency I described 5 tips for building low latency applications. Read that for the first 5 tips. Here are 5 more tips.

6. Co-locate services

Networks hops add latency. A network call to another server on a different subnet or datacenter can add a few milli-seconds to your response and affect SLA. Install dependent services on the same server or same rack and definitely the same data center.

7. Geographically distribute customer facing services

This might sound contradictory to item 6. But it is not. A round trip over the internet from New York to San Francisco takes 80-90 milli seconds. If your servers are in San Francisco, a user in New York will see some latency even without the server doing any work. Users in New york should be served from servers near New York so their time is not wasted on the round trip. To ensure the rule 6 is not violated, this might mean replicating the dependencies such as the database so that the server in New York is able to serve from a database that is close to it.

As your user base grows, you many need to distribute the services to several locations - east coast US, west coast US , Europe , Asia Pacific and so on.

8. Reduce serialization / de-serialization

Network calls, cross process , cross JVM calls all involve serialization and de-serialization of data which is expensive. Care should be taken to reduce and limit serialization/de-serialization to only required data and to delay to only when required. If you store your data as large blogs, then when you need a small piece of data, you end up serializing de-serializing the entire blog. A few years ago, when XML bandwagon was in full swing, there were many products using XML for RPC. They soon realized that while XML good for reading text, it adds way too much overhead for serialization/de-serialization.

9. Tolerate weak consistency with application design

A system with strong consistency ( think ACID and RDBMS) requires locking data during updates. This mean other writes and readers may need to wait at times. Waiting implies increased latency.

Weak consistency means a reader may not always read the latest updated data. In reality many systems can tolerate this. Weak consistency systems generally do not involve locking. They allow more concurrent readers and writers. They are be easily partitioned and distributed. For these reasons, they have lower latency for reads and writes.

10. Measure and tune

Most systems of any complexity have multiple components. In todays agile development model, developers are continuously deploying new releases of their sub components. If latency suddenly goes up, how do you know what caused it ?

It is important to continuously measure and monitor not only the end to end latency but also the latency contributed by the sub components. Know the averages and what deviations are tolerable. Set up alerts when ever there are deviations from mean. If a new component is released and suddenly the latency goes up, you know the likely culprit. As you user base grows , if you see gradual increases in latency, perhaps you need additional capacity. If users in a particular geographical location are complaining, then perhaps you need to replicated and deploy your service to that location.

In summary, ensuring low latency is a continuous and iterative process that is to be done through out the life of a system

Friday, October 25, 2013

Apache Cassandra Data Model

This is an introduction to the Apache Cassandra data model. For the benefit of those not familiar with Cassandra,  it is an open source, highly scalable, highly available NoSQL database. Some key architectural highlights of  Cassandra are :

No Single point of failure.
No Master - All servers in cluster are equal.
Peer to peer communication between nodes to exchange data and configuration.
Data is partitioned across nodes based on consistent hash.
Data is automatically replicated.
(and recently added) SQL like data access model.

Cassandra has moved to a simple model that is described by a SQL like language called CQL. Lower level constructs like column family are no longer mentioned. Note that earlier column family models were without much of a schema. You needed to define column family upfront. But the column name in each family could be added as needed. The new CQL model is more schema oriented.

1.0 Tables, row keys and columns

Data is stored in Tables which has rows and columns.

Table is partitioned by the primary key, which is the row key.

For columns , CQL supports various data type like int , varchar, text, float, Set , List and many more

The CQL create statement below creates the users table with userid as the primary key.

create Table Users (
    userid varchar,
    name varchar,
    email varchar,
    address varchar,
    PRIMARY KEY(userid)
) ;

You insert rows into this table using the insert statement. Rows of table are partitioned across nodes based on the primary key.

insert into Users(userid,name,email) values('user1', 'user1 name', 'user1@gmail.com') ;

2.0 No Joins but wide columns

Let us say you want groups of users. In a RDBMS , you might have a table with columns, groupid and userid with userid being a foreign key into Users table. In a distributed database like Cassandra joins are expensive. Hence the data needs to be de-normalized. You can create a table GroupsOfUsers with groupid as the primary key. As de-normalization, in addition to having userid column, repeat some useful columns like user name and user email that you might need when looking at members of the group.

create Table GroupsOfUsers (
    groupid varchar,
    groupname varchar,
    userid varchar,
     user_name varchar,
     user_email varchar
     PRIMARY KEY(groupid,userid)
)

When you have a compound primary key, the first column, in this case group id is used as the partition key. The other columns, in this case userid is used to cluster the remaining columns by userid. Additionally, the columns in the row are sorted based of the other columns of the primary key, namely userid.

If you do ,

select * from GroupsOfUsers where groupid = "group1" ;

The result might be

group1     user1  name1 email1
group1     user2   name2 email2
group1     user3   name3  email3

Think of the above as logical rows.

Under the hood , the columns might be stored physically as 1 row with one or more columns for each user.

key        column1       column2            column3          column4          column5        column6
group1  user1:name1  user1:email1     user2 :name2   user2:email1   user3:name3  user3:email3

Each row can have as many as 2 billion columns if necessary. This is very useful in other use cases such as creating indexes or storing collections.

3.0 Collection column types

If each user had a number of  friends, in RDBMS, this would be modeled by joining with a Friends table. In Cassandra you can do that by adding a column type of Collection. The collections supported are List, Map and Set.

Alter Table Users add friends Set ;

insert into Users set friends = friends + {'friend6'} where  userid = 'user1' ;

4.0 Indexes using wide columns

Index is a data structure that enables fast look up based on a Key. In Cassandra the table is partitioned across nodes based on the primary key. By default, each node in Cassandra maintains an index for the primary keys that it host.

You can create additional indexes on other columns. For such indexed columns, Cassandra under the hood creates another table whose primary key is the indexed column.

For example , if frequently had to do a query such as

select groupid from GroupOfUsers where userid = 'user1' ;

It could be worthwhile to create an index on userid column to speed up the query.

create index userid_index on GroupOfUsers(userid) ;

Logically this would be like creating a table

create table userid_idx (
     userid varchar,
     groupid varchar,
     primary key(userid,groupid)
)

The partition key will be userid and the columns in the row will be the groups to which the user belong. This makes use of the wide column feature of Cassandra mentioned above.

5.0 Note on consistency

The original Dynamo paper on which Cassandra is based on, talks about being eventually consistent. Eventual consistency scares people even though we see it in life all the time. For example, the ATM may let you take more cash than you have in your account. When the bank reconciles ATM withdrawals with your account and realizes that you have overdrawn, it takes appropriate action.

Cassandra extends eventually consistency by offering a model of tunable consistency.

A write consistency of ANY means that it is enough for write to be written to any one node. This gives low consistency but high availability. A write consistency on ONE, TWO, THREE or QUORUM implies that writes need to be written to that many replicas. Higher the writes , more the consistency and less availability.

A read consistency of ONE, TWO, THREE, QUORUM indicates the number of replicas to be consulted before returning the most recent data from the replicas.

Note that unlike what is described in the Dynamo paper, when there is a conflict between data in replicas, Cassandra returns the most recent data and not vector clocks with different versions that clients need to resolve.

In summary, with CQL Cassandra provides a simple data model that makes it easier to model and develop applications. With CQL, Cassandra can be looked at as a viable alternative to a relational database when scalability and high availability are important. For additional details, the Cassandra documentation is at http://www.datastax.com/documentation/cassandra/2.0/webhelp/index.html