Saturday, July 20, 2024

Replication in modern data systems

Overview

Replication means making a copy of the data for future use in the case of failures or may be to scale.

Why is it a big deal ? We copy files for backup all the time. For static files, that do not change, making a copy is 1 copy command. But if the data is being updated by users all the time. How often do you run the command to copy. How do you keep the copy in sync with the source ?

That is the problem of replication in databases and data systems. All databases have replication built in that you can setup with a command or two. So why read or discuss it? If you are building a distributed systems that involves data, you will need to replicate data. The concepts from databases will be useful. 

While replication is most well known for its use with databases, it is also a critical part of distributed systems where the data is unstructured such as distributed file systems (HDFS) or messaging systems (Apache Kafka) 

This post covers replication in traditional single node systems as well as modern distributed systems.



Why do we need replication ?

There are several reasons why replication is needed. It is more than just taking a backup.

Redundancy

Make a copy of the data. When the main server becomes unavailable for any reason, switch to the copy. This is ensure that the data is always available.

Scalability

Your data becomes really popular and the database gets a lot of read requests and cannot keep up. So you make copies of the database and have a load balancer distribute the request across to the copies (replicas).

Geo distribution of data

Bring the data close of user. You have users in Americas, Europe and Asia. Data from americas is replicated to Europe and Asia, so users there can read data locally without making a round trip to the americas for every read.

Secondary use cases

These are lesser known and unconventional use cases. They might be done higher up in the stack at the application layer or middleware than in the database. 

Mirroring

Mirroring involves replicating the requests to the application to a copy of the entire application stack. You can think of this as application level replication.



For example, for a REST service, this involves sending the http request, not just to the production service but also to a mirror service.

The mirror service reads and writes from the mirror database. Mirror database is a previous replica that was in sync with the leader. Just before starting mirroring, it is discontinued as a replica so it does not get duplicates.

Mirroring can be used for testing large complex changes against production traffic.

Data in the mirror database is then compared with data in the production database for accuracy.

Testing

A regular database replica is used as a test database. Various kinds of tests - feature tests, performance tests, concurrency tests, scalability tests can be run with services running with the replica. This is a different use case from mirroring.

Migration

This can be used to eliminate or reduce downtimes needed for migration.

Create additional replicas.

Run migration on them.

Rollover the application services to the new database replicas.

Replication strategies

Single leader

This is the most common pattern. It shown in Figure 1.

One server is designated as the leader. The others are followers. All writes go to the leader. The leader replicates the writes to the followers.

The advantages are :

Setting up is fairly easy.

Reads become scalable. You can put a load balancer in front and distribute read requests to followers.

High availability: If the leader fails, you fail over to one of the followers and let it become the leader.

The disadvantages are :

All the writes go to one server , the leader. So this can become a bottleneck. Writes are not scaled. 

If you read from a replica that is behind on replication, you might read stale data.

Multi leader

Writes can go to more than one server.

Multi leader replication is needed when

(1) Writes and replication needs to happen across geographically distributed areas.

(2) Connectivity to single leader is not guaranteed. The is usually the case with mobile devices or laptops or when people want the ability to work offline and/or multiple devices.

In the geo distributed case, the writes go to a local local leader. The local leader not only replicates to local replicas but also to the distributed leader (who replicate to their replicas).

In the mobile case, the writes are store locally and the replicated periodically when connectivity is available.

Advantages:

Writes are also scaled.

Writes can done locally or close to clients. Better latency for writes.

Disadvantages:

Since writes happen at multiple leaders. There can be conflict. The conflicts need to be resolved.

Leaderless

In the leaderless model, all nodes are equal and no node is designated leader. Writes can go to any node and that node replicates the write to other nodes. This is the model made popular by AWS Dynamo and later adopted by Cassandra.


Consensus based replication

All the above methods have either write conflict or read consistency issues. Raft and Paxos are two well protocols for replicating log entries. Data to be replicated is modeled as a list of entries in log. The short story is that one server sends one entry or a sequence of entries to others and it is considered committed if a majority of servers acknowledge having received them. Raft has leader election but Paxos is leaderless. Raft protocol describes in detail leader election, replication, server crashes, recovery and consistency checks. The paper is a good read for anyone interested in distributed systems.

Replication Implementations

The first three techniques apply to databases which deal with structured data and are a little more complicated.

Statement based replication

In this approach, the SQL statements such as INSERT/UPDATE/DELETE etc are forwarded as they are from the leaders to the followers. While this can work in most cases, it does not work in certain cases such as timestamps or when you generate an id or a random number.

It is not efficient either. If you insert a record and then delete it, why replicate both commands ?

Write ahead log (WAL) replication

