Monday, February 28, 2022

Quick Review: Mysql NDB cluster

This is a quick 2 min overview of Mysql NDB Cluster. The goal is to help you decide within a minute or two whether this is an appropriate solution for you.

Cluster of in-memory Mysql databases with a shared-nothing architecture.

Consists of Mysql nodes and data nodes.

Mysql nodes are Mysql servers that get data from data nodes. Data nodes hold the data using the NDB storage engine. There are also admin nodes.

NDB nodes serve the data from memory. Data is persisted at checkpoints.

Data is partitioned and replicated.

Supports up to 48 data nodes, with each fragment of data stored in 2 replicas (the default).
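As a rough sketch, the data-node count and replica count are set in the cluster's config.ini on the management node. The hostnames below are hypothetical:

```ini
# config.ini on the management node -- hostnames are hypothetical,
# adjust for your own deployment
[ndbd default]
NoOfReplicas=2                     ; each fragment kept on 2 data nodes

[ndb_mgmd]
HostName=mgmt1.example.com         ; admin (management) node

[ndbd]
HostName=datanode1.example.com     ; data node 1

[ndbd]
HostName=datanode2.example.com     ; data node 2

[mysqld]
HostName=sql1.example.com          ; Mysql (SQL) node
```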

ACID compliant.

Supports only the READ_COMMITTED isolation level.

Sharding of data is done automatically. No involvement of user or application is required.

Data is replicated for high availability. Node failures are handled automatically.

Clients can access data using NDB Api. Both SQL and NOSQL styles are possible.

This is not a good general purpose database. It is suitable for certain specific use cases in telecom and gaming, but not for general OLTP.

Feels like it has too many moving parts to manage.

High performance -- it is serving data from memory.


Not a general purpose distributed database. Unless you are in telecom or gaming, or know for sure why this meets your use case, don't even think about it.

If you are on Mysql and want high availability, try Mysql InnoDb Cluster, which is much easier to understand and use.


Mysql Documentation for NDB cluster

Monday, February 14, 2022

Quick Review: Mysql InnoDb Cluster


This is a quick 2 min overview of Mysql InnoDb Cluster. The goal is to help you decide within a minute or two whether this is an appropriate solution for you.

Simple HA solution for Mysql.

Built on top of MySql group replication.

It has 3 Components:

Replication: Uses Mysql group replication capabilities. Default is a single-primary configuration. Writes go to the primary, which replicates to the secondaries. Secondaries can service reads.

Mysql router: Provides routing between your application and the cluster. Supports automatic failover: if the primary dies, the router will redirect writes to the secondary that takes over.

Mysql shell: This is an advanced shell that lets you script and configure the cluster.

Works best over a local area network. Performance degrades over wide area networks.

Easy to set up. Simple commands that are entered on the mysql shell.

var cluster = dba.createCluster('testCluster')
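To give a flavor of the AdminAPI, here is a slightly longer (hypothetical) session. The host names and account are made up, but createCluster, addInstance and status are the actual AdminAPI calls:

```javascript
// From mysqlsh, connected to the instance that will become the primary.
// Hostnames and the clusteradmin account are hypothetical.
var cluster = dba.createCluster('testCluster');

// Add two more instances to the cluster
cluster.addInstance('clusteradmin@host2:3306');
cluster.addInstance('clusteradmin@host3:3306');

// Check the health and topology of the cluster
cluster.status();
```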




Cluster elects the primary. If you want a particular server to be the primary, you can give it extra weight.

Clients do not connect directly to the servers. Rather, they connect to the Mysql router, which provides routing as well as failover.

MySql InnoDB ClusterSet provides additional resiliency by replicating data from a primary cluster to a cluster in another datacenter or location. If the primary cluster becomes unavailable, one of the secondary clusters can become the primary.


Provides scalability for reads and some HA for Mysql deployments. Simple, easy to use solution. No sharding. Some consistency issues will be there when you read from replicas that lag a little bit.


Sunday, November 1, 2020

Building Globally Distributed Applications

A globally distributed application is one where the services and data for the application are partitioned and replicated across multiple regions over the globe. Popular distributed applications that everyone is familiar with are Facebook, Gmail, Twitter, Instagram. However more and more enterprise applications are finding the need to become distributed because their user base is increasingly distributed around the globe. But not every company has the expertise of a Facebook or Amazon or Google. When going distributed, it is not enough to just spin up instances of your service on AWS or Google cloud in various regions. There are issues related to data that must be addressed for the application to work correctly. While consumer centric social media applications can tolerate some correctness issues or lags in data, the same might not be true for enterprise applications. This blog discusses the data and database issues related to a globally distributed application. Lastly, we discuss 2 research papers that have been around since the early part of the last decade, but whose relevance is increasing in recent times.

Building globally distributed applications that are scalable, highly available and consistent can be challenging. Sharding has to be managed by the application. Keeping it highly available requires non-database tools. When you have been on a single node database, whether it is Mysql or Postgresql etc, it is tempting to scale by manual sharding or one of the clustering solutions available for those databases. It might appear easy at the beginning, but the cost of managing the system increases exponentially with scale. Additionally, sharding and replication lead to consistency issues and bugs that need to be addressed. Scaling with single node databases like Mysql beyond a certain point has extremely high operational overhead.

NoSql databases such as Cassandra, Riak, MongoDB etc offer scalability and high availability, but at the expense of data consistency. That might be ok for some social media or consumer applications where the dollar value of an individual transaction is very small. But not in enterprise applications where the correctness of each transaction is worth several thousands of dollars. In enterprise applications, we need distributed data to behave the same way that we are used to with single node databases.

Let us look at some common correctness issues that crop up with distributed data.

Example 1: A distributed online store with servers in San Francisco, New York and Paris.

Each server has 2 tables, products and inventory, with the following data.
Inventory (product, count): widget2 has a count of 1.

Customer Jose connects to server in San Francisco and buys widget2 at time t1. At time t2, Customer Pierre connects to a server in Paris and also buys widget2. Assume t2 > t1 but t2-t1 is small.

Expected Behavior : Jose successfully completes transaction and gets the product. Since inventory of widget2 is now zero, Pierre’s transaction is aborted.
Actual Behavior (in an eventually consistent system): Both transactions complete. But only one of the customers gets the product. The other customer is later sent an apologetic email that widget2 is out of stock.
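To make the failure concrete, here is a small Python simulation of Example 1. It is a toy model of eventual consistency (independent replicas that reconcile later by replaying their logs), not the algorithm of any particular database:

```python
# Toy model of Example 1: two replicas of the inventory table accept
# writes independently and reconcile later (eventual consistency).
# Names and values follow the example; the naive log-replay merge is an
# assumption for illustration, not a specific database's algorithm.
import copy

class Replica:
    def __init__(self, name, inventory):
        self.name = name
        self.inventory = copy.deepcopy(inventory)
        self.log = []  # (timestamp, product, delta)

    def buy(self, timestamp, product):
        # Each replica checks only its LOCAL copy, so both sales succeed.
        if self.inventory[product] > 0:
            self.inventory[product] -= 1
            self.log.append((timestamp, product, -1))
            return True
        return False

def reconcile(replicas, initial):
    # Merge by replaying every replica's log against the initial state.
    merged = copy.deepcopy(initial)
    for op in sorted(op for r in replicas for op in r.log):
        _, product, delta = op
        merged[product] += delta
    return merged

initial = {"widget2": 1}
sf = Replica("San Francisco", initial)
paris = Replica("Paris", initial)

print(sf.buy(1, "widget2"))             # Jose at t1 -> True
print(paris.buy(2, "widget2"))          # Pierre at t2 -> True (the bug!)
print(reconcile([sf, paris], initial))  # {'widget2': -1} -- oversold
```

Both local transactions commit, and reconciliation drives the inventory negative: exactly the "apologetic email" outcome described above.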

Example 2: A distributed document sharing system with servers in New York, London, Tokyo

Operation 1: In London, User X creates a new empty document marked private.
Operation 2: User X makes update 1 to the document.
Operation 3: User X deletes update 1.
Operation 4: User X makes update 2.
Operation 5: User X changes the document from private to public.
Due to network issues, only operations 1, 2, 5 reach Tokyo. Operations 3 and 4 do not.
In Tokyo, User Y tries to read the shared document.

Expected behavior: The document status is private and Y cannot read the document.
Actual behavior: Y is able to read the document, but an incorrect version. The document has update 1, which was deleted, and is missing update 2, which should be there.

The problems above are known as consistency issues. Different clients are seeing different views of the data. What is the correct view ?

Consistency here refers to C in the CAP theorem, not the C in ACID. Here Consistency means every thread in a concurrent application correctly reads the most recent write at that point in time.

How do you fix the above issues? In a single node database, Example 1 can be fixed by locking the row in the inventory table during the update, and Example 2 is not even an issue because all the data is in one node. But in a distributed application, data might be split across shards and shards replicated for high availability. A user of the system might connect to any shard/server and read/write data. With NoSql databases, the application has to handle any inconsistencies.
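For reference, the single node fix would look something like this in SQL. Table and column names are taken from the example; this is the classic SELECT ... FOR UPDATE pattern, sketched here rather than any one vendor's exact recipe:

```sql
-- SELECT ... FOR UPDATE locks the row until the transaction ends,
-- so a concurrent purchase blocks and then sees count = 0.
START TRANSACTION;
SELECT count FROM inventory WHERE product = 'widget2' FOR UPDATE;
-- the application checks that count > 0, then:
UPDATE inventory SET count = count - 1 WHERE product = 'widget2';
COMMIT;
```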

In traditional RDBMSs, database developers are given a knob called isolation level to control what concurrent threads can read. In this old blog I explain what isolation levels are. The safest isolation level is SERIALIZABLE, where the database behaves as if the transactions were executing in a serial order with no overlap, even though in reality they are executing concurrently. Most developers use the default isolation level, which is generally READ_COMMITTED or REPEATABLE_READ. In reality, these isolation levels are poorly documented and implemented differently by different vendors. The result is that in highly concurrent applications, there are consistency bugs even in traditional single node RDBMSs. In a distributed database with data spread across shards and replicated for read scalability, the problem is compounded further. Most NoSql vendors punt the problem by claiming eventual consistency, meaning if there are no writes for a while, eventually all reads on all nodes will read the last write.

Consistency is often confused with isolation, which describes how the database behaves under concurrent execution of transactions. At the safest isolation level, the database behaves as if the transactions were executing in serial order, even though in reality they are executing concurrently. At the safest consistency level, every thread in a concurrent application correctly reads the most recent write. But most database documentation is not clear on how to achieve this in an application.

The problems in examples 1 and 2 would not occur if those applications/databases had the notion of a global transaction order with respect to real time. In example 1, Pierre's transaction at t2 should see the inventory as 0 because a transaction at t1 < t2 set it to zero. In example 2, Y should only be able to read up to operation 2. It should not be able to read operation 5 without operations 3 and 4, which occurred before 5.

In database literature, the term for this requirement is "Strict Serializability" or sometimes "external consistency". Since these technical definitions can be confusing, it is often referred to as strong consistency.

2 research papers that have been around for a while provide answers on how these problems might be fixed. The papers are the Spanner paper and the Calvin paper.

Their approach to solving the problem can be summarized as follows:
1. Timestamp transactions with something that reflects their occurrence in real time.
2. Order transactions based on timestamp.
3. Commit transactions in the above order.
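The three steps above can be sketched in a few lines of Python. This toy model simply sorts transactions by timestamp and applies them in that order, so every replica running it makes the same decision; real systems like Spanner and Calvin achieve this order across many nodes:

```python
# Toy sketch of the timestamp -> order -> commit recipe above.
# Transaction and product names come from Example 1 in the text.
def apply_in_timestamp_order(transactions, initial_inventory):
    # transactions: list of (timestamp, customer, product)
    state = dict(initial_inventory)
    outcomes = {}
    for ts, customer, product in sorted(transactions):
        if state.get(product, 0) > 0:
            state[product] -= 1
            outcomes[customer] = "committed"
        else:
            outcomes[customer] = "aborted"
    return state, outcomes

# Pierre's transaction arrives first, but Jose's has the earlier timestamp.
txns = [(2, "Pierre", "widget2"), (1, "Jose", "widget2")]
state, outcomes = apply_in_timestamp_order(txns, {"widget2": 1})
print(outcomes)  # {'Jose': 'committed', 'Pierre': 'aborted'}
print(state)     # {'widget2': 0}
```

With a global order, Jose's earlier transaction wins everywhere and Pierre's is aborted, which is the expected behavior from Example 1.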

But the details of how they do it are significantly different. Let us look at how they do it.

Spanner paper from Google

Spanner is a database built at Google and the paper describes the motivation and design of Spanner. Spanner's approach involves:
1. The use of atomic clocks and GPS to synchronize clocks across hosts in different regions, and the TrueTime API to give accurate time across nodes, regions or continents.
2. For a read/write transaction, Spanner calls the TrueTime API to get a timestamp. To address overlaps between transactions that are close to each other, the timestamp is assigned after locks are acquired and before they are released.
3. The commit order equals the timestamp order.
4. A read for a particular timestamp is sent to any shard/replica that has the data at that timestamp.
5. Reads without a timestamp (latest reads) are serviced by assigning a timestamp.
6. Writes that cross multiple shards use two phase commit.
And of course,
7. It can scale horizontally to 1000s of nodes by sharding.
8. Each shard is replicated.
And most importantly,
9. Even though it is a key value store, it provides SQL support to make it easy for application programmers.
CockroachDb and Yugabyte are 2 commercial databases based on Spanner.

Calvin Paper

The Calvin paper addresses the above problem using distributed consensus protocols like Raft or Paxos.
1. Every transaction has to first go through distributed consensus and secure a spot in a linear replication log.
2. One can view the index in the log as the timestamp.
3. The committed entries in the replication log are then executed in the exact same serial order by every node in the distributed database.
4. Since the transaction log is replicated to every shard, it does not need or use two phase commit. In a transaction involving multiple shards, if a shard dies before committing a particular transaction, then on restart it just has to execute the uncommitted transactions from its replication log.
5. No dependency on wall clocks or time APIs.
6. No two phase commit.
7. No mention of SQL support.
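The core Calvin idea, deterministic replay of an agreed-upon log, can be sketched in Python. This toy model omits the consensus step entirely and just shows that identical logs replayed independently converge to identical state; the operations are made up for illustration:

```python
# Toy sketch of Calvin-style deterministic replay: once consensus has
# fixed the order of transactions in a replicated log, every node
# replays the log independently and ends up in the same state,
# with no two phase commit. (Consensus itself is omitted here.)
def replay(log, initial_state):
    state = dict(initial_state)
    for op in log:          # the log index acts as the "timestamp"
        op(state)
    return state

# The agreed-upon log: the same order on every node.
log = [
    lambda s: s.update(balance=s["balance"] + 100),   # deposit
    lambda s: s.update(balance=s["balance"] - 30),    # withdrawal
]

node_a = replay(log, {"balance": 0})
node_b = replay(log, {"balance": 0})
print(node_a == node_b)  # True -- deterministic replay converges
print(node_a)            # {'balance': 70}
```

A node that crashes mid-transaction just re-runs the uncommitted suffix of its log on restart, which is why no cross-shard commit protocol is needed.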

FaunaDb is an example of a database based on Calvin.

This class of databases that offer horizontal scalability on a global scale without sacrificing consistency is also called NewSql. 

In summary, if you are building a globally distributed application that needs strong consistency, doing it on your own with a SQL or NoSql database can be non trivial. Consistency is hard enough in a single node database. But on a distributed database, consistency bugs are harder to troubleshoot and even harder to fix. You might want to consider one of the NewSql databases to make life easier. Review the Spanner and Calvin papers to understand the architectural choices that are available. This will help you pick a database that is right for you. The Spanner and Calvin papers have been around for almost a decade. But they have become more relevant now as real databases based on them become more popular. Most importantly, understand what consistency is and apply it; lack of it can cause severe correctness bugs in your application.


The Spanner paper

The Calvin paper

Consistency and Isolation

Sunday, January 20, 2019

A Microservices Introduction

Modern distributed applications are built as a suite of microservices. In this blog we discuss the characteristics of microservices. We will also compare microservices to its predecessors like SOA and monolithic applications. We point out the benefits and downsides of a microservices architecture.

1.0 Introduction

Let us start with a little bit of history and go back to the late 90s or early 2000's. Web applications were monolithic. A single web container would serve the entire application. Even worse, a single web container would serve multiple applications. Not only was this not scalable, it was a development and maintenance nightmare. A single bug could bring multiple applications down. And there was an ownership issue. You had multiple teams/developers contributing code. When there was a bug, the ownership was not clear and bugs would bounce around among developers.

Around the mid 2000's the new buzzword was service oriented architecture (SOA). This was promoted by large web application server companies. See my blog on SOA written in 2010. SOA encouraged a number of good design philosophies such as interface based programming, loosely coupled applications, and asynchronous interaction. REST, XML, JSON and messaging platforms enabled SOA. SOA was a big improvement, but the tools and deployment technologies were still heavyweight.

The microservices architecture is the next step in evolution further improving the ideas from SOA.

Many dismiss microservices as another buzzword. But having developed real world applications using tools listed in section 3.0, I see real value and benefit in this architecture.

2.0 Description

The main idea around microservices is that large complex systems are easier to build, maintain and scale using independently built and owned smaller services that work together.

Each microservice is a modular fine grained application providing a specific service. Let us say you have an application that has a UI, authentication, APIs for customer info, APIs for uploading documents, and APIs for analytics. You may have a microservice for the UI, a microservice for the customer APIs, a document upload microservice, and an analytics microservice.

A microservice is fully functional.

A microservice performs one specific business or IT function.

The development of the microservice can be done independently.

A microservice runs as its own process.

A microservice communicates using common protocols such as REST/Http.

A microservice offers services via its APIs. It can communicate with other microservices using their APIs.

A microservice is deployable to production independently.

When your application has multiple microservices, each could be developed in the programming language or framework most suitable for that service.

A microservice should scale horizontally by just running more instances of the microservice.

Testing, bug fixing, performance tuning etc on the microservice should happen independently without affecting other microservices.

The above listed characteristics make it easier to build large complex systems.

3.0 Enabling Technologies

A number of newer frameworks have made building microservices easier.

For Java programmers, Dropwizard and SpringBoot are very useful frameworks for building microservices. The old way was monolithic application servers like websphere, weblogic, jboss etc. Dropwizard and SpringBoot turn the tables by embedding the web server within your java application. Development is much easier as you are developing a plain java application with a main method. The entire microservice is packaged in one jar and can be run with the java -jar command. For additional information, please read my blog comparing Dropwizard to Tomcat. For Javascript, python and other languages there are similar frameworks.

To start with microservices, a framework as mentioned above is all you need. Once you have developed and use several microservices, the following platforms may be useful.

Docker is a containerization technology that makes it easier to manage production deployments. This is of interest for a dev-ops person who has to roll out services to production.

Kubernetes is a platform for automation, deployment and scaling of containerized applications.

4.0 Disadvantages

For smaller businesses and smaller applications, the overhead of many microservices could be a problem. If your infrastructure is one or two $20 per month VMs on AWS (or other cloud providers), you will not have enough memory/cpu/disk for multiple microservices.

The increased network communication is a cost.

Each microservice is its own process. The remote calls have a serialization/deserialization cost.

5.0 Conclusion

Microservices are a logical next step in the evolution of the development of complex applications.
They are a best practice. But they are not a silver bullet that solves every problem.


If you like my blogs and would like to use my services, please visit my website

Sunday, December 2, 2018

Apache Kafka Streams

Apache Kafka is a popular distributed messaging and streaming open source system. A key differentiator for Kafka is that its distributed broker architecture makes it highly scalable. Earlier versions of Kafka were more about messaging. I have a number of blogs on Kafka messaging some of which are listed below in the related blogs section.

This blog introduces Kafka streams which builds on messaging.

1.0 Introduction

In a traditional Kafka producer/consumer application, producers write messages to a topic and consumers consume the messages. The consumer may process the message and then write it to a database or filesystem, or even discard it. For a consumer to write the message back to another topic, it has to create a producer.

Kafka streams is a higher level library that lets you build a processing pipeline on streams of messages, where each stream processor reads a message, does some analytics such as counting, categorizing, aggregation etc, and then potentially writes a result back to another topic.

2.0 Use Cases

Analytics from e-commerce site usage.

Analytics from any distributed application.

Distributed processing of any kind of event or data streams.

Transforming from a monolithic to a microservices architecture.

Moving away from database intensive architectures.

3.0 When to use Kafka streams ?

If yours is a traditional messaging application, where you need the broker to hold on to messages till they get processed by a consumer, then the producer/consumer framework might be suitable. Here Kafka competes with ActiveMQ, Websphere MQ and other traditional message brokers. Here the processing of each message is independent of other messages.

If yours is an analytics style application, where you have to do different forms of counting, aggregation, slicing/dicing on a stream of data, then Kafka streams might be an appropriate library. Here the processing is for a set of messages in the stream. In this space, Kafka competes with analytics frameworks like Apache Spark, Storm, Splunk etc.

If you were to use producers/consumers for an analytics style application, you would end up creating many producers/consumers, you would probably have to read and write a database several times, you would need to maintain in-memory state, and you would probably need a third party library for analytics primitives. The Kafka streams library makes all this easier for you.

4.0 Features

Some key features of Kafka streams are:

Provides an API for stream processing primitives such as counting, aggregation, categorization etc. The API supports timing windows.

Message processing is one at a time. In producer/consumer, messages are generally processed in batches.

Fault tolerant local state is provided by the library. In consumers, any state has to be managed by the application.

Supports exactly once (or once and only once) message delivery. In producer/consumer, it is at least once delivery.

No need to deal with lower level messaging concepts like partitions, producers, consumers, polling.

Self contained complete library that handles both messaging and processing. No need for other third party libraries like Spark.

5.0 Concepts 

A stream is an unbounded sequence of Kafka messages on a topic.

A stream processor is a piece of code that gets a message, does some processing on it, perhaps stores some in-memory state, and then writes to another topic for processing by another processor. This is also known as a node.

A stream application is a set of processors where the output of one processor is further processed by one or more other processors. A stream application can be depicted as a graph with the processors as vertices and streams/topics as edges.

A source processor is the first node in the topology. It has no upstream processors and gets messages from a topic. A sink processor has no downstream processors and will typically write a result somewhere.

The figure below shows a sample application topology

6.0 Programming model

There are 2 core programming models. Below are some sample code snippets.

6.1 Streams DSL

This is a higher level API built on top of the processor API. Great for beginners.

KStream models the stream of messages. KTable is the in-memory store. You can convert from a stream to a table and vice versa.

Example: Simple analytics on a stream of pageviews from e-commerce site

// consume from a topic

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> pageViewLines ="someTopic");

// From each line extract a productId, group by productId and create a
// table key=productId, value=count. We get the page view count by product.

KTable<String, Long> productCounts = pageViewLines
    .flatMapValues(value -> getProduct(value))
    .groupBy((key, product) -> product)
    .count();

// write the running counts to another topic or storage
    .to("productCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

6.2 Processor API

The same example using the processor API:

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.*;
import org.apache.kafka.streams.state.*;

public class ProductFromPageViewProcessor implements Processor<String, String> {

    private KeyValueStore<String, Long> pCountStore;
    private ProcessorContext context;

    // Do any initialization here
    // such as loading stores or scheduling punctuate
    public void init(ProcessorContext context) {
        this.context = context;

        // get the store that will store counts
        pCountStore = (KeyValueStore<String, Long>) context.getStateStore("pcounts");

        // schedule a punctuate to periodically (every 5 secs) send
        // the product counts to a downstream processor
        context.schedule(Duration.ofSeconds(5), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            // iterate over all values in the pCountStore
            KeyValueIterator<String, Long> iter = pCountStore.all();
            while (iter.hasNext()) {
                KeyValue<String, Long> val =;
                context.forward(val.key, val.value);
            }
            iter.close();
            context.commit();
        });
    }

    // Called once for every line or message on the consumed topic
    public void process(String key, String line) {
        String productId = getProductId(line);
        Long count = pCountStore.get(productId);
        if (count == null) {
            pCountStore.put(productId, 1L);
        } else {
            pCountStore.put(productId, count + 1L);
        }
    }

    public void close() { }
}

7.0 Conclusion

As you can see from both API examples, it is about processing streams of data, doing some analytics and producing results. No need to poll or deal with lower level details like partitions and consumers. The streams model moves you away from legacy database intensive architectures, where data is written to a database first and then slow inefficient queries try to do analytics.

Some disadvantages of Kafka Streams are:

You are tied to Kafka and have to go through a Kafka topic. Other Streaming libraries like Spark are more generic and might have more analytics features.

There is no way to pause and resume a stream. If load suddenly increases or you want to pause the system for maintenance, there is no clean mechanism. In producer/consumer, there is an explicit pause/resume API. Other streaming libraries also have some flow control mechanisms.

8.0 Related Blogs

Sunday, August 26, 2018

ElasticSearch Tutorial

ElasticSearch is a distributed, scalable, search and analytics engine.

It is similar to Apache Solr, with the difference that it is built to be scalable from the ground up.

Like Solr, ElasticSearch is built on top of Apache Lucene which is a full text search library.

What is the difference between a database and a search engine? Read this blog.

1.0 Key features

Based on the very successful search library Apache Lucene.
Provides the ability to store and search documents.
Supports full text search.
Schema free.
Ability to analyze data - count, summarize, aggregate etc.
Horizontally scalable and distributed architecture.
REST API support.
Easy to install and operate.
API support for several languages.

2.0 Concepts

An elasticsearch server process called a node is a single instance of a java process.

A key differentiator for elasticsearch is that it was built to be horizontally scalable from ground up.

In a production environment, you generally run multiple nodes. A cluster is a collection of nodes that store your data.

A document is a unit of data that can be stored in elasticsearch. JSON is the format.

An Index is a collection of documents of a particular type. For example you might have one index for customer documents and another for product information. The index is the data structure that helps the search engine find the document fast. The document being stored is analyzed and broken into tokens based on rules. Each token is indexed - meaning, given the token, there is a pointer back to the document, just like the index at the back of a book. Full text search, or the ability to search on any token or partial token in the document, is what differentiates a search engine from a more traditional database.

Elasticsearch documentation sometimes uses the term inverted index to refer to its indexes. This author believes that the term "inverted index" is just confusing and this is nothing but an index.
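The idea is easy to see in a toy Python version of an index. Real Lucene indexes involve analyzers, postings lists and scoring, but the core token-to-document mapping described above looks like this (the sample documents reuse the interests from section 6.0):

```python
# A toy inverted index: token -> set of document ids.
# Given a token, we get pointers back to the documents containing it,
# just like the index at the back of a book.
from collections import defaultdict

def build_index(docs):
    # docs: dict of doc_id -> text
    index = defaultdict(set)
    for doc_id, text in docs.items():
        for token in text.lower().split():   # trivial "analyzer"
            index[token].add(doc_id)
    return index

docs = {
    1: "Hiking Cooking Reading",
    2: "Dancing Cooking Painting",
}
index = build_index(docs)
print(sorted(index["cooking"]))  # [1, 2] -- both documents match
print(sorted(index["hiking"]))   # [1]
```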

In the real world, you never use just one node. You will use an elasticsearch cluster with multiple nodes. To scale horizontally, elasticsearch partitions the index into shards that get assigned to nodes. For redundancy, the shards are also replicated, so that they are available at multiple nodes.

3.0 Install ElasticSearch

Download the latest version of elasticsearch. You will download elasticsearch-version.tar.gz.

Untar it to a directory of your choice.

4.0 Start ElasticSearch

For this tutorial we will use just a single node. The rest of the tutorial will use curl to send http requests to an elasticsearch node to demonstrate basic functions. Most of it is self explanatory.

To start elasticsearch, type

bin/elasticsearch

To confirm it is running

curl -X GET "localhost:9200/_cat/health?v"

5.0 Create an index

Let us create an index person to store person information such as name, sex, age, interests etc.

curl -X PUT "localhost:9200/person"

{"acknowledged":true,"shards_acknowledged":true,"index":"person"}

List the indexes created so far

curl -X GET "localhost:9200/_cat/indices?v"

health status index    uuid                   pri rep docs.count docs.deleted store.size
yellow open   person   AJCSCg0gTXaX6N5g6malnA   5   1          0            0      1.1kb          1.1kb

6.0 Add Documents

Let us add a few documents to the person index.
In the url, _doc is the type of document. It is a way to group documents of a particular type.
In /person/_doc/1, the number 1 is the id of the document we provided. If we do not provide an id, elasticsearch will generate one.
You will notice that the data elasticsearch accepts is JSON.

curl -X PUT "localhost:9200/person/_doc/1" -H 'Content-Type: application/json' -d'
{
  "name": "Big Stalk",
  "interests": "Hiking Cooking Reading"
}
'
curl -X PUT "localhost:9200/person/_doc/2" -H 'Content-Type: application/json' -d'
{
  "name": "Kelly Kidney",
  "interests": "Dancing Cooking Painting"
}
'
curl -X PUT "localhost:9200/person/_doc/3" -H 'Content-Type: application/json' -d'
{
  "name": "Marco Dill",
  "interests": "Sports Reading Painting"
}
'
curl -X PUT "localhost:9200/person/_doc/4" -H 'Content-Type: application/json' -d'
{
  "name": "Missy Ketchat",
  "interests": "Singing Cooking Dancing"
}
'
curl -X PUT "localhost:9200/person/_doc/5" -H 'Content-Type: application/json' -d'
{
  "name": "Hal Spito",
  "interests": "Sports Singing Hiking"
}
'

7.0 Search or Query

The query can be provided either as a query parameter or in the body of a GET. Yes, Elasticsearch accepts query data in the body of a GET request. 

7.1 Query string example

To retrieve all documents:

curl -X GET "localhost:9200/person/_search?q=*"

Response is not shown to save space.

Exact match search as query string:

curl -X GET "localhost:9200/person/_search?q=sex:female"

  "name": "Kelly Kidney",
  "interests":"Dancing Cooking Painting"
  "name": "Missy Ketchat",
  "interests":"Singing Cooking Dancing"

7.2 GET body examples

Query syntax when sent as body is much more expressive and rich. It merits a blog of its own.
This query finds persons with singing and dancing in the interest field. This is full text search on a field.

curl -X GET "localhost:9200/person/_search" -H 'Content-Type: application/json' -d'
{
  "query": {
    "bool": {
      "should": [
        { "match": { "interests": "singing" } },
        { "match": { "interests": "dancing" } }
      ]
    }
  }
}
'

  "name": "Missy Ketchat",
  "interests":"Singing Cooking Dancing"
  "name": "Hal Spito",
  "interests":"Sports Singing Hiking"
  "name": "Kelly Kidney",
  "interests":"Dancing Cooking Painting"

Below is a range query on a field.

curl -X GET "localhost:9200/person/_search" -H 'Content-Type: application/json' -d'
{
  "query": {
    "range": {
      "age": { "gte": 30, "lte": 40 }
    }
  }
}
'


  "name": "Hal Spito",
  "interests":"Sports Singing Hiking"
  "name": "Kelly Kidney",
  "interests":"Dancing Cooking Painting"

8.0 Update a document

curl -X POST "localhost:9200/person/_doc/5/_update" -H 'Content-Type: application/json' -d'
{
  "doc": { "name": "Hal Spito Jr" }
}
'

After executing the above update, do a search for "Jr". The above document will be returned.

9.0 Delete a document

curl -X DELETE "localhost:9200/person/_doc/1"

This will delete the document with id 1. Searches will no longer return this document.

10. Delete Index

curl -X DELETE "localhost:9200/person"

That deletes the index we created.

11. Conclusion

This has been a brief introduction to elasticsearch, just enough to get you started. There are a lot more details in each category of APIs. We will explore them in subsequent blogs.