Showing posts with label Distributed systems. Show all posts
Showing posts with label Distributed systems. Show all posts

Saturday, January 4, 2025

Dynago : A highly available distributed key value store

Introduction

Dynago is my  hobby project to implement a minimal implementation of the distributed system described the paper Dynamo : Amazons highly available key-value store.

This is the first in a series of blogs when I describe how I go about building this and post updates on the progress.

The is my way of  "#buildinpublic".

The github repository is at https://github.com/mdkhanga/dynago

Why am I doing it ?

The primary goal is learning. Learning not just for me but also for others who may read this blog and review the code.

A secondary goal is build something useful.  Perhaps I may use it somewhere or someone else might. 

It is possible that parts of the code might be reusable in another project.

Why a Dynamo clone ?

The Dynamo paper is the first paper that got me interested in distributed systems. It has been by desire to build something like that for a long time.

In the real world, DynamoDB is Amazon highly available key/value database. Apache Cassandra is also a project inspired by this paper.

Tech stack

I pick Go as the programming language. Why Go ? I have already done a couple of complex projects in Java (B+tree,  Raft protocol). So I was not interested in doing it in Java. I am not yet that familiar with Rust. And C/C++ felt like going back in time. So this is an opportunity to do a complex project in Go.

For network communication / RPC, I use GRPC instead of  programming to sockets. My Raft implementation is based on sockets. I wanted to try Grpc. If it does not work out, I can switch to sockets.

Storage engine is TBD. Early versions will be in-memory. In the future Rocksdb or another embeddable store is a possibility.

Some technologies we might learn here:

  • Go programming
  • Distributed systems 
  • Databases
  • Network programming
  • Programming servers from scratch
  • Concurrency
  • High availability

Implementation plan

I plan to implement in the following tentative order:

1. Cluster membership

Be able to start multiple servers connect and form a cluster. Gossip to detects servers joining and leaving the cluster.

2. Client API

Simple Get and Put API

Replicate to all nodes.

3. Partitioning using consistent hashing

4. Partitioning with replicas

5. Quorum based read / writes

6. Versioning

7. Hinted handoff

...... and so on

Conclusion

If you are interested in distributed systems and/or databases, I invite you to follow along.

I am open to suggestions and discussion. If you know more, I am happy to learn from you.

If you like what I am working on, please follow me on twitter/X or LinkedIn.

Saturday, July 20, 2024

Replication in modern data systems

Overview

Replication means making a copy of the data for future use in the case of failures or may be to scale.

Why is it a big deal ? We copy files for backup all the time. For static files, that do not change, making a copy is 1 copy command. But if the data is being updated by users all the time. How often do you run the command to copy. How do you keep the copy in sync with the source ?

That is the problem of replication in databases and data systems. All databases have replication built in that you can setup with a command or two. So why read or discuss it? If you are building a distributed systems that involves data, you will need to replicate data. The concepts from databases will be useful. 

While replication is most well known for its use with databases, it is also a critical part of distributed systems where the data is unstructured such as distributed file systems (HDFS) or messaging systems (Apache Kafka) 

This post covers replication in traditional single node systems as well as modern distributed systems.



Why do we need replication ?

There are several reasons why replication is needed. It is more than just taking a backup.

Redundancy

Make a copy of the data. When the main server becomes unavailable for any reason, switch to the copy. This is ensure that the data is always available.

Scalability

Your data becomes really popular and the database gets a lot of read requests and cannot keep up. So you make copies of the database and have a load balancer distribute the request across to the copies (replicas).

Geo distribution of data

Bring the data close of user. You have users in Americas, Europe and Asia. Data from americas is replicated to Europe and Asia, so users there can read data locally without making a round trip to the americas for every read.

Secondary use cases

These are lesser known and unconventional use cases. They might be done higher up in the stack at the application layer or middleware than in the database. 

Mirroring

Mirroring involves replicating the requests to the application to a copy of the entire application stack. You can think of this as application level replication.



For example, for a REST service, this involves sending the http request, not just to the production service but also to a mirror service.

The mirror service reads and writes from the mirror database. Mirror database is a previous replica that was in sync with the leader. Just before starting mirroring, it is discontinued as a replica so it does not get duplicates.

Mirroring can be used for testing large complex changes against production traffic.

Data in the mirror database is then compared with data in the production database for accuracy.

Testing

A regular database replica is used as a test database. Various kinds of tests - feature tests, performance tests, concurrency tests, scalability tests can be run with services running with the replica. This is a different use case from mirroring.

Migration

This can be used to eliminate or reduce downtimes needed for migration.

Create additional replicas.

Run migration on them.

Rollover the application services to the new database replicas.

Replication strategies

Single leader

This is the most common pattern. It shown in Figure 1.

One server is designated as the leader. The others are followers. All writes go to the leader. The leader replicates the writes to the followers.

The advantages are :

Setting up is fairly easy.

Reads become scalable. You can put a load balancer in front and distribute read requests to followers.

High availability: If the leader fails, you fail over to one of the followers and let it become the leader.

The disadvantages are :

All the writes go to one server , the leader. So this can become a bottleneck. Writes are not scaled. 

If you read from a replica that is behind on replication, you might read stale data.