Databases first append every write to the WAL before doing anything else, before writing it to structured storage from where it will be read. WAL is used for recovery. If the database crashed, it state is reconstructed from the WAL. A recent slogan has been "The WAL is the database". Replication here involves replicating the WAL.

A disadvantage is that WAL entries contain where specific storage details like which byte in which block is to be updated. This can create compatibility issues if the leader and followers are on different versions.

Logical replication

A logical log on the other hand captures at a row, how the table was changed. You can view this as an approach somewhere between statement based and WAL replication.

Change data capture is a form of logical replication. It is used to replicate changes in a database to other third party systems. A popular use case is data warehousing where data from multiple sources is aggregated and summarized for analytics. 

Unstructured data replication

For unstructured data as in distributed file systems the unit for replication is a block of data. Data is first partitioned into blocks and each block is replicated independently.

Potential issues with replication

Replication Lag

Most of the time replication is asynchronous. Client writes to the leader and returns before any acknowledgement that it has been replicated. Synchronous replication is not viable due both performance and availability issues. A single failure can hold up all replications.

Lost write

However, one problem this creates is that if you read immediately after a write, the replica you are reading from may not yet have your last write.

Inconsistent read

If you read multiple times in quick succession ( same read) , each read may get a different result depending on which replica services the read ( as the replicas may be in different stages of replication)

Cassandra addressed this issue using quorum. CockroachDb uses a consensus protocol like Raft.

Write Conflicts

Write conflict is an issue in multi leader replication. This happens when multiple clients update the same data while talking to a different master. The database does not know which update to accept and how they should be merged. This is similar to a merge conflict in git.

An approach to handle conflicts is to store both versions on write. But on read, send both versions to the client and let the the client resolve the conflict

Replication is real world systems

The product documentation for database on replication can be quite confusing. It best to follow a tutorial or blog in the internet.

Postgres

The documentation and blogs describe it in 2 ways.

You can set it up as synchronous, asynchronous , streaming , log file based etc

And it can be WAL based or logical replication. Statement based is rarely seen.

In snapshot replication, a snapshot of the database is taken and replicated to followers.

Instead of streaming, you can also setup the replication as file based, where the WAL files are periodically shipped to followers.

In WAL replication, replication slots lets the leader track how much of the WAL is replicated to each replica. This helps the leader not discard segments not yet replicated. But this consumes resources on the leader. Replication slots need to be managed and deleted when not needed.

Mysql

The traditional way in mysql was a logical replication based on their binlog file - a binary format for logical changes.

The newer way is based on global transaction identifier (GTID) which is built on top of the binlog. It can be either statement based or row based.

Dynamo / Cassandra

In this architecture, replication is fundamental to the architecture. All you need to do is to set the replication factor to greater than 1. All servers are equal - no leader and no follower. Writes can go to any server. Partitioning is also fundamental to the architecture. The server that receives the write redirects the write to appropriate server. From here it is replicated to other servers based on the replication factor.

Consistency issues are addressed using quorum based tunable consistency. Quorum mean a majority which is (RF/2+1) agree on something. If you have replication factor (RF) 3, quorum is 2. So on a write, at least 2 nodes need to acknowledge that the write was saved. On read, at least 2 nodes need to agree on the return value. In general, to avoid inconsistencies, you want Read quorum (R)+ Write quorum (W) > RF .

CockroachDb

CockroachDB uses the Raft distributed consensus protocol to ensure that a majority of replicas are in consensus before any change is committed. This is the safest approach to ensure consistency but comes at a cost.

Apache Kafka

In Kafka, messages are sent and received from topics. Topics are split into partition. Each partition has one leader and a configurable number of replicas. Writes go to the leader which replicates to the replicas. Reads can go to the replicas. Each broker is a leader for some partitions but a follower for other partitions. Like Cassandra and CockroachDb, replication is core to the architecture and easy to setup.

Apache Hadoop (HDFS)

This applies to any distributed file system. The file is a sequence of blocks of data. HDFS has a name node and data nodes. Name node maintains a map of which data nodes have the blocks of a file. Each block is replicated to a configurable number of data nodes.

Conclusion

Replication is a critical piece of any distributed data system. It has to be part of the core architecture. It cannot come after the fact like it did in the past. While redundancy and HA are well known benefits, there are other benefits such geo distribution of data as well. It can cause some effects such as read consistency. Care should be taken to address those. Different products use different strategies. You should be familiar with the replication strategies, configuration and side effects for your data product. If you are building a new system with data, understanding how existing systems replicate and the issues they face, can help you design your replication.

Wednesday, July 3, 2024

How modern distributed systems scale by partitioning ?

 

1.0 Introduction

