Showing posts with label Cassandra. Show all posts

Saturday, January 4, 2025

Dynago : A highly available distributed key value store

Introduction

Dynago is my hobby project to build a minimal implementation of the distributed system described in the paper "Dynamo: Amazon's Highly Available Key-value Store".

This is the first in a series of blogs where I describe how I go about building this and post updates on the progress.

This is my way of "#buildinpublic".

The github repository is at https://github.com/mdkhanga/dynago

Why am I doing it ?

The primary goal is learning. Learning not just for me but also for others who may read this blog and review the code.

A secondary goal is to build something useful. Perhaps I may use it somewhere or someone else might.

It is possible that parts of the code might be reusable in another project.

Why a Dynamo clone ?

The Dynamo paper is the first paper that got me interested in distributed systems. It has been my desire to build something like that for a long time.

In the real world, DynamoDB is Amazon's highly available key/value database. Apache Cassandra is also a project inspired by this paper.

Tech stack

I picked Go as the programming language. Why Go ? I have already done a couple of complex projects in Java (B+tree, Raft protocol), so I was not interested in doing it in Java. I am not yet that familiar with Rust. And C/C++ felt like going back in time. So this is an opportunity to do a complex project in Go.

For network communication / RPC, I use gRPC instead of programming to sockets. My Raft implementation is based on sockets. I wanted to try gRPC. If it does not work out, I can switch to sockets.

Storage engine is TBD. Early versions will be in-memory. In the future RocksDB or another embeddable store is a possibility.

Some technologies we might learn here:

  • Go programming
  • Distributed systems 
  • Databases
  • Network programming
  • Programming servers from scratch
  • Concurrency
  • High availability

Implementation plan

I plan to implement in the following tentative order:

1. Cluster membership

Be able to start multiple servers that connect to each other and form a cluster. Use gossip to detect servers joining and leaving the cluster.

2. Client API

Simple Get and Put API

Replicate to all nodes.

3. Partitioning using consistent hashing

4. Partitioning with replicas

5. Quorum based read / writes

6. Versioning

7. Hinted handoff

...... and so on

Conclusion

If you are interested in distributed systems and/or databases, I invite you to follow along.

I am open to suggestions and discussion. If you know more, I am happy to learn from you.

If you like what I am working on, please follow me on twitter/X or LinkedIn.

Wednesday, July 3, 2024

How modern distributed systems scale by partitioning ?

 

1.0 Introduction

In the last 20 years, software systems moved to the internet and began handling large volumes of data and millions of requests. Most people interact with these systems using a browser or a mobile device. At the back end there is not one powerful computer but generally a network of commodity computers. Both the processing and the storage of data are spread across multiple computers. In this blog we discuss how large datasets can be stored using multiple commodity computers.

Partitioning is the process of breaking up a large dataset into parts so that each part fits easily on the disk of one node and can be managed efficiently by that node. Very large data sets that cannot fit on one machine need to be broken up into parts (partitions or shards). Each partition is stored on a different machine. This is just natural horizontal scaling. Most important, when it is time to read the partitioned data, we need to be able to find efficiently which partition and node has the data we want to read.

Storage space is not the only benefit of partitioning. You are also spreading the compute required to read, write and process the data.

Partitioning is generally combined with replication to make the partitions highly available. But we do not discuss replication here. That is a topic for another blog.



2.0 Types of partitioning

There are 2 types of data that need to be considered: unstructured and structured.

Most discussions on partitioning discuss partitioning of data in databases ( structured data ) but not unstructured data which is outside databases in plain files. This blog discusses both unstructured data and structured data.

2.1 Structured data

The problem is more interesting for databases because it is not enough to break up the dataset into smaller parts. During reads you need to be able to find the data. And you need to do it fast. When the database receives a query such as "Give me records for Customer X", how does it know which node hosts the data ? Does the database have to send the request to all the nodes ? That would be quite inefficient.

The goal is thus to partition data and query it efficiently. Another goal is to ensure that the distribution of data between partitions is even. You do not want a situation where partition 1 has 70% of the data and the other 5 partitions have the remaining 30%. This will overload partition 1 (known as a hot spot) and you lose the benefits of partitioning.

Two strategies are commonly used for database partitioning.

2.1.1 Range based partitioning



Database records are generally stored sorted based on the primary key.

Initially there is one partition with zero records.

As clients write to the database, the size of the partition increases. When it reaches a certain size say 10MB or 64MB, it is split into two partitions.

Each partition may be assigned to a different node.

This process is repeated as more data is added and partitions grow. If data is deleted and partitions shrink, then small partitions can be merged.

To efficiently query data, the database needs to do some bookkeeping:

-- which key range is in which partition

-- which partition is at which node
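The bookkeeping above can be sketched as two small lookup tables. This is only an illustration, not how any particular database implements it; the split points, partition names and node names are made up for the example.

```python
import bisect

# Hypothetical bookkeeping for range based partitioning.
# split_keys[i] is the first key of partition i+1; keys below split_keys[0]
# belong to partition 0.
split_keys = ["g", "n", "t"]
partitions = ["p0", "p1", "p2", "p3"]  # always one more than the split points
partition_to_node = {"p0": "node-a", "p1": "node-b", "p2": "node-a", "p3": "node-c"}


def find_partition(key: str) -> str:
    """Return the partition whose key range contains key (binary search)."""
    return partitions[bisect.bisect_right(split_keys, key)]


def find_node(key: str) -> str:
    """Return the node that currently hosts the partition for key."""
    return partition_to_node[find_partition(key)]
```

Because the split points are sorted, a lookup is a binary search, and moving a partition to another node only changes one entry in partition_to_node.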

Starting with 1 partition and 1 node is not efficient because all the load lands on a single node, so databases typically start with a configured number of partitions or a number of partitions proportional to the number of nodes.

To balance the load on nodes, partitions may need to be moved between nodes.

This is the strategy used by HBase, CockroachDB and MongoDB.

2.1.2 Hash based partitioning



The hash value calculated from the key is used to determine the location where the record can be stored.

The wrong way to determine the node is by using hash mod n, where n is the number of nodes. The problem with this approach is that when nodes are added to or removed from the cluster, a very high percentage of the keys need to be moved.
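A small experiment makes the cost of hash mod n concrete. This is a sketch with made up keys; MD5 is used only because it gives the same value on every run, not because any database uses it for this.

```python
import hashlib


def stable_hash(key: str) -> int:
    """Hash that is identical on every run (Python's built-in hash() is salted)."""
    return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big")


def fraction_moved(keys, old_n: int, new_n: int) -> float:
    """Fraction of keys whose node changes when the cluster goes from old_n to new_n nodes."""
    moved = sum(1 for k in keys if stable_hash(k) % old_n != stable_hash(k) % new_n)
    return moved / len(keys)


keys = [f"user{i}" for i in range(10_000)]
print(f"{fraction_moved(keys, 4, 5):.0%} of keys move when going from 4 to 5 nodes")
```

A key stays on the same node only if its hash gives the same remainder mod 4 and mod 5, which happens for about 1 key in 5. So adding a single node to a 4 node cluster relocates roughly 80% of the data.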

A better approach is to start with a fixed number of partitions, far more than the number of nodes the cluster will ever have, for example 1000 or 10000. Partitions are logical. Hash ranges are assigned to partitions, and partitions are assigned to nodes, either using partition number mod numNodes or another algorithm. This is shown in the top half of Figure 4. The bottom half of Figure 4 visualizes the same thing as a ring, as is done in many articles that refer to this as consistent hashing. Think of partitions being placed on the ring. Each partition owns the key space from the position of the previous partition to its own position.

The cluster needs to maintain a mapping of partitions to nodes. When a new node is added, the cluster can take a few partitions from existing nodes and assign them to the new node. When a node is removed, the cluster assigns its partitions to other nodes.

Looking up a key involves an extra level of indirection. The hash of the key maps to a partition. The partition to node map tells you which node has the partition that holds the key. Since the key to partition mapping never changes, only the reassigned partitions move. Many studies have shown that this leads to less movement of keys between nodes as the cluster changes.
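Here is a minimal sketch of the fixed partition scheme. The partition count, the node names and the "take from the most loaded node" policy in add_node are assumptions made for the example; real systems use more careful placement.

```python
import hashlib

NUM_PARTITIONS = 1000  # fixed for the life of the cluster (assumed value)


def partition_for(key: str) -> int:
    """Map a key to a logical partition; this never changes as nodes come and go."""
    h = int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big")
    return h % NUM_PARTITIONS


def initial_assignment(nodes):
    """Assign partitions round-robin: partition number mod number of nodes."""
    return {p: nodes[p % len(nodes)] for p in range(NUM_PARTITIONS)}


def add_node(assignment, new_node):
    """Give the new node a fair share by taking partitions from the most loaded nodes."""
    assignment = dict(assignment)
    owned = {}
    for p, n in assignment.items():
        owned.setdefault(n, []).append(p)
    owned[new_node] = []
    target = NUM_PARTITIONS // len(owned)
    while len(owned[new_node]) < target:
        donor = max((n for n in owned if n != new_node), key=lambda n: len(owned[n]))
        p = owned[donor].pop()
        owned[new_node].append(p)
        assignment[p] = new_node
    return assignment


def node_for(key, assignment):
    """Two step lookup: key -> partition -> node."""
    return assignment[partition_for(key)]
```

Going from 4 to 5 nodes with this sketch reassigns 200 of the 1000 partitions, all of them to the new node, so about 20% of the keys move. With hash mod n the same change would move roughly 80% of the keys.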

In the popular press, this has the poorly understood name "consistent hashing". It is just hash based partitioning. There is nothing consistent about it and it has nothing to do with consistency.

2.1.3 Secondary Indexes

So far we have been talking about partitioning by the primary key, also known as the primary index.

To speed up retrieval of records, databases also have secondary indexes which can be very large and might need to be partitioned.

One approach can be to keep the secondary indexes local to the node on which the primary index partition is. The advantage of this approach is that since all related rows are on the same node, inserts/updates/deletes are all local. But queries on the secondary index require sending queries to all nodes and aggregating the responses.

Another approach is to create a global secondary index and partition it as an independent entity. However, since a secondary index partition might be on a different node from the primary partition, create/update/delete (CUD) operations are more expensive. Transactions might be distributed. However, range queries on secondary indexes are more efficient since closer records (by sort) are on the same partition.
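The difference between the two approaches shows up in how a query by the secondary column is answered. The sketch below uses made up users and a made up "city" column; in a real system the global index would itself be partitioned by city across nodes.

```python
from collections import defaultdict

# Hypothetical data: primary key -> record, spread over two nodes by primary key.
nodes = {
    "node-a": {"u1": {"city": "Austin"}, "u3": {"city": "Boston"}},
    "node-b": {"u2": {"city": "Austin"}, "u4": {"city": "Denver"}},
}


def build_local_indexes(nodes):
    """Each node indexes only its own records: node -> city -> [primary keys]."""
    local = {}
    for node, records in nodes.items():
        idx = defaultdict(list)
        for pk, rec in records.items():
            idx[rec["city"]].append(pk)
        local[node] = idx
    return local


def query_local(local, city):
    """Scatter-gather: every node has to be asked, then the answers are merged."""
    result = []
    for idx in local.values():
        result.extend(idx.get(city, []))
    return sorted(result)


def build_global_index(nodes):
    """One index over all records, maintained separately from the primary data."""
    idx = defaultdict(list)
    for records in nodes.values():
        for pk, rec in records.items():
            idx[rec["city"]].append(pk)
    return idx
```

With local indexes a write touches one node but a read touches all of them. With a global index a read goes to one index partition, but a write to a record may also have to update an index entry on another node.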

2.2 Unstructured data

Unstructured data refers to ordinary files that hold text or binary data. Of course we are talking about large files or many large files. This is the use case for a distributed file system such as HDFS (Hadoop Distributed File System) or GFS (Google File System). Logically, the implementation of a distributed file system is similar to, say, a Linux file system. You view the file system as a list of blocks of fixed size. On a single node Linux file system, all the disk blocks are on one node. In a distributed file system, the blocks are spread across multiple nodes. In HDFS, the name node maintains the metadata for the distributed file system: given a file, which blocks make up the file and which nodes have the blocks. To create a file, the name node assigns a block on a particular node, and the client talks directly to a service called the data node running on the target node to write to the block. To read a block, the name node directs the client to the data node that hosts the block, and the client reads directly from that data node. The basic algorithm is simple: break up the file data into blocks and spread them across nodes.
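The block idea can be sketched in a few lines. This is a toy model, not the HDFS API: the class name, the tiny block size and the round-robin placement are assumptions, and for brevity the name node here also moves the bytes, whereas in HDFS the client talks to the data nodes directly.

```python
BLOCK_SIZE = 4  # bytes; tiny on purpose. Real systems use blocks of many megabytes.


class NameNode:
    """Toy metadata service: which blocks make up a file and where each block lives."""

    def __init__(self, data_nodes):
        self.data_nodes = data_nodes  # node name -> {block id: bytes}
        self.files = {}               # file name -> [(block id, node name)]
        self.next_block = 0

    def write(self, name, data: bytes):
        names = sorted(self.data_nodes)
        blocks = []
        for offset in range(0, len(data), BLOCK_SIZE):
            node = names[self.next_block % len(names)]  # round-robin placement
            self.data_nodes[node][self.next_block] = data[offset:offset + BLOCK_SIZE]
            blocks.append((self.next_block, node))
            self.next_block += 1
        self.files[name] = blocks

    def read(self, name) -> bytes:
        # Follow the metadata: fetch each block from the node that holds it, in order.
        return b"".join(self.data_nodes[node][bid] for bid, node in self.files[name])
```

The only state needed to reassemble a file is the ordered list of (block, node) pairs, which is why the metadata service can stay small even when the data is very large.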



Another example is the partitioning of topic logs in Apache Kafka. Kafka is a messaging system (they like to call it event streaming) where producers write messages to a topic and consumers read messages from the topic. The storage for a topic is a log file. New messages are appended to the end of the log file. They are read from the front. Obviously the logs can grow beyond what can fit on a node. So the log is broken into partitions and distributed across multiple nodes. The broker serves producers and consumers. A Kafka cluster has multiple brokers, with each broker running on a separate node and managing multiple partitions.

3.0 Rebalancing

Rebalancing is the process of moving partitions between nodes to make the distribution of load even across all the nodes. This is necessary when nodes join or leave the cluster or if the cluster starts receiving more data for certain keys. Either way, rebalancing is an expensive operation that needs a lot of CPU, memory and network bandwidth. It can have an impact on the performance of regular CRUD processing. In an ideal world, we would like rebalancing to happen automatically behind the scenes, without end users knowing about it. But for the performance reasons listed above, that rarely works well in practice. Some databases require an admin to manually start a rebalance, which can be done during a period of low load and monitored.

4.0 Routing

How does a client know which partition to connect to ? The short answer is that the database has to maintain a mapping of partitions to nodes. In the case of hash based partitioning, the hash maps to a partition, which maps to a node. In the case of range based partitioning, the key maps to a key range, which maps to a partition, which maps to a node. The partition to node map is available to all nodes. If a client can connect to any node and that node does not have the partition needed to handle the client's request, it can redirect the client to the appropriate node, or it can get the data from the target node and return it to the client.
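The "get the data from the target node and return it" option can be sketched as follows. The Node class and its toy hash are made up for the example, and the call between nodes is a plain method call standing in for a network request.

```python
class Node:
    """Toy node that knows the full partition -> node map."""

    def __init__(self, name, partition_map, cluster):
        self.name = name
        self.partition_map = partition_map  # partition id -> node name
        self.cluster = cluster              # node name -> Node, shared by all nodes
        self.store = {}                     # key -> value for partitions hosted here

    def partition_of(self, key):
        # Stand-in for a real hash function.
        return sum(key.encode()) % len(self.partition_map)

    def get(self, key):
        owner = self.partition_map[self.partition_of(key)]
        if owner == self.name:
            return self.store.get(key)
        # Act as a proxy: fetch from the owner and return the value to the client.
        return self.cluster[owner].get(key)

    def put(self, key, value):
        owner = self.partition_map[self.partition_of(key)]
        if owner == self.name:
            self.store[key] = value
        else:
            self.cluster[owner].put(key, value)
```

The client can talk to any node and still reach the right data. The redirect option would return the owner's address instead of making the second call, which saves a hop on later requests at the cost of a smarter client.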

5.0 Use case

There are cases where you might not have a ready made database doing server side partitioning for you and you might need to do it yourself. Or even when the database does it for you, you still need to pick the right partition key for the partitioning to be optimal. Let us look at some large datasets and discuss how they might be logically partitioned.

Let us say you are building a twitter like system.

Say 100 million tweets of 140 characters per day

100M * (280 bytes for the text at 2 bytes per character + 20 bytes for id and timestamp) = 100M * 300 bytes

About 30 GB / day

About 10 TB / year

Need to store 5 years of data

Need to store and query about 50 TB
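The estimate can be checked with a few lines of arithmetic. The constants come from the numbers above; decimal units (1 GB = 10^9 bytes) are an assumption.

```python
TWEETS_PER_DAY = 100_000_000
BYTES_PER_TWEET = 280 + 20  # text plus id and timestamp
YEARS = 5

per_day = TWEETS_PER_DAY * BYTES_PER_TWEET  # bytes
per_year = per_day * 365
total = per_year * YEARS

GB, TB = 10**9, 10**12
print(f"{per_day / GB:.0f} GB/day")
print(f"{per_year / TB:.2f} TB/year")
print(f"{total / TB:.2f} TB in {YEARS} years")
```

The unrounded figures are a little under 11 TB per year and a little under 55 TB for five years, so 10 TB and 50 TB are round numbers for sizing purposes.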

How does twitter work ?

Users follow other users.

When a user connects, we need to show the most recent tweets from the users he follows.

So we need to store about 50 TB of tweets. Given a user, we need to query say the 50 most recent tweets from the users he follows.

Using commodity hardware, 50TB would need say 25 nodes. What key would you use to partition the data ?

Option 1 : hash based partitioning based on user.

To store tweets, a hash of the user is used to locate the node where the tweet is stored. To query, for each user that the user follows, use the hash to query the node for that user's tweets. A problem with this approach is that some users tweet way more than other users. Their nodes are going to be overloaded while others are idle. The load is unbalanced.

Option 2 : hash based on randomly generated tweet id

The problem with this approach is that for every query, you have to query every server and aggregate the results. This is inefficient for queries.

Option 3: hash based on timestamp

Timestamp is relevant because for each feed request we want the latest tweets. It would be good if tweets were sorted by timestamp. However, with such a hash, at any given point in time one server is overloaded as all the writes are going to that server.

Option 4:

Given the choices, an inefficient query (option 2) is more tolerable than unbalanced load (option 1), which could crash some of the servers and make the system unavailable. But we also want queries to return the most recent data (sorted by timestamp). So we can improve querying a little bit by combining options 2 and 3. Assuming a timestamp in epoch time in increments of 1 second, the tweet id could be timestamp + auto incremented sequence. The sequence gives randomness to the tweet id and will give a uniform distribution across nodes.

So given an epoch 1692547708 you will have tweet ids like

1692547708 1

1692547708 2

1692547708 3

1692547708 n
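A single process sketch of such ids is shown below. It sidesteps the hard part, which is generating unique ids across many machines; the "-" separator, the never resetting counter, the partition count and the use of MD5 are all assumptions made for the example.

```python
import hashlib
import itertools
import time

NUM_PARTITIONS = 1000       # assumed number of logical partitions
_seq = itertools.count(1)   # single-process counter; not safe across machines


def new_tweet_id(now=None) -> str:
    """Build an id of the form '<epoch seconds>-<sequence>'."""
    ts = int(time.time() if now is None else now)
    return f"{ts}-{next(_seq)}"


def partition_for(tweet_id: str) -> int:
    """Hash the whole id so tweets written in the same second spread over partitions."""
    h = int.from_bytes(hashlib.md5(tweet_id.encode()).digest()[:8], "big")
    return h % NUM_PARTITIONS


def timestamp_of(tweet_id: str) -> int:
    """Recover the timestamp so results gathered from all nodes can be sorted by time."""
    return int(tweet_id.split("-")[0])
```

Writes in the same second land on many different partitions because the sequence changes the hash, while the timestamp prefix still lets a query sort the merged results by recency.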

Of course you are wondering how to generate unique tweet ids in a distributed system. That is a topic for another blog.

6.0 Summary

Partitioning data and spreading it across nodes is fundamental to distributed systems. Special thought needs to be given to how the partitioned data can be queried efficiently. Hash based partitioning and range based partitioning are two popular strategies. Nodes can fail or additional nodes may need to be added to scale. To ensure that load is even across nodes, partitions may be moved between nodes in a process called rebalancing. For best results, design your partition keys so that load is distributed evenly and querying is efficient.

7.0 Related Content

CockroachDB Review


Friday, October 25, 2013

Apache Cassandra Data Model

This is an introduction to the Apache Cassandra data model. For the benefit of those not familiar with Cassandra,  it is an open source, highly scalable, highly available NoSQL database. Some key architectural highlights of  Cassandra are :

No Single point of failure.
No Master - All servers in cluster are equal.
Peer to peer communication between nodes to exchange data and configuration.
Data is partitioned across nodes based on consistent hash.
Data is automatically replicated.
(and recently added) SQL like data access model.

Cassandra has moved to a simple model that is described by a SQL like language called CQL. Lower level constructs like column family are no longer mentioned. Note that earlier column family models were without much of a schema. You needed to define column family upfront. But the column name in each family could be added as needed. The new CQL model is more schema oriented.

1.0 Tables, row keys and columns

Data is stored in tables which have rows and columns.

A table is partitioned by the primary key, which is the row key.

For columns, CQL supports various data types like int, varchar, text, float, set, list and many more.

The CQL create statement below creates the users table with userid as the primary key.

create Table Users (
    userid varchar,
    name varchar,
    email varchar,
    address varchar,
    PRIMARY KEY(userid)
) ;

You insert rows into this table using the insert statement. Rows of table are partitioned across nodes based on the primary key.

insert into Users(userid,name,email) values('user1', 'user1 name', 'user1@gmail.com') ;

2.0 No Joins but wide columns

Let us say you want groups of users. In an RDBMS, you might have a table with columns groupid and userid, with userid being a foreign key into the Users table. In a distributed database like Cassandra, joins are expensive and CQL does not support them. Hence the data needs to be de-normalized. You can create a table GroupsOfUsers with groupid as the partition key. As de-normalization, in addition to having the userid column, repeat some useful columns like user name and user email that you might need when looking at members of the group.

create Table GroupsOfUsers (
    groupid varchar,
    groupname varchar,
    userid varchar,
    user_name varchar,
    user_email varchar,
    PRIMARY KEY(groupid,userid)
) ;

When you have a compound primary key, the first column, in this case groupid, is used as the partition key. The remaining primary key columns, in this case userid, are clustering columns: within a partition, the rows are stored sorted by userid.

If you do ,

select * from GroupsOfUsers where groupid = 'group1' ;

The result might be

group1     user1   name1   email1
group1     user2   name2   email2
group1     user3   name3   email3

Think of the above as logical rows.

Under the hood , the columns might be stored physically as 1 row with one or more columns for each user.

key        column1        column2        column3        column4        column5        column6
group1     user1:name1    user1:email1   user2:name2    user2:email2   user3:name3    user3:email3

Each row can have as many as 2 billion columns if necessary. This is very useful in other use cases such as creating indexes or storing collections.
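The mapping from logical rows to one wide physical row can be modeled in a few lines. This is a conceptual sketch of the idea, not Cassandra's actual on-disk format; the "userid:column" naming of the physical columns is an assumption made for the example.

```python
# Logical rows as returned by the select above:
# (groupid, userid, user_name, user_email)
logical_rows = [
    ("group1", "user1", "name1", "email1"),
    ("group1", "user2", "name2", "email2"),
    ("group1", "user3", "name3", "email3"),
]


def to_wide_rows(rows):
    """Fold logical rows into one physical row per partition key.

    Physical column names are '<userid>:<column>' and are kept sorted, which is
    why reading a group returns its members in userid order.
    """
    wide = {}
    for groupid, userid, name, email in rows:
        cols = wide.setdefault(groupid, {})
        cols[f"{userid}:name"] = name
        cols[f"{userid}:email"] = email
    return {key: dict(sorted(cols.items())) for key, cols in wide.items()}
```

Adding a user to a group adds columns to an existing physical row instead of creating a new row, which is what makes one partition able to hold a very large group.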

3.0 Collection column types

If each user had a number of  friends, in RDBMS, this would be modeled by joining with a Friends table. In Cassandra you can do that by adding a column type of Collection. The collections supported are List, Map and Set.

Alter Table Users add friends Set<varchar> ;

update Users set friends = friends + {'friend6'} where  userid = 'user1' ;

4.0 Indexes using wide columns

An index is a data structure that enables fast lookup based on a key. In Cassandra the table is partitioned across nodes based on the primary key. By default, each node in Cassandra maintains an index for the primary keys that it hosts.

You can create additional indexes on other columns. For such indexed columns, Cassandra under the hood creates another table whose primary key is the indexed column.

For example, if you frequently had to run a query such as

select groupid from GroupsOfUsers where userid = 'user1' ;

It could be worthwhile to create an index on the userid column to speed up the query.

create index userid_index on GroupsOfUsers(userid) ;

Logically this would be like creating a table

create table userid_idx (
     userid varchar,
     groupid varchar,
     primary key(userid,groupid)
)

The partition key will be userid and the columns in the row will be the groups to which the user belongs. This makes use of the wide column feature of Cassandra mentioned above.

5.0 Note on consistency

The original Dynamo paper, on which Cassandra is based, talks about being eventually consistent. Eventual consistency scares people even though we see it in life all the time. For example, the ATM may let you take more cash than you have in your account. When the bank reconciles ATM withdrawals with your account and realizes that you have overdrawn, it takes appropriate action.

Cassandra extends eventual consistency by offering a model of tunable consistency.

A write consistency of ANY means that it is enough for the write to be written to any one node. This gives low consistency but high availability. A write consistency of ONE, TWO, THREE or QUORUM means that the write must be written to that many replicas. The more replicas a write must reach, the higher the consistency and the lower the availability.

A read consistency of ONE, TWO, THREE, QUORUM indicates the number of replicas to be consulted before returning the most recent data from the replicas.

Note that unlike what is described in the Dynamo paper, when there is a conflict between data in replicas, Cassandra returns the most recent data and not vector clocks with different versions that clients need to resolve.
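The read path described above can be sketched as "ask some replicas, keep the newest answer". This is a simplified model and not Cassandra's implementation; the Versioned type, the in-memory replicas and the integer timestamps are made up for the example.

```python
from typing import NamedTuple, Optional


class Versioned(NamedTuple):
    value: str
    timestamp: int  # write time, for example microseconds since the epoch


def quorum(replication_factor: int) -> int:
    """A majority of the replicas."""
    return replication_factor // 2 + 1


def read(replicas, key, consistency: int) -> Optional[str]:
    """Consult `consistency` replicas and return the value with the newest timestamp."""
    answers = [r[key] for r in replicas[:consistency] if key in r]
    if not answers:
        return None
    return max(answers, key=lambda v: v.timestamp).value


# Three replicas; only the second has received the latest write so far.
replicas = [
    {"k": Versioned("old", 1)},
    {"k": Versioned("new", 2)},
    {"k": Versioned("old", 1)},
]
```

Here a read at ONE that happens to hit the first replica returns the stale value, while a read at QUORUM (2 of 3) also sees the second replica and returns the newer one. The client never sees both versions; the newest timestamp simply wins.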

In summary, with CQL Cassandra provides a simple data model that makes it easier to model and develop applications. With CQL, Cassandra can be looked at as a viable alternative to a relational database when scalability and high availability are important. For additional details, the Cassandra documentation is at http://www.datastax.com/documentation/cassandra/2.0/webhelp/index.html