Multi leader

Writes can go to more than one server.

Multi leader replication is needed when

(1) Writes and replication needs to happen across geographically distributed areas.

(2) Connectivity to single leader is not guaranteed. The is usually the case with mobile devices or laptops or when people want the ability to work offline and/or multiple devices.

In the geo distributed case, the writes go to a local local leader. The local leader not only replicates to local replicas but also to the distributed leader (who replicate to their replicas).

In the mobile case, the writes are store locally and the replicated periodically when connectivity is available.

Advantages:

Writes are also scaled.

Writes can done locally or close to clients. Better latency for writes.

Disadvantages:

Since writes happen at multiple leaders. There can be conflict. The conflicts need to be resolved.

Leaderless

In the leaderless model, all nodes are equal and no node is designated leader. Writes can go to any node and that node replicates the write to other nodes. This is the model made popular by AWS Dynamo and later adopted by Cassandra.


Consensus based replication

All the above methods have either write conflict or read consistency issues. Raft and Paxos are two well protocols for replicating log entries. Data to be replicated is modeled as a list of entries in log. The short story is that one server sends one entry or a sequence of entries to others and it is considered committed if a majority of servers acknowledge having received them. Raft has leader election but Paxos is leaderless. Raft protocol describes in detail leader election, replication, server crashes, recovery and consistency checks. The paper is a good read for anyone interested in distributed systems.

Replication Implementations

The first three techniques apply to databases which deal with structured data and are a little more complicated.

Statement based replication

In this approach, the SQL statements such as INSERT/UPDATE/DELETE etc are forwarded as they are from the leaders to the followers. While this can work in most cases, it does not work in certain cases such as timestamps or when you generate an id or a random number.

It is not efficient either. If you insert a record and then delete it, why replicate both commands ?

Write ahead log (WAL) replication

Databases first append every write to the WAL before doing anything else, before writing it to structured storage from where it will be read. WAL is used for recovery. If the database crashed, it state is reconstructed from the WAL. A recent slogan has been "The WAL is the database". Replication here involves replicating the WAL.

A disadvantage is that WAL entries contain where specific storage details like which byte in which block is to be updated. This can create compatibility issues if the leader and followers are on different versions.

Logical replication

A logical log on the other hand captures at a row, how the table was changed. You can view this as an approach somewhere between statement based and WAL replication.

Change data capture is a form of logical replication. It is used to replicate changes in a database to other third party systems. A popular use case is data warehousing where data from multiple sources is aggregated and summarized for analytics. 

Unstructured data replication

For unstructured data as in distributed file systems the unit for replication is a block of data. Data is first partitioned into blocks and each block is replicated independently.

Potential issues with replication

Replication Lag

Most of the time replication is asynchronous. Client writes to the leader and returns before any acknowledgement that it has been replicated. Synchronous replication is not viable due both performance and availability issues. A single failure can hold up all replications.

Lost write

However, one problem this creates is that if you read immediately after a write, the replica you are reading from may not yet have your last write.

Inconsistent read

If you read multiple times in quick succession ( same read) , each read may get a different result depending on which replica services the read ( as the replicas may be in different stages of replication)

Cassandra addressed this issue using quorum. CockroachDb uses a consensus protocol like Raft.

Write Conflicts

Write conflict is an issue in multi leader replication. This happens when multiple clients update the same data while talking to a different master. The database does not know which update to accept and how they should be merged. This is similar to a merge conflict in git.

An approach to handle conflicts is to store both versions on write. But on read, send both versions to the client and let the the client resolve the conflict

Replication is real world systems

The product documentation for database on replication can be quite confusing. It best to follow a tutorial or blog in the internet.

Postgres

The documentation and blogs describe it in 2 ways.

You can set it up as synchronous, asynchronous , streaming , log file based etc

And it can be WAL based or logical replication. Statement based is rarely seen.

In snapshot replication, a snapshot of the database is taken and replicated to followers.

Instead of streaming, you can also setup the replication as file based, where the WAL files are periodically shipped to followers.

In WAL replication, replication slots lets the leader track how much of the WAL is replicated to each replica. This helps the leader not discard segments not yet replicated. But this consumes resources on the leader. Replication slots need to be managed and deleted when not needed.

Mysql

The traditional way in mysql was a logical replication based on their binlog file - a binary format for logical changes.

The newer way is based on global transaction identifier (GTID) which is built on top of the binlog. It can be either statement based or row based.

Dynamo / Cassandra

In this architecture, replication is fundamental to the architecture. All you need to do is to set the replication factor to greater than 1. All servers are equal - no leader and no follower. Writes can go to any server. Partitioning is also fundamental to the architecture. The server that receives the write redirects the write to appropriate server. From here it is replicated to other servers based on the replication factor.

Consistency issues are addressed using quorum based tunable consistency. Quorum mean a majority which is (RF/2+1) agree on something. If you have replication factor (RF) 3, quorum is 2. So on a write, at least 2 nodes need to acknowledge that the write was saved. On read, at least 2 nodes need to agree on the return value. In general, to avoid inconsistencies, you want Read quorum (R)+ Write quorum (W) > RF .

CockroachDb

