Sunday, August 24, 2025

JDK 21 Virtual threads: The end of regular threads? Not quite.

A question I get asked all the time: if JDK 21 supports virtual threads, do I ever need to use regular threads?

Java 21 brought us virtual threads, a game-changer for writing highly concurrent applications. Their lightweight nature and massive scalability are incredibly appealing. It's natural to wonder: do we even need regular platform (OS) threads anymore?

While virtual threads are fantastic for many I/O-bound workloads, there are still scenarios where platform threads remain relevant. Here's why:

1. CPU-Bound Tasks:

Virtual threads yield their carrier thread when they perform blocking I/O operations. For purely CPU-bound tasks, however, they offer no significant advantage over platform threads in terms of raw processing power. In fact, the extra scheduling involved can introduce a small amount of overhead.

Consider a computationally intensive task like calculating factorials:

Virtual threads example:


// A CPU-intensive task (the running product overflows long; the loop just keeps the CPU busy)
Runnable cpuBoundTask = () -> {
    long result = 1;
    for (int i = 1; i <= 10000; i++) {
        result *= i;
    }
    System.out.println("Virtual thread task finished.");
};

// Start a virtual thread for the task
Thread.startVirtualThread(cpuBoundTask);


Platform threads example:

Runnable cpuBoundTask = () -> {
    long result = 1;
    for (int i = 1; i <= 10000; i++) {
        result *= i;
    }
    System.out.println("Platform thread task finished.");
};

// Start a regular platform thread
new Thread(cpuBoundTask).start();

For sustained CPU-bound work, managing a smaller pool of platform threads (for example, one sized to the number of CPU cores) might still be a more efficient approach to leverage the underlying hardware.

2. Integration with Native Code and External Libraries:

Some native libraries or older Java APIs might have specific requirements or behaviors when used with threads. In addition, a virtual thread is pinned to its carrier while executing native (JNI) code, which erodes the scalability benefit. Platform threads, being closer to the operating system's threading model, often provide better compatibility and more predictable behavior in these scenarios.

3. Thread-Local Variables with Care:

While virtual threads support thread-local variables, their potentially large number can lead to increased memory consumption if thread-locals are heavily used and store significant data. With platform threads, you typically have a smaller, more controlled number of threads, making it easier to reason about thread-local usage. However, it's crucial to manage thread-locals carefully in both models to avoid memory leaks.

4. Profiling and Debugging:

The tooling around thread analysis and debugging is more mature for platform threads. While support for virtual threads is rapidly improving, there might be cases where existing profiling tools offer more in-depth insights for platform threads.

5. Backward compatibility

If you want your library or server to be usable by people on JDKs earlier than JDK 21, then you have no choice but to use regular threads. Virtual threads are not just a new library; they are a fundamental change to the Java Virtual Machine's threading model (part of Project Loom). The underlying machinery that manages and schedules virtual threads on top of carrier threads simply does not exist in older JVMs. This can be one of the most important reasons for using platform threads.

In Conclusion:

Virtual threads are a powerful addition to the Java concurrency landscape and will undoubtedly become the default choice for many concurrent applications, especially those with high I/O. However, platform threads still have their place, particularly for CPU-bound tasks, legacy integrations, and situations requiring fine-grained control over thread management.

Understanding the nuances of both models will allow you to make informed decisions and build more efficient and robust Java applications.

Sunday, May 4, 2025

Understanding Isolation levels vs Consistency levels

In databases, the terms isolation level and consistency level/model are sometimes used interchangeably. "Repeatable Read" and "Serializable" are well-known isolation levels, while "strict serializable" and "linearizable" are consistency terms.

If you have used MySQL or PostgreSQL, you probably know what isolation levels like "Repeatable Read" or "Serializable" mean. But when you work on a distributed database, you hear about consistency levels much more.

The first time I heard about a consistency level was when I worked with Apache Cassandra, which claimed to support only "eventual consistency". A few years ago, when my company was evaluating distributed databases, a few of our architects insisted that we needed a database that supported "strict serializability". CockroachDB was a database that supported this consistency level.

If you are confused, read on. I wrote this blog in an attempt to clear up the confusion.

So far, the best explanations of this topic that I have found are by Daniel J. Abadi [2][3]. Kyle Kingsbury's Jepsen site [1] has good descriptions of the topic as well.

But first, a clarification on what consistency means.

What is consistency?

Consistency is an overloaded term and its meaning has changed in recent times.

ACID consistency

The database must preserve its internal correctness rules after every transaction.

Consider a banking database with a constraint account_balance > 0. 

If the starting account_balance is 50 and a transaction tries to deduct 100, that violates the constraint and the transaction should fail.

This is the C in ACID. Databases support constraints to enforce it, but it is mainly the responsibility of the application programmer. It is well understood and rarely discussed these days.

Distributed systems consistency

The system must ensure that all nodes (or clients) agree on the same view of data.

Make the distributed system feel like a single-threaded, single-node system. A read of a value anywhere in the system produces the same result [2]: the most recently written value, no matter where it was written.

Consider a system with multiple nodes where X was 1. The value X=2 is written to one node and replicated to the others. If clients read from the replicas, do they all see X=2 immediately? With the strict serializable consistency level, the answer is yes. With weaker models, it is possible they read an older value.

Most of us first heard of this description from the CAP theorem.

Why the difference ?

Both describe behavior under concurrency. 

Isolation levels describe problems that occur in single-node databases when transactions execute concurrently. At the highest isolation level, transactions appear to execute in some serial order; each transaction executes as if it were alone.

In distributed systems there are network latency, replication, and partitioning, all of which add timing issues on top of the usual concurrency issues. Consistency models therefore take time and latency into account as well. At the highest consistency level, transactions appear to execute in the real-time order in which they complete (commit).

Serializable is the strictest isolation level. Strict serializability is the strictest consistency model. In a single-node system there is very little practical difference between the two, because the timing issues are small.

Isolation Levels vs. Consistency Models

To summarize the key differences.

Isolation Levels

  • Prevent reads and writes of uncommitted data.
  • Prevent anomalies like dirty reads, non-repeatable reads, and phantom reads.
  • Focus on managing concurrent access to data while balancing performance and correctness.
  • Common isolation levels (from weakest to strongest):
    • Read Uncommitted
    • Read Committed
    • Repeatable Read
    • Serializable — the strictest level defined by the ANSI SQL standard.
  • For a longer discussion, see my earlier blog post on isolation levels; a small code sketch follows below.
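To make the isolation-level knob concrete, here is a minimal Go sketch using the standard database/sql package to request Serializable for one transaction. The driver (lib/pq), connection string, and the accounts table are illustrative assumptions, not from any real deployment.

package main

import (
    "context"
    "database/sql"
    "log"

    _ "github.com/lib/pq" // example choice of PostgreSQL driver
)

func main() {
    // Placeholder DSN; replace with a real connection string.
    db, err := sql.Open("postgres", "postgres://user:pass@localhost/bank?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Ask the database for the Serializable isolation level for this transaction.
    tx, err := db.BeginTx(context.Background(), &sql.TxOptions{
        Isolation: sql.LevelSerializable,
    })
    if err != nil {
        log.Fatal(err)
    }

    var total int
    // Example query; table and column names are illustrative.
    if err := tx.QueryRow("SELECT SUM(balance) FROM accounts").Scan(&total); err != nil {
        tx.Rollback()
        log.Fatal(err)
    }
    log.Println("sum of balances:", total)

    if err := tx.Commit(); err != nil {
        // Under Serializable, the database may abort the transaction on conflict;
        // the application is expected to retry.
        log.Fatal(err)
    }
}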

Consistency Levels

  • Typically relevant to distributed databases.
  • Real time (wall-clock ordering) is a factor.
  • They describe the guarantees about visibility and ordering of updates in a distributed, replicated data system.
  • They focus on the behavior perceived by clients across multiple nodes or replicas.
  • Examples include:
    • Strict serializability
    • Linearizability 
    • causal consistency

Example to Illustrate the Difference:

Scenario:

  • Two accounts A and B initially have a balance of 100 each.
  • Two concurrent transactions:
    • Tx1: Transfer 50 from A to B.
    • Tx2: Reads the balances of A and B and sums them.

Isolation level Serializable:

  • Tx1 and Tx2 are serialized, and the sum read by Tx2 is 200.
  • Both (Tx1, Tx2) and (Tx2, Tx1) are valid orders, irrespective of which one actually committed first.

Consistency level Strict Serializable

  • If Tx1 commits before Tx2 starts, Tx2 must see all effects of Tx1. The only valid order is (Tx1, Tx2)
  • However, if they overlap in time (Tx1 commits after Tx2 starts), then both orders (Tx1, Tx2) and (Tx2, Tx1) can be valid, because real-time ordering only constrains transactions that do not overlap; in the (Tx2, Tx1) order, Tx2 simply does not see the data committed by Tx1.

A few descriptions


Let us briefly touch on some levels you will encounter often. For more detailed descriptions, I refer you to https://jepsen.io/consistency [1].

Serializability

Transactions occur in some total order. Even though they may actually execute concurrently, it appears as if they execute one after another. While serializable prevents non-repeatable reads and phantom reads, it allows "time travel" anomalies as shown in the example above: it can appear that Tx2 happened before Tx1, even though in reality it was the other way around.

Strict Serializability

Transactions occur in a strict order that is consistent with the real-time (clock-time) order in which they occur, and this applies to the entire system, encompassing multiple objects. A is before B in the order if A commits before B begins, so the only valid order is (A, B). However, if A commits after B begins, then both orders (A, B) and (B, A) are valid.

Linearizable

Transactions occur in a strict order that is consistent with the real-time (clock-time) order in which they occur, but this applies to a single object, not to the entire dataset. The definition of a single object varies; it could be a key or a table [1].

Most of the time, concurrency issues matter when multiple threads touch the same data, and that is why this model is as important in practice as strict serializability.

Causal Consistency

Transactions that are causally related are seen by all nodes in the same order, while concurrent (unrelated) operations may be seen in different orders. In a social media application, a user making a post and another user liking that post are causally related: the like must be seen only after the post is seen. However, it is fine for an unrelated post that happened later to be seen before that one.

Conclusion

It is all about how systems behave under concurrency. 

Isolation levels deal with how transactions behave when they run at the same time, while consistency models talk about how different nodes in a distributed system agree on data. And "consistency" itself has changed over time, from enforcing business rules in ACID databases to ensuring replicas don't drift apart in distributed ones. 

Database vendors advertise the consistency level they support as a key feature. That is why it is important to understand what it means and to pick the right database that fits our needs.

References 

1. https://jepsen.io/consistency

2. Daniel J. Abadi, Introduction to Consistency Levels

Tuesday, April 1, 2025

A non-trivial concurrency example in the Go language

Overview

In this blog I describe a non-trivial concurrency example in the Go programming language. The code is part of my Dynago project: https://github.com/mdkhanga/dynago.

This is not a tutorial on Go or on writing concurrent programs. But if you know a little bit of Go and/or a little bit of concurrent programming, then I hope this can be a useful example.

In Dynago, I have so far built a leaderless cluster of peer servers. The command for starting a server only needs to point to one other server; the only exception is the first server, which has nothing to point to. The servers exchange ping and gossip-like messages to share details about the members of the cluster. After a few messages, every server has the cluster membership list. When servers join or leave the cluster, the membership list is updated.

Code

If you would like to skip the text and jump to the code, the relevant files are:

https://github.com/mdkhanga/dynago/blob/main/cluster/peer.go
https://github.com/mdkhanga/dynago/blob/main/cluster/ClusterService.go

The relevant tests are 
https://github.com/mdkhanga/dynago/blob/main/e2e-tests/test_membership.py
https://github.com/mdkhanga/dynago/blob/main/e2e-tests/test_membership_chain.py

I show some simplified code snippets in this blog for the casual reader. But if you are interested in the details, I recommend looking directly at the code on GitHub.

Go concurrency recap

In Go, you write code that executes concurrently by writing Goroutines.

A goroutine is like a lightweight thread. It is written as a regular function.

You run a goroutine using the go keyword.

Go provides channels for communicating between goroutines. Channels are a safer way to send and receive data than sharing memory, which is the common approach in Java or C++. With channels, you do not have to use mutexes to avoid memory-visibility problems and race conditions.

The select statement lets a goroutine wait on multiple channel operations at once.

Though not recommended, the shared-memory approach to data exchange is also supported. You will need to use primitives from the https://pkg.go.dev/sync package to synchronize access to the data.

The Examples

There are two features in Dynago where concurrency is relevant.

1. Each server receives messages on a gRPC stream. It has to process each message and sometimes send a response back. This is the classic producer-consumer problem: some goroutines produce, other goroutines consume and do the work. I use channels to share the messages between producers and consumers.

2. A map stores the list of cluster members. 

  • The map is updated as servers join or leave the cluster.
  • Periodically, we need to iterate over the map and send our copy of the membership list to the other servers.
    • The receiving server merges the received list with its own list.
    • If the receiving server's list is more up to date, it sends a response back and the original sender has to merge.
    • Timestamps are used to determine which entry is more recent.
This is the case of multiple goroutines reading and writing a data structure concurrently.

Example 1: Channels 

The peer struct shown in the sketch below defines a channel InMessagesChan for incoming messages and a channel OutMessagesChan for outgoing messages.
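The original post embedded screenshots of the real code; here is a simplified, hand-written stand-in instead. The Message type, field names other than the two channels, and buffer sizes are illustrative assumptions, not the actual Dynago code.

package cluster

// Message is a stand-in for the real gRPC message type Dynago uses.
type Message struct {
    Type    int
    Payload []byte
}

// peer represents one remote server, with one channel for incoming messages
// and one for outgoing messages.
type peer struct {
    Host            string
    Timestamp       int64         // last time we heard from this peer (illustrative)
    InMessagesChan  chan *Message // messages received from the gRPC stream
    OutMessagesChan chan *Message // responses waiting to be sent out
}

func newPeer(host string) *peer {
    return &peer{
        Host:            host,
        InMessagesChan:  make(chan *Message, 100),
        OutMessagesChan: make(chan *Message, 100),
    }
}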



The receiveMessages goroutine method has a for loop with the following logic: it reads a message from the gRPC stream and sends it to InMessagesChan.
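A sketch of that loop, continuing the illustrative types above (the stream parameter stands in for the generated gRPC stream interface):

// receiveMessages reads messages off the inbound gRPC stream and hands them
// to the processing goroutine via InMessagesChan.
func (p *peer) receiveMessages(stream interface{ Recv() (*Message, error) }) {
    for {
        msg, err := stream.Recv()
        if err != nil {
            return // stream closed or broken: stop this goroutine
        }
        p.InMessagesChan <- msg
    }
}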



The processMessageLoop goroutine method in peer.go has a loop that takes messages from InMessagesChan and processes them. When a response is needed, it writes the outgoing response to OutMessagesChan. The sketch below is a shortened, illustrative version of that logic.
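Something along these lines, with made-up message type constants:

// processMessageLoop consumes incoming messages, handles them, and queues any
// responses on OutMessagesChan.
func (p *peer) processMessageLoop() {
    for msg := range p.InMessagesChan {
        switch msg.Type {
        case 1: // e.g. a ping: queue a pong-style reply
            p.OutMessagesChan <- &Message{Type: 2, Payload: msg.Payload}
        default:
            // gossip, membership and other message types handled here
        }
    }
}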



Lastly, the sendLoop goroutine method has a for loop that takes messages from OutMessagesChan and writes them to the outbound gRPC stream.
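And the corresponding sketch of the sending side, again with an illustrative stream parameter:

// sendLoop drains OutMessagesChan and writes each message to the outbound
// gRPC stream.
func (p *peer) sendLoop(stream interface{ Send(*Message) error }) {
    for msg := range p.OutMessagesChan {
        if err := stream.Send(msg); err != nil {
            return // give up on a broken stream
        }
    }
}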



As you can see, this is very simple: three goroutines working concurrently by exchanging data over channels. No locking, no synchronization, and fewer problems.

In my opinion, channels are one of the best features of Go.

Example 2 : Shared memory


I have a struct, cluster, which holds the list of peers in the cluster.
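A simplified sketch of that struct (again illustrative, not the actual Dynago code; it continues the same package and needs "sync" imported):

// cluster keeps the current membership keyed by peer address. The mutex
// guards the map because several goroutines read and write it.
type cluster struct {
    mu    sync.Mutex
    peers map[string]*peer // keyed by host:port
}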



We need to add/update/remove entries in the map in a thread-safe manner. The code below uses a mutex to synchronize access to the map. These methods are called from both peer.go and ClusterService.go.
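A sketch of those methods under the same illustrative types:

// AddPeer and RemovePeer mutate the membership map under the mutex so that
// concurrent callers do not race with each other or with the gossip loop.
func (c *cluster) AddPeer(p *peer) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.peers[p.Host] = p
}

func (c *cluster) RemovePeer(host string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    delete(c.peers, host)
}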



The code below shows a loop from the ClusterInfoGossip method that takes the same mutex while iterating over the map. The calls to add, remove, and gossip typically happen from different goroutines; if you do not synchronize, you will have memory-visibility problems and data races. Note that since the peer itself is updated, access to the peer needs to be synchronized as well.
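An illustrative version of that loop (the real code serializes the full membership list into a protobuf message; the one-entry-per-message encoding here is deliberately toy-like):

// ClusterInfoGossip iterates over the membership map under the mutex and
// queues our current view of the membership on every peer's outbound channel.
func (c *cluster) ClusterInfoGossip() {
    c.mu.Lock()
    defer c.mu.Unlock()

    // Build our current view of the membership while holding the lock.
    hosts := make([]string, 0, len(c.peers))
    for host := range c.peers {
        hosts = append(hosts, host)
    }

    for _, p := range c.peers {
        // The real code also locks the peer here, because its fields
        // (for example its timestamp) are updated during gossip.
        for _, h := range hosts {
            p.OutMessagesChan <- &Message{Type: 3 /* gossip */, Payload: []byte(h)}
        }
    }
}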





You might be wondering: why not just do this with channels, if that is safer and cleaner? What you would do is write a function that acts like an event loop. The function accepts commands on a channel and reads, updates, or iterates over the map based on the command. The code would look something like the sketch below.
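One possible shape for that event loop, with made-up command names:

// command is a request sent to the goroutine that owns the membership map.
type command struct {
    op    string       // "add", "remove" or "list"
    p     *peer        // used by "add"
    host  string       // used by "remove"
    reply chan []*peer // used by "list"
}

// run owns the map exclusively; because only this goroutine touches it,
// no mutex is needed.
func (c *cluster) run(cmds <-chan command) {
    peers := make(map[string]*peer)
    for cmd := range cmds {
        switch cmd.op {
        case "add":
            peers[cmd.p.Host] = cmd.p
        case "remove":
            delete(peers, cmd.host)
        case "list":
            list := make([]*peer, 0, len(peers))
            for _, p := range peers {
                list = append(list, p)
            }
            cmd.reply <- list
        }
    }
}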



Which approach you take is sometimes a personal choice. For the message processing, I preferred channels. But for CRUD on a map, I preferred shared memory.

Conclusion

In conclusion, what I have shown here is a non-trivial concurrency example in Go. Go makes it quite easy to write concurrent programs. The recommended way to share data between goroutines is channels; by using channels, you can avoid race conditions and memory-visibility issues. But the traditional approach of shared memory with synchronization is also supported.

As I build out Dynago, I plan to blog about the interesting pieces.  If you like my blogs, please follow me on LinkedIn and/or X/twitter.

Saturday, March 1, 2025

Multi Version Concurrency Control (MVCC) in databases

Introduction

Multi version concurrency control (MVCC) is a popular optimistic technique used in modern databases for concurrency control.

MVCC does not use locking. In that regard it is an optimistic technique, but it is distinct from what is known as optimistic concurrency control (OCC).

MVCC is timestamp based.

MVCC does its concurrency control by keeping multiple copies, or versions, of each data item. Every transaction sees data only as of a specific version, also known as a snapshot. Changes made by a transaction are not seen by others until the changes are committed.

Concepts

Versioning

Versioning tuples is at the core of how MVCC does its concurrency control.

Consider the table 

id, balance
A, 500
B, 1000

Now assume that, logically, under the hood the database adds two hidden columns: start time (st) and end time (et).

id, balance, st, et
A, 500, 1, -
B, 1000, 2, -

Let us assume that the increasing numbers 1, 2, ... represent time, and that a "-" means infinity, i.e. no end time. The version number can be a timestamp or something analogous to a timestamp.

In the above example, we are saying that the first tuple (A, 500) is valid from time 1 to infinity. The second tuple (B, 1000) is valid from 2 to infinity.

With versioning, when a tuple is updated, the original one is left unchanged; instead, a new row is added.

Let us say at time 3, A is updated to 550. Logically the table would look like

A, 500, 1, 3
A, 550, 3, -
B, 1000, 2, -

The latest committed version has an end time of infinity. When a new update for a tuple is committed, the end timestamp of the previous latest is changed to the start timestamp of the new latest.

A transaction reads the latest committed version at the time it starts. A transaction that starts at time 2 and reads A will read 500. But a transaction that starts at 4 and reads A will read 550.

Snapshot isolation

Every transaction reads only values that were committed at the time it started. That is a snapshot of the database as of that time, which is why this is referred to as snapshot isolation.

However, a transaction does see any changes it makes itself within the scope of the transaction.
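The read rule can be captured in a few lines. Here is a minimal Go sketch, assuming each version carries the start/end timestamps shown above (with 0 playing the role of "-"); it is illustrative only, not any particular database's implementation.

// version is one copy of a tuple, valid from st until et (et == 0 means "-",
// i.e. it is still the latest committed version).
type version struct {
    value int64
    st    int64
    et    int64
}

// visible reports whether a transaction that started at txStart should read
// this version: it was committed at or before txStart and not yet superseded.
func (v version) visible(txStart int64) bool {
    return v.st <= txStart && (v.et == 0 || v.et > txStart)
}

// readTuple walks a tuple's version chain and returns the visible value.
// For the example above, txStart = 2 returns 500 and txStart = 4 returns 550.
func readTuple(chain []version, txStart int64) (int64, bool) {
    for _, v := range chain {
        if v.visible(txStart) {
            return v.value, true
        }
    }
    return 0, false
}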

Use cases

It is easier to understand MVCC with some examples.

Lost update problem

Let us start with the same table again.

id, balance, st, et
A, 500, 1, -

At time 2, transaction T1 reads A. The latest version with timestamp (1, -) has value 500. It reads 500.

At time 3, transaction T2 reads the same row. The latest version is still (1, -) . It too reads 500.

At time 3, transaction T2 updates the value to 550 and commits. We now have an additional version:

A, 500, 1, 3
A, 550, 3, -

At time 5, T1 updates the value to 600. 
At time 6, T1 tries to commit. 

T1's commit is disallowed and it is forced to abort.
The reason is that T1's snapshot was taken when it started. After T1 started, another transaction came along, performed a write on the same record, and committed. If we allowed T1 to also commit, it would overwrite T2's update. This is the lost update problem.

The rule we can deduce here is: the write set of the committing transaction T1 should not intersect with the write set of any transaction that committed after T1 started and before T1 commits.

Write Skew

Let us start this time with the table having 2 tuples.

id, balance, st, et
A, 500, 1, -
B, 1000, 2, -


At time 1, transaction T1 reads A. The latest version with timestamp (1, -) has value 500. It reads 500.
At time 3, transaction T1 writes A = 600, but the change is still uncommitted.

A, 500, 1, -
A, 600, 3, -, uncommitted
B, 1000, 2, -

At time 4, transaction T2 reads A. The latest committed record is still the one at timestamp 1, so it reads 500 (not 600).

At time 5, T1 commits. The write set of T1 does not intersect with the write set of any other transaction, so it is allowed to commit. We now have:

A, 500, 1, 5
A, 600, 5, -
B, 1000, 2, -

At time 6, T2 updates B to 1200

B, 1200, 6, -, uncommitted

At time 7, T2 tries to commit. It is disallowed and T2 is forced to abort. The reason is that T2 read A, and a new version of A was committed after T2 started. Even though T2 does not update A, it may have used the value of A it read earlier to compute its update to B, and that update could now be incorrect. This is the write skew problem.

We can thus deduce rule 2: The read set of a committing transaction T should not intersect with the write set of any transaction that has committed after T started.

Phantom reads

Let us look at a case where a new record is inserted into the table.

id, balance, st, et
A, 500, 1, -
B, 1000, 2, -

At time 3, transaction T1 runs the query "select sum(balance) from table where balance >= 500". It gets the result 1500.

At time 4, transaction T2 inserts a new row, (C, 600) and commits

id, balance, st, et
A, 500, 1, -
B, 1000, 2, -
C, 600, 4, -

At time 5, transaction T1  inserts the 1500 value it read into another table.

At time 6, T1 tries to commit. The transaction is not allowed to proceed and aborts.

The reason is that the 1500 value is no longer accurate. At time 4, another transaction committed a new row, so the sum should now be 2100.

Rule 3: if the committing transaction T has predicate queries (WHERE clauses) whose results depend on the number of matching rows and can be affected by inserts or deletes, then just before committing, those queries need to be run again. The commit is allowed only if the results are the same as before.

Obviously, running the queries a second time can be expensive. But that is an implementation detail, and databases apply various optimizations.

Garbage collection

Maintaining versions for each and every record can take up a lot of disk space. Databases have a background process or other mechanism to remove old versions that no active transaction depends on.

Algorithm

Every tuple is versioned using timestamp or something analogous to it.

Every transaction only reads rows that are committed as of the start time of the transaction. This holds for the entire duration of the transaction. It cannot read any commits that happened after it starts. In other words it sees a "Snapshot" of the data, as of its start time. This is Snapshot isolation.

When a transaction tries to commit, we check whether any other transaction has committed a new version of the rows that it read or wrote. If there are any such transactions, then we must abort.

Even if our read/write set does not intersect with any other transaction's, if our reads are affected by inserts or deletes, we must abort.

To make this happen, the database needs to maintain start time, end time for every tuple. Not just the latest version, but older versions as well. 

For every tuple that is read or written, the database needs to know which transactions are active and which have committed.

Older versions need to be cleaned up when no active transactions depend on them.

For a transaction to be allowed to commit, the rules we discussed apply (a small sketch follows the list):

#1: The write set of a committing transaction T should not intersect with the write set of any transaction that committed after T started.

#2: The Read set of a committing transaction T should not intersect with the write set of any transaction that has committed after T started.

#3: When the committing transaction T has predicate queries (WHERE clauses whose results depend on the number of rows meeting a condition), then before committing, the queries need to be rerun to ensure that they return the same result.

Some additional rules:

#4: The write set of a committing transaction T should not intersect with the read set of any transaction that committed after T started.

#5: If a transaction T updates a row, T should then use its own updated version, not the one in the older snapshot it started with.
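Continuing the Go sketch from the snapshot-isolation section, rules #1 and #2 amount to an intersection check against the transactions that committed while we were running. This is illustrative only; rule #3 and the additional rules are not shown.

// txn records when a transaction started, when it committed, and which row
// keys it read and wrote.
type txn struct {
    startTS  int64
    commitTS int64
    readSet  map[string]bool
    writeSet map[string]bool
}

// canCommit applies rules #1 and #2: the committing transaction's write set
// and read set must not intersect the write set of any transaction that
// committed after it started.
func canCommit(t *txn, committed []*txn) bool {
    for _, other := range committed {
        if other.commitTS <= t.startTS {
            continue // committed before we started: no conflict possible
        }
        for key := range other.writeSet {
            if t.writeSet[key] || t.readSet[key] {
                return false // conflicting concurrent commit: abort and retry
            }
        }
    }
    return true
}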

Commercial Databases supporting MVCC

Almost all the modern databases we know of support MVCC:

PostgreSQL
MySQL
Oracle
CockroachDB
SQL Server
... and more

Advantages of MVCC

  • Readers do not block writers, and writers do not block readers.
  • Each transaction works under a consistent snapshot of data.
  • High concurrency is possible.
  • Databases can allow querying of old versions, which are known as time travel queries.

Disadvantages of MVCC

  • Increased storage requirements, because we are storing older versions.
  • Need to do garbage collection.
  • When conflicts are detected and transactions are aborted, applications need to retry.

Conclusion

MVCC is a popular concurrency control technique used in many popular modern databases like PostgreSQL.

It does not use two-phase locking. Unlike OCC, it does not need a separate staging area; the versioning itself plays that role.

Like OCC, it works best for workloads where there are few conflicts.

In conclusion, multi-version concurrency control (MVCC) offers a robust approach to handling concurrent transactions in modern databases by maintaining multiple versions of data.

While it provides significant benefits such as high read throughput, reduced contention, and snapshot isolation, it also introduces complexity in garbage collection and storage management.

Modern distributed databases combine versioning with clock time to provide strict serializability without locking, even across nodes. But that is a topic for another blog.

References

Prof. Jens Dittrich's YouTube videos
Andy Pavlo's CMU database course videos on YouTube
Database Internals by Alex Petrov
Database Management Systems by Ramakrishnan & Gehrke

Saturday, February 1, 2025

Optimistic Concurrency Control In Databases

Introduction

Optimistic concurrency control refers to concurrency control in databases without the use of explicit locks. Traditionally, concurrency control is done using locks, but locking reduces throughput and scalability. Modern databases therefore prefer optimistic techniques. This is a brief description of the optimistic approach.

Why concurrency control ?

Concurrency control in databases refers to the techniques used to execute transactions concurrently while preserving correctness.

From an end user's standpoint, every transaction needs to execute correctly and preserve the consistency of the database, irrespective of what other transactions are doing. To the user, it should appear that the transactions execute in some serial order.

When multiple transactions read and/or write the same data, several kinds of errors can happen, such as:

  • dirty reads - reading uncommitted data
  • non repeatable reads - read a second time and you get a different value
  • phantom reads - read a second time and you get additional rows
  • Lost update - one transaction's update is overwritten by another's and lost
  • dirty write - a transaction overwrites a value that another transaction has written but not yet committed
  • write skew - the combination of two transactions breaks some invariant like minimum balance
At a user level, databases give users a knob called the isolation level to protect against some of these errors. For an explanation of isolation levels, see the blog I wrote a few years ago.

But under the hood databases implement concurrency control to ensure that users see a consistent view of the data at all times.

There are 3 main types of concurrency control:
  • Pessimistic concurrency control
  • Optimistic concurrency control (OCC) 
  • Multi version concurrency control (MVCC)

The pessimistic approach involves locking resources - rows, tables, pages, etc. - while they are used by one transaction. Other transactions that need the resource wait until the lock is released. This approach prevents conflicts but comes at the cost of throughput.

OCC and MVCC take the optimistic view that conflicts rarely happen. They let transactions proceed without acquiring locks. If a conflict is detected later, one of the transactions is forced to abort.

In this blog we discuss one such concurrency control technique - optimistic concurrency control. I will discuss the other two in subsequent blogs.

What is optimistic concurrency control ?

As the name suggests, the approach is optimistic: it takes the positive view that most transactions do not conflict with each other, so there is no need to slow things down by acquiring locks and holding on to them.

Assumptions are that
  • Transactions are short lived
  • They rarely conflict
Concurrency control is then done in three stages.

In the first stage (read), the transaction executes its operations against a private or staging area. The rows that are read (the read set) and the rows that are updated (the write set) are tracked.

In the second stage (validation), the operations are checked for conflicts. Rows that are read by one transaction should not have been updated by others. If a conflict is detected, the transaction is aborted and the staging area is cleared.

In the third stage (write), if there are no conflicts, the changes are committed.


Detecting the conflicts


Given 2 transactions A and B, 

A and B are given timestamps (TS) before validation starts.

Let us assume TS(A) < TS(B).

If A committed before B started its read phase, then there are no issues and B is also allowed to commit.

If A committed after B started its read phase but before B started its write phase, then both can commit if the write set of A does not intersect with the read set of B.

If A completes its read phase before B completes its read phase, then both can commit if the write set of A intersects with neither the read set nor the write set of B.

In all three cases, A and B operate on disjoint records and therefore do not conflict.

In this example, we are looking "backward" from B. If there is a conflict, B will be asked to abort. Some databases look "forward" instead, but the concept is the same.

When a validation is being done, no other transaction is allowed to commit. Only one transaction is validated at a time.
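Here is a minimal Go sketch of backward validation in the spirit of the rules above. It is illustrative only (roughly the classic Kung-Robinson scheme), not code from any of the databases mentioned below; for simplicity it conservatively checks both the read set and the write set of the validating transaction.

// txn records the rows a transaction read and wrote during its read phase,
// plus its timestamps. Row identifiers are plain strings here.
type txn struct {
    readSet  map[string]bool
    writeSet map[string]bool
    startTS  int64 // when the read phase started
    finishTS int64 // when the transaction finished committing
}

func intersects(a, b map[string]bool) bool {
    for k := range a {
        if b[k] {
            return true
        }
    }
    return false
}

// validate performs backward validation for transaction t: it looks at every
// transaction that committed after t started and rejects t if that
// transaction's write set overlaps t's read set or write set. Only one
// transaction is validated at a time, so no locks are taken on the data.
func validate(t *txn, committed []*txn) bool {
    for _, other := range committed {
        if other.finishTS <= t.startTS {
            continue // finished before t started: cannot conflict
        }
        if intersects(other.writeSet, t.readSet) || intersects(other.writeSet, t.writeSet) {
            return false // conflict: t must abort and retry
        }
    }
    return true
}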

Timestamp based

In pessimistic concurrency control, transactions are ordered in the order in which locks are acquired.

In optimistic concurrency control, transactions are ordered based on timestamps given to transactions.

Commercial databases using OCC

Some examples are:

Amazon Aurora DSQL
Yugabyte
CockroachDB
FoundationDB
Fauna
Google Spanner

Advantages of OCC

  • Higher scalability
  • Higher throughput
  • No deadlocks

In general, OCC is suitable for low-conflict data access.

Disadvantages

  • Transactions might need to be aborted.
  • Applications need to handle aborted transactions and retry.
  • It might not be suitable for cases where conflict is high, that is, when multiple transactions need to access the same data.

Conclusion

Optimistic concurrency control is becoming increasingly popular in modern databases that need to scale and may be distributed across multiple nodes. It works best in low-contention scenarios. It is especially attractive for distributed databases, where the locking approach becomes even more expensive because locks would have to be held across nodes. For these reasons, distributed, cloud-native, high-concurrency systems are moving towards OCC.

References:

1. Database Management Systems - Ramakrishnan and Gehrke
2. Database Internals - Petrov
3. Andy Pavlo's CMU lecture on YouTube




Saturday, January 4, 2025

Dynago : A highly available distributed key value store

Introduction

Dynago is my hobby project to build a minimal implementation of the distributed system described in the paper "Dynamo: Amazon's Highly Available Key-value Store".

This is the first in a series of blogs in which I describe how I go about building it and post updates on the progress.

This is my way of "#buildinpublic".

The github repository is at https://github.com/mdkhanga/dynago

Why am I doing it?

The primary goal is learning. Learning not just for me but also for others who may read this blog and review the code.

A secondary goal is to build something useful. Perhaps I will use it somewhere, or someone else might.

It is possible that parts of the code might be reusable in another project.

Why a Dynamo clone ?

The Dynamo paper is the first paper that got me interested in distributed systems. It has been my desire to build something like it for a long time.

In the real world, DynamoDB is Amazon's highly available key/value database. Apache Cassandra is also a project inspired by this paper.

Tech stack

I picked Go as the programming language. Why Go? I have already done a couple of complex projects in Java (a B+tree, the Raft protocol), so I was not interested in doing this one in Java. I am not yet that familiar with Rust, and C/C++ felt like going back in time. So this is an opportunity to do a complex project in Go.

For network communication / RPC, I use gRPC instead of programming directly against sockets. My Raft implementation is based on sockets, and I wanted to try gRPC. If it does not work out, I can switch to sockets.

The storage engine is TBD. Early versions will be in-memory. In the future, RocksDB or another embeddable store is a possibility.

Some technologies we might learn here:

  • Go programming
  • Distributed systems 
  • Databases
  • Network programming
  • Programming servers from scratch
  • Concurrency
  • High availability

Implementation plan

I plan to implement in the following tentative order:

1. Cluster membership

Be able to start multiple servers that connect and form a cluster. Use gossip to detect servers joining and leaving the cluster.

2. Client API

Simple Get and Put API

Replicate to all nodes.

3. Partitioning using consistent hashing

4. Partitioning with replicas

5. Quorum based read / writes

6. Versioning

7. Hinted handoff

...... and so on

Conclusion

If you are interested in distributed systems and/or databases, I invite you to follow along.

I am open to suggestions and discussion. If you know more, I am happy to learn from you.

If you like what I am working on, please follow me on twitter/X or LinkedIn.

Saturday, July 20, 2024

Replication in modern data systems

Overview

Replication means making a copy of the data, for use in the case of failures or to scale.

Why is it a big deal? We copy files for backup all the time. For static files that do not change, making a copy is a single copy command. But what if the data is being updated by users all the time? How often do you run the copy command? How do you keep the copy in sync with the source?

That is the problem of replication in databases and data systems. All databases have replication built in, and you can set it up with a command or two. So why read or discuss it? Because if you are building a distributed system that involves data, you will need to replicate data, and the concepts from databases will be useful.

While replication is best known for its use with databases, it is also a critical part of distributed systems that deal with unstructured data, such as distributed file systems (HDFS) or messaging systems (Apache Kafka).

This post covers replication in traditional single node systems as well as modern distributed systems.



Why do we need replication ?

There are several reasons why replication is needed. It is more than just taking a backup.

Redundancy

Make a copy of the data. When the main server becomes unavailable for any reason, switch to the copy. This ensures that the data is always available.

Scalability

Your data becomes really popular, the database gets a lot of read requests, and it cannot keep up. So you make copies of the database and have a load balancer distribute the read requests across the copies (replicas).

Geo distribution of data

Bring the data close to the user. You have users in the Americas, Europe, and Asia. Data from the Americas is replicated to Europe and Asia, so users there can read data locally without making a round trip to the Americas for every read.

Secondary use cases

These are lesser known and unconventional use cases. They might be implemented higher up in the stack, at the application or middleware layer, rather than in the database.

Mirroring

Mirroring involves replicating the requests to the application to a copy of the entire application stack. You can think of this as application level replication.



For example, for a REST service, this involves sending the HTTP request not just to the production service but also to a mirror service.

The mirror service reads and writes from the mirror database. The mirror database is a former replica that was in sync with the leader; just before mirroring starts, it is detached as a replica so it does not receive duplicate writes.

Mirroring can be used for testing large complex changes against production traffic.

Data in the mirror database is then compared with data in the production database for accuracy.

Testing

A regular database replica is used as a test database. Various kinds of tests - feature tests, performance tests, concurrency tests, scalability tests can be run with services running with the replica. This is a different use case from mirroring.

Migration

This can be used to eliminate or reduce downtimes needed for migration.

Create additional replicas.

Run migration on them.

Rollover the application services to the new database replicas.

Replication strategies

Single leader

This is the most common pattern.

One server is designated as the leader. The others are followers. All writes go to the leader. The leader replicates the writes to the followers.

The advantages are :

Setting up is fairly easy.

Reads become scalable. You can put a load balancer in front and distribute read requests to followers.

High availability: If the leader fails, you fail over to one of the followers and let it become the leader.

The disadvantages are :

All the writes go to one server, the leader, so it can become a bottleneck. Writes are not scaled.

If you read from a replica that is behind on replication, you might read stale data.

Multi leader

Writes can go to more than one server.

Multi leader replication is needed when

(1) Writes and replication need to happen across geographically distributed areas.

(2) Connectivity to a single leader is not guaranteed. This is usually the case with mobile devices or laptops, or when people want the ability to work offline and/or on multiple devices.

In the geo-distributed case, writes go to a local leader. The local leader replicates not only to its local replicas but also to the leaders in the other regions (which replicate to their own replicas).

In the mobile case, the writes are stored locally and then replicated periodically when connectivity is available.

Advantages:

Writes are also scaled.

Writes can be done locally or close to clients, giving better latency for writes.

Disadvantages:

Since writes happen at multiple leaders, there can be conflicts. The conflicts need to be resolved.

Leaderless

In the leaderless model, all nodes are equal and no node is designated leader. Writes can go to any node and that node replicates the write to other nodes. This is the model made popular by AWS Dynamo and later adopted by Cassandra.


Consensus based replication

All the above methods have either write-conflict or read-consistency issues. Raft and Paxos are two well-known protocols for replicating log entries. The data to be replicated is modeled as a list of entries in a log. The short story is that one server sends an entry, or a sequence of entries, to the others, and it is considered committed once a majority of servers acknowledge having received it. Raft has leader election, while Paxos is leaderless. The Raft paper describes in detail leader election, replication, server crashes, recovery, and consistency checks; it is a good read for anyone interested in distributed systems.

Replication Implementations

The first three techniques apply to databases which deal with structured data and are a little more complicated.

Statement based replication

In this approach, the SQL statements (INSERT/UPDATE/DELETE, etc.) are forwarded as-is from the leader to the followers. While this works in most cases, it breaks for non-deterministic statements, such as those that use timestamps, generate an id, or use a random number.

It is not efficient either. If you insert a record and then delete it, why replicate both commands ?

Write ahead log (WAL) replication

Databases append every write to the WAL before doing anything else, before writing it to the structured storage from which it will be read. The WAL is used for recovery: if the database crashes, its state is reconstructed from the WAL. A recent slogan has been "the WAL is the database". Replication here involves replicating the WAL.

A disadvantage is that WAL entries contain storage-specific details, such as which byte in which block is to be updated. This can create compatibility issues if the leader and followers are on different versions.

Logical replication

A logical log, on the other hand, captures, at the row level, how the table was changed. You can view this as an approach somewhere between statement-based and WAL replication.

Change data capture is a form of logical replication. It is used to replicate changes in a database to other third party systems. A popular use case is data warehousing where data from multiple sources is aggregated and summarized for analytics. 

Unstructured data replication

For unstructured data, as in distributed file systems, the unit of replication is a block of data. Data is first partitioned into blocks, and each block is replicated independently.

Potential issues with replication

Replication Lag

Most of the time, replication is asynchronous: the client writes to the leader and returns before any acknowledgement that the write has been replicated. Synchronous replication is often not viable due to both performance and availability issues; a single slow or failed replica can hold up all writes.

Lost write

However, one problem asynchrony creates is that if you read immediately after a write, the replica you are reading from may not yet have your last write, so the write appears lost (this is also called the read-your-own-writes problem).

Inconsistent read

If you issue the same read multiple times in quick succession, each read may get a different result depending on which replica serves it, since the replicas may be at different stages of replication.

Cassandra addresses this issue using quorums. CockroachDB uses a consensus protocol (Raft).

Write Conflicts

Write conflicts are an issue in multi-leader replication. They happen when multiple clients update the same data while talking to different leaders. The database does not know which update to accept or how the updates should be merged. This is similar to a merge conflict in git.

One approach to handling conflicts is to store both versions on write, and on read, send both versions to the client and let the client resolve the conflict.

Replication in real world systems

The product documentation on replication for a given database can be quite confusing. It is best to follow a tutorial or blog on the internet.

Postgres

The documentation and blogs describe it in 2 ways.

You can set it up as synchronous, asynchronous, streaming, log-file based, etc.

And it can be WAL based or logical replication. Statement based is rarely seen.

In snapshot replication, a snapshot of the database is taken and replicated to followers.

Instead of streaming, you can also setup the replication as file based, where the WAL files are periodically shipped to followers.

In WAL replication, replication slots let the leader track how much of the WAL has been replicated to each replica. This prevents the leader from discarding WAL segments that have not yet been replicated. But slots consume resources on the leader, so they need to be managed and deleted when no longer needed.

Mysql

The traditional way in MySQL was logical replication based on the binlog file, a binary format for logical changes.

The newer way is based on global transaction identifiers (GTIDs), which are built on top of the binlog. Replication can be either statement based or row based.

Dynamo / Cassandra

Here, replication is fundamental to the architecture. All you need to do is set the replication factor to greater than 1. All servers are equal: no leader and no follower. Writes can go to any server. Partitioning is also fundamental to the architecture; the server that receives a write redirects it to the appropriate server, and from there it is replicated to other servers based on the replication factor.

Consistency issues are addressed using quorum-based tunable consistency. A quorum is a majority, i.e. (RF/2 + 1) nodes agreeing on something. If you have replication factor (RF) 3, the quorum is 2. So on a write, at least 2 nodes need to acknowledge that the write was saved; on a read, at least 2 nodes need to agree on the returned value. In general, to avoid inconsistencies, you want read quorum (R) + write quorum (W) > RF.
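The quorum arithmetic is simple enough to capture in a few lines of Go; this is just the formula from the paragraph above, not any particular client library.

// quorum returns the majority count for a replication factor: RF/2 + 1.
// For RF = 3 it returns 2.
func quorum(rf int) int {
    return rf/2 + 1
}

// strongEnough reports whether a read quorum r and write quorum w avoid
// stale reads for replication factor rf, i.e. R + W > RF.
func strongEnough(r, w, rf int) bool {
    return r+w > rf
}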

CockroachDb

CockroachDB uses the Raft distributed consensus protocol to ensure that a majority of replicas agree before any change is committed. This is the safest approach to ensuring consistency, but it comes at a cost.

Apache Kafka

In Kafka, messages are sent to and received from topics. Topics are split into partitions. Each partition has one leader and a configurable number of replicas. Writes go to the leader, which replicates them to the replicas; reads can go to the replicas. Each broker is the leader for some partitions and a follower for others. As with Cassandra and CockroachDB, replication is core to the architecture and easy to set up.

Apache Hadoop (HDFS)

This applies to any distributed file system. A file is a sequence of blocks of data. HDFS has a name node and data nodes. The name node maintains a map of which data nodes hold the blocks of a file. Each block is replicated to a configurable number of data nodes.

Conclusion

Replication is a critical piece of any distributed data system. It has to be part of the core architecture; it cannot be bolted on after the fact like it was in the past. While redundancy and high availability are the well-known benefits, there are others, such as geo-distribution of data. Replication can also cause side effects such as read inconsistency, and care should be taken to address them. Different products use different strategies, so you should be familiar with the replication strategies, configuration, and side effects of your data product. If you are building a new system that involves data, understanding how existing systems replicate, and the issues they face, can help you design your own replication.