In the last 20 years, software systems moved to the internet and handled large volumes of data and millions of requests. Most people interact with these systems using a browser or a mobile device. At the back end, is not one powerful computer but generally a network of commodity computers. Both the processing and storage of data is spread across multiple computers. In this blog we discuss how large datasets can be stored using multiple commodity computers.

Partitioning is the process of breaking up a large dataset in parts so that each part can fit easily on the disk on one one node and be efficiently managed by each node. For very large data sets that cannot fit on 1 machine, data needs to be broken up into parts ( partition or shard). Each partition is stored on a different machine. This is just natural horizontal scaling. But most important is that, when it time to read the partitioned data, we need to be able to find (efficiently) which partition and node has the data we want to read.

Storage space is not the only benefit of partitioning. You are also spreading the compute required to read, write and process the data.

Partitioning is generally combined with replication to make the partitions highly available. But we do not discuss replication here. That is a topic for another blog.



2.0 Types of partitioning

There are 2 types of data that need to be considered: unstructured and structured.

Most discussions on partitioning discuss partitioning of data in databases ( structured data ) but not unstructured data which is outside databases in plain files. This blog discusses both unstructured data and structured data.

2.1 Structured data

The problem is more interesting for databases because it is not enough to break up the dataset into smaller parts. During reads you need to be able to find the data. And you need to do it fast. When the database receives a query - "Give me records for Customer X", How does it know which node hosts the data ? Does the database have to send the request to all the nodes ? That would be quite inefficient.

The goal is thus to partition data and query it efficiently. Another goal is to ensure that distribution of data between partitions is even. You do not want a situation where partition 1 has 70% of the data and the other 5 partitions has the remaining 30%. This will overload partition 1 ( known as a hot spot) and you lose the benefits of partitioning.

2 strategies are commonly used for database partitioning.

2.1.1 Range based partitioning



Database records are generally stored sorted based on the primary key.

Initially there is one partition with zero records.

As clients write to the database, the size of the partition increases. When it reaches a certain size say 10MB or 64MB, it is split into two partitions.

Each partition may be assigned to a different node.

This process is repeated as more data is added and partitions grow. If data is deleted and partitions shrink, then small partitions can be merged

To efficiently query data, the database needs to do some book keeping

-- which key range is in which partition

-- which partition is at which node

Starting with 1 partition and 1 node is not efficient for obvious reasons and databases typically start with configured number of partitions or a number of partitions proportional to the number of nodes.

To balance the load on nodes, partitions may need to be moved between nodes.

This is the strategy used by HBase, CockroachDb, MongoDb.

2.1.2 Hash based partitioning



The hash value calculated from the key is used to determine the location where the record can be stored.

The wrong way to determine the node is by using hash mod n, where n is the number of nodes. The problem with this approach is that when nodes are added or removed from the cluster, a very high percentage of the keys need to be removed.

A better approach is to start with a fixed number of partitions , way more that the number of the nodes the cluster will ever have , say for example 1000 or 10000. Partitions are logical. Hash ranges are assigned to partitions. Partitions are assigned to nodes either using numPartitions mod numNodes or other algorithms. This is shown in the top half of Figure 4. The bottom half of figure 4 visualizes the same as a ring as is done in many articles that to refers to this as consistent hashing. Think of partitions being placed on the ring. Each partition owns the key space from the position of the previous position to its position. The cluster needs to maintain a mapping of partitions to nodes. When a new node is added, the cluster can take a few partitions from existing nodes and assign to the new nodes. When a node is removed, the cluster assigns its partitions to other nodes. Looking up a key is a extra level of indirection. The hash of key maps to a partition. The partition node map tell you which node has the partition that has the key. Many studies has shown that this lead to less movement of keys between nodes as the cluster changes.

In popular press, this has the poorly understood name "consistent hashing". It is just hast hash based partitioning. Nothing consistent and nothing to do with consistency.

2.1.3 Secondary Indexes

So far we have been talking about partitioning by the primary key, also know as the primary index.

To speed up retrieval of records, databases also have secondary indexes which can be very large and might need to be partitioned.

One approach can be to keep the secondary indexes local to the node on which the primary index partition is. The advantage of this approach is that since all related rows are on the same node, inserts/updates/deletes are are all local. But queries on the secondary index requires sending queries to all nodes and aggregating the responses.

Another approach is to create a global secondary index and partition it as an independent entity. However since the secondary index partitions might be on a different partition from the primary partition, CUD operations are more expensive. Transactions might be distributed. However range queries on secondary indexes are more efficient since closer records (by sort) are on the same partition.

2.2 Unstructured data