CockroachDB uses the Raft distributed consensus protocol to ensure that a majority of replicas are in consensus before any change is committed. This is the safest approach to ensure consistency but comes at a cost.

Apache Kafka

In Kafka, messages are sent and received from topics. Topics are split into partition. Each partition has one leader and a configurable number of replicas. Writes go to the leader which replicates to the replicas. Reads can go to the replicas. Each broker is a leader for some partitions but a follower for other partitions. Like Cassandra and CockroachDb, replication is core to the architecture and easy to setup.

Apache Hadoop (HDFS)

This applies to any distributed file system. The file is a sequence of blocks of data. HDFS has a name node and data nodes. Name node maintains a map of which data nodes have the blocks of a file. Each block is replicated to a configurable number of data nodes.

Conclusion

Replication is a critical piece of any distributed data system. It has to be part of the core architecture. It cannot come after the fact like it did in the past. While redundancy and HA are well known benefits, there are other benefits such geo distribution of data as well. It can cause some effects such as read consistency. Care should be taken to address those. Different products use different strategies. You should be familiar with the replication strategies, configuration and side effects for your data product. If you are building a new system with data, understanding how existing systems replicate and the issues they face, can help you design your replication.

Wednesday, July 3, 2024

How modern distributed systems scale by partitioning ?

 

1.0 Introduction

In the last 20 years, software systems moved to the internet and handled large volumes of data and millions of requests. Most people interact with these systems using a browser or a mobile device. At the back end, is not one powerful computer but generally a network of commodity computers. Both the processing and storage of data is spread across multiple computers. In this blog we discuss how large datasets can be stored using multiple commodity computers.

Partitioning is the process of breaking up a large dataset in parts so that each part can fit easily on the disk on one one node and be efficiently managed by each node. For very large data sets that cannot fit on 1 machine, data needs to be broken up into parts ( partition or shard). Each partition is stored on a different machine. This is just natural horizontal scaling. But most important is that, when it time to read the partitioned data, we need to be able to find (efficiently) which partition and node has the data we want to read.

Storage space is not the only benefit of partitioning. You are also spreading the compute required to read, write and process the data.

Partitioning is generally combined with replication to make the partitions highly available. But we do not discuss replication here. That is a topic for another blog.



2.0 Types of partitioning

There are 2 types of data that need to be considered: unstructured and structured.

Most discussions on partitioning discuss partitioning of data in databases ( structured data ) but not unstructured data which is outside databases in plain files. This blog discusses both unstructured data and structured data.

2.1 Structured data

The problem is more interesting for databases because it is not enough to break up the dataset into smaller parts. During reads you need to be able to find the data. And you need to do it fast. When the database receives a query - "Give me records for Customer X", How does it know which node hosts the data ? Does the database have to send the request to all the nodes ? That would be quite inefficient.

The goal is thus to partition data and query it efficiently. Another goal is to ensure that distribution of data between partitions is even. You do not want a situation where partition 1 has 70% of the data and the other 5 partitions has the remaining 30%. This will overload partition 1 ( known as a hot spot) and you lose the benefits of partitioning.

2 strategies are commonly used for database partitioning.

2.1.1 Range based partitioning



Database records are generally stored sorted based on the primary key.

Initially there is one partition with zero records.

As clients write to the database, the size of the partition increases. When it reaches a certain size say 10MB or 64MB, it is split into two partitions.

Each partition may be assigned to a different node.

This process is repeated as more data is added and partitions grow. If data is deleted and partitions shrink, then small partitions can be merged

To efficiently query data, the database needs to do some book keeping

-- which key range is in which partition

-- which partition is at which node

Starting with 1 partition and 1 node is not efficient for obvious reasons and databases typically start with configured number of partitions or a number of partitions proportional to the number of nodes.

To balance the load on nodes, partitions may need to be moved between nodes.

This is the strategy used by HBase, CockroachDb, MongoDb.

2.1.2 Hash based partitioning



The hash value calculated from the key is used to determine the location where the record can be stored.

The wrong way to determine the node is by using hash mod n, where n is the number of nodes. The problem with this approach is that when nodes are added or removed from the cluster, a very high percentage of the keys need to be removed.

A better approach is to start with a fixed number of partitions , way more that the number of the nodes the cluster will ever have , say for example 1000 or 10000. Partitions are logical. Hash ranges are assigned to partitions. Partitions are assigned to nodes either using numPartitions mod numNodes or other algorithms. This is shown in the top half of Figure 4. The bottom half of figure 4 visualizes the same as a ring as is done in many articles that to refers to this as consistent hashing. Think of partitions being placed on the ring. Each partition owns the key space from the position of the previous position to its position. The cluster needs to maintain a mapping of partitions to nodes. When a new node is added, the cluster can take a few partitions from existing nodes and assign to the new nodes. When a node is removed, the cluster assigns its partitions to other nodes. Looking up a key is a extra level of indirection. The hash of key maps to a partition. The partition node map tell you which node has the partition that has the key. Many studies has shown that this lead to less movement of keys between nodes as the cluster changes.

In popular press, this has the poorly understood name "consistent hashing". It is just hast hash based partitioning. Nothing consistent and nothing to do with consistency.

2.1.3 Secondary Indexes

So far we have been talking about partitioning by the primary key, also know as the primary index.

To speed up retrieval of records, databases also have secondary indexes which can be very large and might need to be partitioned.

One approach can be to keep the secondary indexes local to the node on which the primary index partition is. The advantage of this approach is that since all related rows are on the same node, inserts/updates/deletes are are all local. But queries on the secondary index requires sending queries to all nodes and aggregating the responses.

Another approach is to create a global secondary index and partition it as an independent entity. However since the secondary index partitions might be on a different partition from the primary partition, CUD operations are more expensive. Transactions might be distributed. However range queries on secondary indexes are more efficient since closer records (by sort) are on the same partition.

2.2 Unstructured data

Unstructured data refer to ordinary files that have text or binary data. Of course we are talking about large files or many large files. This is the use case for a distributed file system such as HDFS (hadoop file system) or GFS ( Amazon ). Logically the implementation of distributed file system is similar to say a linux filesystem. You view the file system as a list of blocks of fixed size. On a single node linux file system , all the disk blocks are on one node. In a distributed file system, the blocks are spread across multiple nodes. In HDFS, the name node maintains the metadata for the distributed filesystem -- given a file , which blocks make up the file and which nodes have the blocks. To create a file, the name node may assign a block on a particular node and the client talks directly to service called data node running on the target node to write to block. To read a block, the name node directs the client to the data node that hosts the block and the client reads directly from that block. But the basic algorithm is simple -- break up the file data into blocks and spread them across nodes.



Another example is partitioning of topic logs in Apache Kafka is a messaging system ( they like to call it event streaming) where producers write messages to a topic and consumers read messages from the topic. The storage for a topic is a log file. New messages are appended to the end of log file. They are read from the front. Obviously the logs can grow beyond what can fit on a node. So log is broken into partitions and distributed across multiple nodes. The broker serves producers and consumer. A Kafka cluster has multiple brokers with each broker running on a seprate node managing multiple partitions.

3.0 Rebalancing

Rebalancing is the process of moving partitions between nodes to make the distribution of load even across all the nodes. This is necessary when nodes join or leave the cluster or if the cluster starts receiving more data for certain keys. Either way rebalancing is an expensive operation that needs lot of CPU, memory and network bandwidth. It can have an impact on the performance of regular CRUD processing. In a ideal world, we would like rebalancing to happen automatically behind the scenes, without end users knowing about it. But for performance reasons listed above, that rarely works well in practice. Some databases require an admin to manually start a rebalance, which can be done during a period of low load and monitored.

4.0 Routing

How does a client know which partition to connect to ? The short answer is that the database has to maintain mapping of partition to node. In the case of hash based partitioning, hash maps to a partition which maps to a node. In the case of range based partitioning the key maps to a key range which maps to a partition which maps to a node. The partition node map is available to nodes. If a client can connect to any node, then if the node does not have the partition to handle the clients request, it can redirect the client to the appropriate node or it can get the data from the target node and return to client.

5.0 Use case

There are cases where you might not have a ready made database doing server side partitioning for you and you might need to do it yourself. Or even when the database does it for you, you still need to pick the right partition key for the partitioning to be optimal. Let us look at some large datasets and discuss how they might be logically partitioned.

Let us say you are building a twitter like system.

Say 100 million tweets of 140 character per day

100M * ( 280 bytes + 20 bytes for id, timestamp)

30 GB / day

10 TB / year

Need to store 5 years to data

Need to store and query 50 TB

How does twitter work ?

Users follow other users.

When a user connects, we need to show the most recent tweets from the users he follows.

So we need to store about 50 TB of tweets. Given a user, we need to query say the 50 most recent tweets from the users he follows.

Using commodity hardware, 50TB would need say 25 nodes. What key would you use to partition the data ?

Option 1 : hash based partitioning based on user.

To store tweets, a hash of the user is to used to locate the node where the tweet is stored. To query - for each user that the user follows, use the hash to query the node for that users tweets. A problem with this approach is that some users tweet way more than other users. Their nodes are going to be overloaded while others are idle. Unbalanced load.

Option 2 : hash based on randomly generated tweet id

The problem with this approach is the for every query, you have to query every server and aggregate the results. Inefficient for queries.

Option 3: hash based on timestamp

Timestamp is relevant because for each feed request we want the latest tweets. It would be good if tweets are sorted by timestamp. However with such as hash, at any given point in time , one server is overloaded as all the writes are going to the server.

Option 4:

Given the choices, inefficient query (option 2) is more tolerable that unbalanced load (option 1) which could crash some of the servers making the system unavailable. But we also want queries to return the most recent data (sorted by timestamp). So we can improve querying little bit by combining option 2 and 3. Assume a timestamp in epoch time in increments of 1 sec, the tweetid could be timestamp + auto incremented seq. The sequence gives randomness to the tweet id and will give uniform distribution across node.

So given a epoch 1692547708 you will have tweet ids like

1692547708 1

1692547708 2

1692547708 3

1692547708 n

Of course you are wondering how to generate unique tweet ids in a distributed system. That is a topic for another blog.

6.0 Summary