Unstructured data refer to ordinary files that have text or binary data. Of course we are talking about large files or many large files. This is the use case for a distributed file system such as HDFS (hadoop file system) or GFS ( Amazon ). Logically the implementation of distributed file system is similar to say a linux filesystem. You view the file system as a list of blocks of fixed size. On a single node linux file system , all the disk blocks are on one node. In a distributed file system, the blocks are spread across multiple nodes. In HDFS, the name node maintains the metadata for the distributed filesystem -- given a file , which blocks make up the file and which nodes have the blocks. To create a file, the name node may assign a block on a particular node and the client talks directly to service called data node running on the target node to write to block. To read a block, the name node directs the client to the data node that hosts the block and the client reads directly from that block. But the basic algorithm is simple -- break up the file data into blocks and spread them across nodes.



Another example is partitioning of topic logs in Apache Kafka is a messaging system ( they like to call it event streaming) where producers write messages to a topic and consumers read messages from the topic. The storage for a topic is a log file. New messages are appended to the end of log file. They are read from the front. Obviously the logs can grow beyond what can fit on a node. So log is broken into partitions and distributed across multiple nodes. The broker serves producers and consumer. A Kafka cluster has multiple brokers with each broker running on a seprate node managing multiple partitions.

3.0 Rebalancing

Rebalancing is the process of moving partitions between nodes to make the distribution of load even across all the nodes. This is necessary when nodes join or leave the cluster or if the cluster starts receiving more data for certain keys. Either way rebalancing is an expensive operation that needs lot of CPU, memory and network bandwidth. It can have an impact on the performance of regular CRUD processing. In a ideal world, we would like rebalancing to happen automatically behind the scenes, without end users knowing about it. But for performance reasons listed above, that rarely works well in practice. Some databases require an admin to manually start a rebalance, which can be done during a period of low load and monitored.

4.0 Routing

How does a client know which partition to connect to ? The short answer is that the database has to maintain mapping of partition to node. In the case of hash based partitioning, hash maps to a partition which maps to a node. In the case of range based partitioning the key maps to a key range which maps to a partition which maps to a node. The partition node map is available to nodes. If a client can connect to any node, then if the node does not have the partition to handle the clients request, it can redirect the client to the appropriate node or it can get the data from the target node and return to client.

5.0 Use case

There are cases where you might not have a ready made database doing server side partitioning for you and you might need to do it yourself. Or even when the database does it for you, you still need to pick the right partition key for the partitioning to be optimal. Let us look at some large datasets and discuss how they might be logically partitioned.

Let us say you are building a twitter like system.

Say 100 million tweets of 140 character per day

100M * ( 280 bytes + 20 bytes for id, timestamp)

30 GB / day

10 TB / year

Need to store 5 years to data

Need to store and query 50 TB

How does twitter work ?

Users follow other users.

When a user connects, we need to show the most recent tweets from the users he follows.

So we need to store about 50 TB of tweets. Given a user, we need to query say the 50 most recent tweets from the users he follows.

Using commodity hardware, 50TB would need say 25 nodes. What key would you use to partition the data ?

Option 1 : hash based partitioning based on user.

To store tweets, a hash of the user is to used to locate the node where the tweet is stored. To query - for each user that the user follows, use the hash to query the node for that users tweets. A problem with this approach is that some users tweet way more than other users. Their nodes are going to be overloaded while others are idle. Unbalanced load.

Option 2 : hash based on randomly generated tweet id

The problem with this approach is the for every query, you have to query every server and aggregate the results. Inefficient for queries.

Option 3: hash based on timestamp

Timestamp is relevant because for each feed request we want the latest tweets. It would be good if tweets are sorted by timestamp. However with such as hash, at any given point in time , one server is overloaded as all the writes are going to the server.

Option 4:

Given the choices, inefficient query (option 2) is more tolerable that unbalanced load (option 1) which could crash some of the servers making the system unavailable. But we also want queries to return the most recent data (sorted by timestamp). So we can improve querying little bit by combining option 2 and 3. Assume a timestamp in epoch time in increments of 1 sec, the tweetid could be timestamp + auto incremented seq. The sequence gives randomness to the tweet id and will give uniform distribution across node.

So given a epoch 1692547708 you will have tweet ids like

1692547708 1

1692547708 2

1692547708 3

1692547708 n

Of course you are wondering how to generate unique tweet ids in a distributed system. That is a topic for another blog.

6.0 Summary

Partitioning data and spreading it across nodes is fundamental to distributed system. Special thought needs to be given to how the partitioned data can be queried efficiently. Hash based partitioning and range based partitioning are two popular strategies. Nodes can fail or additional nodes may need to added to scale. To ensure that load is even across nodes, partitions may be moved between nodes in a process called rebalancing. For best results design your partition keys so that load is distributed evenly and querying is efficient.

7.0 Related Content

CockroachDB Review