Partitioning data and spreading it across nodes is fundamental to distributed system. Special thought needs to be given to how the partitioned data can be queried efficiently. Hash based partitioning and range based partitioning are two popular strategies. Nodes can fail or additional nodes may need to added to scale. To ensure that load is even across nodes, partitions may be moved between nodes in a process called rebalancing. For best results design your partition keys so that load is distributed evenly and querying is efficient.

7.0 Related Content

CockroachDB Review


Sunday, June 30, 2024

CockroachDb Review: Should I use CockroachDb ?

 Overview

CockroachDb is a modern distributed database that promises linear scalability with strict serializability.

Server side sharding is automatic. Nodes can be added easily as needed. Claims to provide the SERIALIZABLE isolation level.

Most distributed databases such as Cassandra, MongoDb, HBase etc sacrifice consistency to achieve high availability. CockroachDb distinguishes itself by claiming to be distributed and the same time offer strong consistency that even single node databases do not offer.

This falls into a database category called NewSql or DistributedSQL as opposed to NoSql (Cassandra, MongoDb)

When to choose CockroachDb ?

You should choose CockroachDb if

    Your data is of a global scale.

    As data size increases, you need to scale horizontally to several nodes.

    You need data to be distributed and localized in specific geographical regions. For
    example EU data resides in Europe while US data resides in US.

    You need strong consistency. Serializable isolation level.

    You need to keep the SQL / relational data model.

    You need distributed transactions.

You may want to pass on it if

    You data size can easily fit on a node for the foreseeable future.
    You organization is more comfortable with a stable proven database. (CockroachDb is
    still maturing).
    You data model is heavily normalized and you do a lot of joins in your queries. While this
    database can support joins, it is still not recommended in a highly distributed
    environment.

Architecture

Architecture is based on Google's Spanner paper.

It is a key value store with a SQL interface on top of it.

Database is a cluster of nodes. All nodes are equal. Nodes may join and leave the cluster at any time.

Sorted map of key values pairs. Fully ordered monolithic key space. All tables/indexes go into the same key space by encoding tablename/indexname/key together.

Sharding

Key value pairs are broken up into contiguous ranges. 

When range size reaches 512 Mib (2 power 20) It is split into 2 ranges.

Each range is assigned to a node and replicated. 

If you have 1 node all the shards are in that node. To scale, you add more nodes and the shards get distributed across nodes. A minimum of 3 nodes is recommended. 

Very easily spin up node(s) and add to cluster anywhere. 

Btree like index structure used to locate shard that has a key.

Replication

Data in each range is replicated using the Raft consensus algorithm.

A minimum replication factor of 3 is needed.

This provides the high availability. Data is available as long as a majority of the nodes in the cluster are available.

Geo-partitioning

By adding a country or region to the primary key, you can limit storage to keys to a particular region. So European data can be make to reside in Europe, US data in US and so. This has 2 benefits
There is a performance benefit and data is local to its users.

It can satisfy legal requirements where data is not allowed to leave a country or region.

Read/Write

Reads

Any node can receive a request to read a key/value.

Request is forwarded to the node that is the raft leader for that table/range.

Leader returns the data to the node that requested it. Since leader returns the data, no consensus is required.

Node returns it to the client.

Writes

Any node can receive a request to write a key/value.

Request is forwarded to the node that is the raft leader for that table/range.

Leader writes the value to its log and initiates consensus with replicas for the range. When majority acknowledges, the key/value is considered committed and leader notifies the requesting node which notifies the client

Transactions

Supports transactions that spans multiple tables and rows.

Transactions can be distributed, that is span multiple nodes.

The supported isolation level is strict serializability which is the highest isolation level. Strict serializability means that not only are transactions ordered, but they are ordered as per wall clock time.
Transaction protocol is an improvement over two phase commit. In parallel, participants acquire locks and create write intents. The transaction is marked staged. When the client commits, if all locks are acquired and writes are replicated, the coordinator immediately returns success to client. In background the transaction is marked committed. This is one round trip between transaction coordinator and each participant - unlike two phase commit - which requires two round trips.

Hybrid logical clocks are used to timestamp each transaction. Timestamp is the version for MVCC.

Data Model

Clients see only the SQL row column relation model
Wire protocol is same as Postgresql wire protocol.

Performance

Efficient range scan.
Geo partitioning improves performance by locality.
Distributed SQL execution.
Distributed transactions will be slow.
Generally you do not want distributed transactions over large distances. If you build a 3 node CockroachDb cluster with 1 node in NewYork, 1 in London and 1 in San Francisco, the write latencies are going to be very high due to the round trips for RAFT and distributed transactions. The cluster topology needs to be designed appropriately to give you the lowest latency at the desired level of high availability.

Administration

Good command line tools and UI console make the the administration easy.
Since all nodes are equals, number of moving parts that need to be administered is low.

Summary

If you need a globally distributed database with strict serializability, this is definitely a database to look at. It has good pedigree. However remember that distributed databases are not drop in replacement for your traditional RDBMSs. Distributed queries especially joins and distributed transaction can be slow. So some application redesign, some denormalization is always required.

Note: Moved from heavydutysoftware.com


Sunday, November 1, 2020

Building Globally Distributed Applications

A globally distributed application is one where the services and data for the application are partitioned and replicated across multiple regions over the globe. Popular distributed applications that everyone is familiar with are Facebook, Amazon.com, Gmail, Twitter, Instagram. However more and more enterprise applications are finding the need to become distributed because their user base is increasingly distributed around the globe. But not every company has the expertise of a Facebook or Amazon or Google. When going distributed, it is not enough to just spin up instances of your service on AWS or Google cloud on various regions. There are issues related to data that must be addressed for the application to work correctly. While consumer centric social media applications can tolerate some correctness issues or lags in data, the same might not be true for enterprise applications. This blog discusses the data and database issues related to a globally distributed application. Lastly, we discuss 2 research papers that been around since early part of this decade, but whose relevance is increasing in recent times.

Building globally distributed applications that are scalable, highly available and consistent can be challenging. Sharding has to be managed by the application. Keep it highly available requires non database tools. When you have been on a single node database whether it is Mysql or Postgresql etc, it is tempting to scale by manual sharding or one of the clustering solutions available for those databases. It might appear easy at the beginning but the cost of managing the system increases exponentially with scale. Additionally, sharding and replication lead to consistency issues and bugs that need to be addressed. Scaling with single node databases like Mysql beyond a certain point has extremely high operational overhead.

NoSql databases such as Cassandra, Riak, MongoDB etc offer scalability and high availability but at the expense of data consistency. That might be ok for some social media or consumer applications where the dollar value of individual transaction is very small. But not in enterprise applications where the correctness of each transaction is worth several thousands of dollars. In enterprise applications, we need distributed data to behave the same way that we are used to with single node databases.

Let us look at some common correctness issues that crop up with distributed data.

Example 1 : A distributed on line store with servers in San Francisco, New York and Paris.

Each server has 2 tables products and inventory with the following data.
Products:(product)
 widget1
 widget2
Inventory: (product, count):
widget1,6
widget2,1

Customer Jose connects to server in San Francisco and buys widget2 at time t1. At time t2, Customer Pierre connects to a server in Paris and also buys widget2. Assume t2 > t1 but t2-t1 is small.

Expected Behavior : Jose successfully completes transaction and gets the product. Since inventory of widget2 is now zero, Pierre’s transaction is aborted.
Actual Behavior (in an eventually consistent system): Both transactions complete. But only one of the customers gets the product. The other customer is later sent an apologetic email that widget2 is out of stock.

Example 2: A distributed document sharing system with servers in New York, London, Tokyo

Operation1: In London, User X creates a new empty document marked private.
Operation2. User X makes update 1 to document.
Operation3: User X deletes update 1.
Operation4: User X makes update 2.
Operation5: User X changes the document from private to public.
Due to network issues, only operations 1,2, 5 reach Tokyo. 3 and 4 do not.
In Tokyo, User Y tries to read the shared document.

Expected behavior: The document status is private and Y cannot read the document.
Actual behavior: Y is able to read the document but an incorrect version. The document has update1 which is deleted and is missing update2 which needs to be there.

The problems above are known as consistency issues. Different clients are seeing different views of the data. What is the correct view ?

Consistency here refers to C in the CAP theorem, not the C in ACID. Here Consistency means every thread in a concurrent application correctly reads the most recent write at that point in time.

How do you fix the above issues ? In a single node database, Example1 can be fixed by locking the row in the inventory table during update and Example2 is not even an issue because all the data is in one node. But in a distributed application data might be split across shards and shards replicated for high availability. User of the system might connect to any shard/server and read/write data. With NoSql databases, the application has to handle any in consistencies.

In traditional RDBMSs , database developers are given a knob called isolation level to control what concurrent threads can read. In this old blog I explain what isolation levels are. The safest isolation level is the SERIALIZABLE where the database behaves as if the transactions were executing in a serial order with no overlap, even though in reality they are executing concurrently. Most developers use the default isolation level which is generally READ_COMMITTED OR READ_REPEATABLE. In reality, these isolation levels are poorly documented and implemented differently by different vendors. The result is that in highly concurrent applications, there are consistency bugs even in traditional single node RDBMs. In a distributed database with data spread across shards and replicated for read scalability, the problem is compounded further. Most NoSql vendors punt the problem by claiming eventual consistency, meaning if there are no writes for a while, eventually all reads on all nodes will read the last write.

Consistency is often confused with isolation, which describes how the database behave under concurrent execution of the transactions. At the safest isolation level, the database behaves as if the transactions were executing in serial order, even though in reality they are executing concurrently. At the safest consistency level, every thread in a concurrent application correctly reads the most recent write. But most database documentations are not clear on how to achieve this in an application.

The problems in examples 1 and 2 would not occur if those applications/databases had the notion of a global transaction order with respect to real time. In example 1, Pierre’s transaction at t2 should see the inventory as 0 because a transaction at t1 <t2 set it to zero. In example 2, Y should only be able to read upto operation2 . It should not be able to read operation5 without operations 3,4 which occured before 5.

In database literature, the term for this requirement is called “Strict Serializability” or sometimes “external consistency”. Since this technical definitions can be confusing, it is often referred to as strong consistency.

2 research papers that have been around for a while provide answers on how this problems might be fixed. The papers are the Spanner paper and the Calvin paper.

Their approach is solving the problem can summarized as follows:
1. timestamp transactions with something that reflect their occurrence in real time
2. Order transactions based on timestamp
3. Commit transactions in the above order.

But the details of how they do it are significantly different. Let us look at how they do it.

Spanner paper from Google

Spanner is database built at Google and the paper describes the motivation and design of Spanner. Spanners approach involves
1. The use of atomic clocks and GPS to synchronize clocks across hosts in different regions and the true time API to give accurate time across nodes, regions or continent.
2. For a read/write transaction, spanner calls the true time API to get a timestamp. To address overlaps between transactions that are close to each other, the timestamp is assigned after locks are acquired and before they are released. 
3. The commit order equals timestamp order.
4. Read for particular timestamp is sent to any shard/replica that has the data at that timestamp.
5. Read without timestamp (latest read) are serviced by assigning a timestamp.
6. Writes that cross multiple shards use two phase commit.
And of course,
7. It can scale horizontally to 1000s of nodes by sharding.
8. Each shard is replicated.
And most importantly, 
9. Even though, it is a key value store, it provide SQL support to make it easy for application programmers.
CockroachDb and Yugabyte are 2 commercial databases based on spanner.

Calvin Paper


The Calvin paper addresses the above problem using distributed consensus protocols like Raft or Paxos. 
1. Every transaction has to first go through distributed consensus and secure a spot in a linear replication log. 
2. One can view the index in the log as the timestamp. 
3. The committed entries in the replication log are then executed in the exact same serial order by every node in the distributed database. 
4. Since the transaction log is replicated to every shard, it does not need or use two phase commit. In a transaction involving multiple shards, if a shard dies before committing a particular transaction, then on restart it just has to execute the uncommitted transaction from it replication log.
5. No dependency on wall clocks or time API.
6. No two phase commit.
7. No mention of SQL support.

 FaunaDb is an example of a database based on Calvin.

This class of databases that offer horizontal scalability on a global scale without sacrificing consistency is also called NewSql. 

In summary, if you are a building a globally distributed application that needs strong consistency, doing it on your own with SQL or NoSql database can be non trivial. Consistency is hard enough in a single node database. But on a distributed database, consistency bugs are harder to troubleshoot and even harder to fix. You might want to consider one of the NewSql databases to make life easier. Review the Spanner and Calvin papers to understand the architectural choices that are available. This will help you pick a database that is right for you. Spanner and Calvin papers have been around for almost a decade. But they have become more relevant now as real databases based on them become more popular. Most importantly understand what is consistency is and apply it, for lack of which can cause severe correctness bugs in your application. 

References:

The Spanner paper

The Calvin paper

Consistency and Isolation

Sunday, August 26, 2018

ElasticSearch Tutorial

ElasticSearch is a distributed , scalable, search and analytics engine.

It is similar to Apache Solr with a difference that is built to be scalable from ground up.

Like Solr, ElasticSearch is built on top of Apache Lucene which is a full text search library.

What is difference between a database and a search engine ? Read this blog.

1.0 Key features


Based on very successful search library Apache Lucene.
Provides the ablity to store and search documents.
Supports full text search.
Schema free.
Ability to analyze data - count , summarize ,aggregate etc.
Horizontally scalable and distributed architecture.
REST API support.
Easy to install and operate.
API support for several languages.

2.0 Concepts

An elasticsearch server process called a node is a single instance of a java process.

A key differentiator for elasticsearch is that it was built to be horizontally scalable from ground up.

In production environment, you generally run multiple nodes. A cluster is a collection of nodes that store your data.

A document is a unit of data that can be stored in elasticsearch. JSON is the format.

An Index is a collection of documents of a particular type. For example you might have one index for customer documents and another for product information. Index is the data structure that helps the search engine find the document fast. The document being stored is analyzed and broken into tokens based on rules. Each token is indexed - meaning - given the token -there is pointer back to the document - just like the index at the back of the book. Full text search or the ability to search on any token or partial token in the document is what differentiates a search engine from a more traditional database.

Elasticsearch documentation sometimes use the term inverted index to refer to their indexes. This author believes that the term "inverted index" is just confusing and this is nothing but an index.

In the real world, you never use just one node. You will use an elasticsearch cluster with multiple nodes. To scale horizontally, elasticsearch partitions the index into shards that get assigned to nodes. For redundancy, the shards are also replicated, so that they are available at multiple nodes.

3.0 Install ElasticSearch

Download from https://www.elastic.co/downloads/elasticsearch the latest version of elasticsearch. You will download elasticsearch-version.tar.gz.

Untar it to a directory of your choice.

4.0 Start ElasticSearch


For this tutorial we will use just a single node. The rest of the tutorial will use curl to send http requests to a elasticsearch node to demonstrate basic functions. Most of it is self explanatory.

To start elasticsearch type

install_dir/bin/elasticsearch

To confirm it is running

curl -X GET "localhost:9200/_cat/health?v"

5.0 Create an index


Let us create a index person to store person information such as name , sex , age , person etc

curl -X PUT "localhost:9200/person"{"acknowledged":true,"shards_acknowledged":true,"index":"person"}

List the indexes created so far

curl -X GET "localhost:9200/_cat/indices?v"

health status index    uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   person   AJCSCg0gTXaX6N5g6malnA   5   1          0            0      1.1kb          1.1kb

6.0 Add Documents


Let us add a few documents to the person index.
In the url, _doc is the type of document. It is way to group documents of a particular type
In /person/_doc/1, the number 1 is the id of the document we provided. If we do not provide an id , elasticsearch with generate an id.
You will notice that the data elasticsearch accepts is JSON.

curl -X PUT "localhost:9200/person/_doc/1" -H 'Content-Type: application/json' -d'
{
  "name": "Big Stalk",
  "sex":"male",
  "age":41,
  "interests":"Hiking Cooking Reading"
}
'
curl -X PUT "localhost:9200/person/_doc/2" -H 'Content-Type: application/json' -d'
{
  "name": "Kelly Kidney",
  "sex":"female",
  "age":35,
  "interests":"Dancing Cooking Painting"
}
'

curl -X PUT "localhost:9200/person/_doc/3" -H 'Content-Type: application/json' -d'
{
  "name": "Marco Dill",
  "sex":"male",
  "age":26,
  "interests":"Sports Reading Painting"
}
'

curl -X PUT "localhost:9200/person/_doc/4" -H 'Content-Type: application/json' -d'
{
  "name": "Missy Ketchat",
  "sex":"female",
  "age":22,
  "interests":"Singing Cooking Dancing"
}
'

curl -X PUT "localhost:9200/person/_doc/5" -H 'Content-Type: application/json' -d'
{
  "name": "Hal Spito",
  "sex":"male",
  "age":31,
  "interests":"Sports Singing Hiking"
}

'

7.0 Search or Query

The query can be provided either as a query parameter or in the body of a GET. Yes, Elasticsearch accepts query data in the body of a GET request. 


7.1 Query string example


To retrieve all documents:

curl -X GET "localhost:9200/person/_search?q=*"

Response is not shown to save space.

Exact match search as query string:

curl -X GET "localhost:9200/person/_search?q=sex:female"

{"took":14,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":2,"max_score":0.18232156,"hits":[{"_index":"person","_type":"_doc","_id":"2","_score":0.18232156,"_source":
{
  "name": "Kelly Kidney",
  "sex":"female",
  "age":35,
  "interests":"Dancing Cooking Painting"
}
},{"_index":"person","_type":"_doc","_id":"4","_score":0.18232156,"_source":
{
  "name": "Missy Ketchat",
  "sex":"female",
  "age":22,
  "interests":"Singing Cooking Dancing"
}


7.2 GET body examples


Query syntax when sent as body is much more expressive and rich. It merits a blog of its own.
This query finds persons with singing and dancing in the interest field. This is full text search on a field.

curl -X GET "localhost:9200/person/_search" -H 'Content-Type: application/json' -d'
{
  "query": {
    "bool": {
      "should": [
        { "match": { "interests": "singing" } },
        { "match": { "interests": "dancing" } }
      ]
    }
  }
}'

{"took":15,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":3,"max_score":0.87546873,"hits":[{"_index":"person","_type":"_doc","_id":"4","_score":0.87546873,"_source":
{
  "name": "Missy Ketchat",
  "sex":"female",
  "age":22,
  "interests":"Singing Cooking Dancing"
}
},{"_index":"person","_type":"_doc","_id":"5","_score":0.2876821,"_source":
{
  "name": "Hal Spito",
  "sex":"male",
  "age":31,
  "interests":"Sports Singing Hiking"
}
},{"_index":"person","_type":"_doc","_id":"2","_score":0.18232156,"_source":
{
  "name": "Kelly Kidney",
  "sex":"female",
  "age":35,
  "interests":"Dancing Cooking Painting"
}

Below is a range query on a field.

curl -X GET "localhost:9200/person/_search" -H 'Content-Type: application/json' -d'
{
  "query": {
    "range": {
      "age": [
        { "gte": 30, "lte":40 }

      ]
    }
  }
}'

{"took":1,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":2,"max_score":1.0,"hits":[{"_index":"person","_type":"_doc","_id":"5","_score":1.0,"_source":
{
  "name": "Hal Spito",
  "sex":"male",
  "age":31,
  "interests":"Sports Singing Hiking"
}
},{"_index":"person","_type":"_doc","_id":"2","_score":1.0,"_source":
{
  "name": "Kelly Kidney",
  "sex":"female",
  "age":35,
  "interests":"Dancing Cooking Painting"
}
}]}}

8.0 Update a document



$curl -X POST "localhost:9200/person/_doc/5/_update" -H 'Content-Type: application/json' -d'
{
  "doc": { "name": "Hal Spito Jr" }
}

'

After executing the above update, do a search for "Jr". The above document will be returned.


9.0 Delete a document



curl -X DELETE "localhost:9200/person/_doc/1"

This will delete the document with id for 1. Any searches will not return this document anymore

10. Delete Index

curl -X DELETE "localhost:9200/person"
{"acknowledged":true}

That deletes the index we created.


11. Conclusion


This has been a brief introduction of elasticsearch just enough to get you started. There are lot of more details in each category of APIs. We will explore them in subsequent APIs.