Sunday, December 3, 2017

MongoDb Query tutorial and cheatsheet


Mongodb querying is easy and very powerful. But it is handy to have a cheatsheet around when digging for data. In this tutorial, we list and describe some simple useful MongoDB queries.

If you are new to Mongodb, you can read my mongodb introduction.

At the bottom of this page, there is some example json representing some customers.

Copy that to a file say customer.json.

Import into your mongodb database using the command


mongoimport --db yourtestdb --collection customer --file customer.json


1. Find all documents in a collection


> db.customer.find()
{ "_id" : ObjectId("5a22eae84427950fd314ccca"), "firstname" : "Dana", "lastname" : "Dealer", "age" : 60, "sex" : "F", "status" : "Y", "address" : { "city" : "Seattle", "state" : "WA" }, "favorites" : [ "yellow", "orange" ], "recent" : [ { "product" : "p5", "price" : 110 }, { "product" : "p2", "price" : 66 } ] }
{ "_id" : ObjectId("5a22eae84427950fd314cccb"), "firstname" : "Dan", "lastname" : "RunsFra", "age" : 23, "sex" : "M", "status" : "N", "address" : { "city" : "LOS Angeles", "state" : "CA" }, "favorites" : [ "red", "organge" ], "recent" : [ { "product" : "p1", "price" : 85 }, { "product" : "p4", "price" : 8 } ] }
{ "_id" : ObjectId("5a22eae84427950fd314cccc"), "firstname" : "Mike", "lastname" : "North", "age" : 45, "sex" : "M", "status" : "Y", "address" : { "city" : "burlingame", "state" : "CA" }, "favorites" : [ "red", "blue" ], "recent" : [ { "product" : "p1", "price" : 85 }, { "product" : "p2", "price" : 66 } ] }

> 

2. Find all documents based on 1 field equality



> db.customer.find({"lastname":"Dealer"})

{ "_id" : ObjectId("5a22eae84427950fd314ccca"), "firstname" : "Dana", "lastname" : "Dealer", "age" : 60, "sex" : "F", "status" : "Y", "address" : { "city" : "Seattle", "state" : "WA" }, "favorites" : [ "yellow", "orange" ], "recent" : [ { "product" : "p5", "price" : 110 }, { "product" : "p2", "price" : 66 } ] }

3.  Find all documents based on multiple fields AND


AND is implicit

> db.customer.find({"firstname":"Dana","lastname":"Dealer"})

{ "_id" : ObjectId("5a22eae84427950fd314ccca"), "firstname" : "Dana", "lastname" : "Dealer", "age" : 60, "sex" : "F", "status" : "Y", "address" : { "city" : "Seattle", "state" : "WA" }, "favorites" : [ "yellow", "orange" ], "recent" : [ { "product" : "p5", "price" : 110 }, { "product" : "p2", "price" : 66 } ] }

Same query with explicit $and operator

> db.customer.find({$and : [{"firstname":"Dana"},{"lastname":"Dealer"}]})

{ "_id" : ObjectId("5a22eae84427950fd314ccca"), "firstname" : "Dana", "lastname" : "Dealer", "age" : 60, "sex" : "F", "status" : "Y", "address" : { "city" : "Seattle", "state" : "WA" }, "favorites" : [ "yellow", "orange" ], "recent" : [ { "product" : "p5", "price" : 110 }, { "product" : "p2", "price" : 66 } ] }

4. Multiple fields OR


db.customer.find({$or : [{"sex":"F"},{status:"N"}]})
{ "_id" : ObjectId("5a22eae84427950fd314ccca"), "firstname" : "Dana", "lastname" : "Dealer", "age" : 60, "sex" : "F", "status" : "Y", "address" : { "city" : "Seattle", "state" : "WA" }, "favorites" : [ "yellow", "orange" ], "recent" : [ { "product" : "p5", "price" : 110 }, { "product" : "p2", "price" : 66 } ] }

{ "_id" : ObjectId("5a22eae84427950fd314cccb"), "firstname" : "Dan", "lastname" : "RunsFra", "age" : 23, "sex" : "M", "status" : "N", "address" : { "city" : "LOS Angeles", "state" : "CA" }, "favorites" : [ "red", "organge" ], "recent" : [ { "product" : "p1", "price" : 85 }, { "product" : "p4", "price" : 8 } ] }

5. Comparison operator


db.customer.find({"age":{$lt:30}}   )

{ "_id" : ObjectId("5a22eae84427950fd314cccb"), "firstname" : "Dan", "lastname" : "RunsFra", "age" : 23, "sex" : "M", "status" : "N", "address" : { "city" : "LOS Angeles", "state" : "CA" }, "favorites" : [ "red", "organge" ], "recent" : [ { "product" : "p1", "price" : 85 }, { "product" : "p4", "price" : 8 } ] }

db.customer.find({"age":{$gt:50}}   )

{ "_id" : ObjectId("5a22eae84427950fd314ccca"), "firstname" : "Dana", "lastname" : "Dealer", "age" : 60, "sex" : "F", "status" : "Y", "address" : { "city" : "Seattle", "state" : "WA" }, "favorites" : [ "yellow", "orange" ], "recent" : [ { "product" : "p5", "price" : 110 }, { "product" : "p2", "price" : 66 } ] }


6. Embedded document nested field



db.customer.find({"address.state":"CA"})
{ "_id" : ObjectId("5a22eae84427950fd314cccb"), "firstname" : "Dan", "lastname" : "RunsFra", "age" : 23, "sex" : "M", "status" : "N", "address" : { "city" : "LOS Angeles", "state" : "CA" }, "favorites" : [ "red", "orange" ], "recent" : [ { "product" : "p1", "price" : 85 }, { "product" : "p4", "price" : 8 } ] }

{ "_id" : ObjectId("5a22eae84427950fd314cccc"), "firstname" : "Mike", "lastname" : "North", "age" : 45, "sex" : "M", "status" : "Y", "address" : { "city" : "burlingame", "state" : "CA" }, "favorites" : [ "red", "blue" ], "recent" : [ { "product" : "p1", "price" : 85 }, { "product" : "p2", "price" : 66 } ] }

7. Array element


db.customer.find({"favorites":"blue"})

{ "_id" : ObjectId("5a22eae84427950fd314cccc"), "firstname" : "Mike", "lastname" : "North", "age" : 45, "sex" : "M", "status" : "Y", "address" : { "city" : "burlingame", "state" : "CA" }, "favorites" : [ "red", "blue" ], "recent" : [ { "product" : "p1", "price" : 85 }, { "product" : "p2", "price" : 66 } ] }

8. Array of embedded docs


db.customer.find({"recent.price":{$gt:90}})

{ "_id" : ObjectId("5a22eae84427950fd314ccca"), "firstname" : "Dana", "lastname" : "Dealer", "age" : 60, "sex" : "F", "status" : "Y", "address" : { "city" : "Seattle", "state" : "WA" }, "favorites" : [ "yellow", "orange" ], "recent" : [ { "product" : "p5", "price" : 110 }, { "product" : "p2", "price" : 66 } ] }


9. Project only certain fields - such as only lastname


db.customer.find({},{"lastname":1})
{ "_id" : ObjectId("5a22eae84427950fd314ccca"), "lastname" : "Dealer" }
{ "_id" : ObjectId("5a22eae84427950fd314cccb"), "lastname" : "RunsFra" }
{ "_id" : ObjectId("5a22eae84427950fd314cccc"), "lastname" : "North" }



10. Sort


Ascending by age

db.customer.find({}).sort({"age":1})
{ "_id" : ObjectId("5a22eae84427950fd314cccb"), "firstname" : "Dan", "lastname" : "RunsFra", "age" : 23, "sex" : "M", "status" : "N", "address" : { "city" : "LOS Angeles", "state" : "CA" }, "favorites" : [ "red", "organge" ], "recent" : [ { "product" : "p1", "price" : 85 }, { "product" : "p4", "price" : 8 } ] }
{ "_id" : ObjectId("5a22eae84427950fd314cccc"), "firstname" : "Mike", "lastname" : "North", "age" : 45, "sex" : "M", "status" : "Y", "address" : { "city" : "burlingame", "state" : "CA" }, "favorites" : [ "red", "blue" ], "recent" : [ { "product" : "p1", "price" : 85 }, { "product" : "p2", "price" : 66 } ] }
{ "_id" : ObjectId("5a22eae84427950fd314ccca"), "firstname" : "Dana", "lastname" : "Dealer", "age" : 60, "sex" : "F", "status" : "Y", "address" : { "city" : "Seattle", "state" : "WA" }, "favorites" : [ "yellow", "orange" ], "recent" : [ { "product" : "p5", "price" : 110 }, { "product" : "p2", "price" : 66 } ] }

Descending by age 

db.customer.find({}).sort({"age":-1})
{ "_id" : ObjectId("5a22eae84427950fd314ccca"), "firstname" : "Dana", "lastname" : "Dealer", "age" : 60, "sex" : "F", "status" : "Y", "address" : { "city" : "Seattle", "state" : "WA" }, "favorites" : [ "yellow", "orange" ], "recent" : [ { "product" : "p5", "price" : 110 }, { "product" : "p2", "price" : 66 } ] }
{ "_id" : ObjectId("5a22eae84427950fd314cccc"), "firstname" : "Mike", "lastname" : "North", "age" : 45, "sex" : "M", "status" : "Y", "address" : { "city" : "burlingame", "state" : "CA" }, "favorites" : [ "red", "blue" ], "recent" : [ { "product" : "p1", "price" : 85 }, { "product" : "p2", "price" : 66 } ] }
{ "_id" : ObjectId("5a22eae84427950fd314cccb"), "firstname" : "Dan", "lastname" : "RunsFra", "age" : 23, "sex" : "M", "status" : "N", "address" : { "city" : "LOS Angeles", "state" : "CA" }, "favorites" : [ "red", "organge" ], "recent" : [ { "product" : "p1", "price" : 85 }, { "product" : "p4", "price" : 8 } ] }



Appendix 1 : Sample data


{
"firstname": "Mike",
"lastname": "North",
"age": 45,
"sex": "M",
"status": "Y",
"address": {
"city": "burlingame",
"state": "CA"
},
"favorites": ["red", "blue"],
"recent": [{
"product": "p1",
"price": 85
}, {
"product": "p2",
"price": 66
}]
}
{
"firstname": "Dan",
"lastname": "RunsFra",
"age": 23,
"sex": "M",
"status": "N",
"address": {
"city": "LOS Angeles",
"state": "CA"
},
"favorites": ["red", "orange"],
"recent": [{
"product": "p1",
"price": 85
}, {
"product": "p4",
"price": 8
}]
}
{
"firstname": "Dana",
"lastname": "Dealer",
"age": 60,
"sex": "F",
"status": "Y",
"address": {
"city": "Seattle",
"state": "WA"
},
"favorites": ["yellow", "orange"],
"recent": [{
"product": "p5",
"price": 110
}, {
"product": "p2",
"price": 66
}]
}


Related Blogs :

1. Mongo DB Introduction

Saturday, September 30, 2017

Cloud service vs Software as a service

Everyday we use some awesome cloud services or applications like Gmail, Whatsapp, Waze etc.

If I write a web application and put it on a server that I rent from a hosting service at $3.99 a month, is it a cloud service or is it "software as a service" ?. Or is it just a plain vanilla web application ?

Even if I am write a modern application, and it is hosted on AWS or google cloud, does that automatically make it a "cloud" application ?

Today, no software company says, we are "software as a service". Everyone says they have a cloud service.

In this blog, I describe the characteristics that makes an application a real "cloud" application.

An example of a real cloud application is Gmail. As long as I have a connection to the internet, I am always able to access my mail. I can access it from any browser, any mail client, any phone, any device. I can access my email from any place in the world. A billion other people trying to access their emails at the same time does not affect me. I can still do my email stuff. If I try to get an email that I got 10 years ago, even though I am communicating with some server on the west coast, that may not have that data, gmail will get the data from a server that has stored that email. If that server is down, gmail will get it from another server in the same data center that has a replica of the data. If the entire data center is down, gmail will get it from another data center in the same region. If the entire region is down, gmail might get my email from a server in a data center in completely different region say Europe.

The characteristics of a real cloud service are :

(1)  Location independence

A user of a cloud service must be able to use the service from any location  without any degradation in service.

If the service has just one server in mountain view, then when I travel to China, accessing it is going to be horribly slow.

The location independence comes from geographically distributing servers and replicating data to where it is served.

(2) Scale horizontally


As the service becomes popular and the number of users go up, the number of requests go up, the data size goes up, there should be no degradation in service. It should scale by adding more servers.
Load balancers will distribute requests to a clusters of servers.

(3) Highly available


Service should be available 24*7. You have data replication and redundancy built in. A failure of a server and even a data center should not lead to stoppage of service

(4) Device independence


You should be able to access the service from any device that can access the internet - browser, mobile device, IOT etc.

(5) Self healing


The service infrastructure should monitor itself , detect failures early , so that down times are minimal

(6) Commodity hardware and (open source software)


Given the scale of a real cloud service, even for the large companies, it is affordable only using commodity hardware and software.

(7) Micro services

The software is generally built as micro services that communicate using simple protocols like REST. Monolithic applications are harder to maintain and fix.

Gmail, amazon shopping website, Waze, Whatspp etc are examples of real cloud applications. Under the hood they are powered by real cloud scale infrastructures.

The good news for the rest of us building cloud applications is that we do not have to build every thing from scratch. There are 2 broad options


Option 1 : Rent physical cloud but build software and data infrastructure

First there is the physical cloud : You needs machines either physical or virtual on the internet, distributed and across many regions. This part can be rented from Cloud vendors like Amazon, Google, Microsoft and others. You will not want to build a physical cloud unless you are close to being another Google or Amazon.

Then there is the data and software part. These are the micro service you build, the distributed databases and message brokers you use. You do the management of data , the replication, the software scaling. There are many open source frameworks , databases , caches, message brokers to help.

A good approach is to build and test the software locally with characteristics listed above and then deploy to the physical cloud for production.

The advantage of this approach is the your service will work on a physical cloud from any vendor. It works even if you decide to run it off the internet or "in premise"/intranet.

Option 2: Rent platform as a service

If you prefer not to deal with infrastructure, cloud vendors have combined the physical cloud and software into "platform as a service". Google App engine or AWS lamda , RDS are examples of this.
Here the cloud vendor manages both the physical cloud and software infrastructure and you will  write just the application code. The downside of this approach is vendor lock in. This is appropriate if you do not have the relevant expertise for option 1.

Summary


In summary a "real" cloud application is one that scales horizontally and is highly available with the same quality of service  irrespective of where the user is, what device he uses or how many users are using the service at a time. Simply writing a monolithic application and putting it on amazon ec2 or google compute is not a cloud service.

However if you design and build your application with the characteristics listed above, your application is "cloud" ready. You can deploy it to a physical cloud anytime.




Saturday, September 16, 2017

Cache consistency issues in distributed applications


Your typical enterprise web application is






Going to the database for every read or write is expensive. Developers try to improve read performance by storing values in a cache like memcached or redis.

Cache is in memory storage. Performance is greatly improved by reading from memory than going to secondary storage like disk where database or files.



On reads, the application first checks cache. If the value is found in cache, it read from there. On a cache miss, the app will read from database and then update the cache so the subsequent reads do not go to the database.

On writes,the application needs to write to the database and update the cache as well, so the subsequent reads get the updated value.

The approach of using a cache to improve read performance works very well when your reads greatly outnumber writes. That is say most requests are reading ( say 80%) and few requests update the data.

Frequent writes or updates to data complicate matters. Any writes to the database need to be reflected in the cache.


1.0 Common mistakes with caches:


These problems are mostly caused by multiple clients threads (improperly) updating the cache.

1.1. Race condition between reader / writer threads

Thread 1 wants to read a value.
It goes to cache and does not find it.
It reads the value from DB

Thread 2 updates the value in DB and updates the cache

Thread 1 sets the outdated value in cache.
Until there is another update to the same value, every one is reading the outdated value.


1.2 Race condition between writer threads


Minor variation of  1.1

 At time t1, thread1 updates database value x to x1

At time t2, thread2 update database value to x2.
thread2 updates cache value to x2.

thread1 overwrites x2 to x1.

Subsequent readers are reading an incorrect value x1.

Soln : locking x in cache, update database, update cache , release lock on x
downside : locking in 2 places cache and db deadlocks

1.3  Cache not cleaned up on database rollback

This happens when cache is updated prior to database transaction commit.

thread 1 update value in db
before the transaction commits, it updates the cache
transaction rolls back
cache has outdated value

1.4 Reading before commit

This is a rare situation that could happen when cache is updated post database transaction commit.

Thread 1 is in the process of updating a value x.
x is uncommitted.
cache is not updated.

Other parts of code in Thread read the value from cache for other purposes. They reading an out dated value.

Soln: A thread that needs to reuse values it changed should store values locally and use from local until the value is committed to both database and cache.

2.0 Strategies for elimination cache race conditions :

2.1 Locking the value in cache


The strategy is

-- lock the value to be updated in cache
-- update in database
-- update in cache
-- unlock the cache lock

While this can work, the disadvantage of this approach is

-- locking twice. Database transaction does some locking. Now we have additional locking in cache. Negative for performance
-- Improper locking can lead to deadlocks

2.2 Checking timestamps and/or previous values


In the cache , in addition to value, store the update timestamp from db.
Before updating the cache, check the timestamp and only update if you have a latter timestamp.

If you do not want the overhead of storing timestamp in cache, another approach could

-- 1 previous value =   read the cache value before db operation
-- 2 do the database operation
    3 new value =   get the latest db value
-- 4 compare and swap : set new value in cache, if current cache value == previous value
-- 5  if 4 succeeded , we are done
-- 6 previous value = current cache. Goto 3

2.3 Update cache using an updater thread


Any thread with a db operation like create , update, or even a read after a cache miss, does not directly update the cache.

Instead the request to update cache is put on a queue. Another thread reads the message one by one and updates the cache.

A disadvantage is that there is time delay before the updated value is available in  cache. Also in the case of cache misses, you might see multiple messages in the queue for the same cache update.

This is the preferred solution. If you can tolerate the time delay, it can eliminate race conditions and is easy to implement.

2.4 Versioning


We can steal ideas from MVCC which is used in database

The locking strategy in 1 locks both readers and writers.

We can improve on this by not requiring reads to locks.

Readers reads the latest snapshot value.
Writers lock not the value but a copy of the value. We allow only one copy additional writers will be blocked.
When the write is done with update ( commit), the updated copy is copied to the snapshot.

You can reduce the locking on writer even further by each writer his copy. Also assign say a version or transaction id to each copy. When a transaction commits, copy the value to snapshot.

3.0 Conclusion


In summary, consistency problems can arise due to multiple threads updating a cache and the backing database. Option 3 , updating the cache using a single update thread and fix these issues. This is a simple solution that will work for most scenarios. Option 2 is a non locking technique. Option 1 locking is the least scalable.Option 4 versioning is the most work to implement.      


Tuesday, July 4, 2017

Distributed Consensus: Raft


In the Paxos blog, we discussed the distributed consensus problem and how the Paxos algorithm describes a method for a cluster of servers to achieve consensus on a decision.

However the Paxos as a protocol is hard to understand and even harder to implement.

Examples of problems that need consensus are :

- Servers needing to agree on a value, such as whether a distributed lock is acquired or not.
- Servers needing to agree on order of events.
- Server need to agree on the state of a configuration value
- Any problem where you need 100% consistency in a distributed environment.

in a highly available environment.

The raft algorithm described https://raft.github.io/ solves the same problem, It is easier to understand and implement.

Overview

The key elements of raft  are:

Leader Election
Log Replication
Consistency ( in the spec they refer to this as safety)

A server in the cluster is the leader. All other servers are followers.

The leader is elected by majority vote.

Every consensus decision begins with the leader sending a value to followers.

If a majority of followers respond having received the value, the leader commits the value and then tells all servers to commit the value.

Clients communicate with leader only.

If a leader crashes, another leader is elected. Messages between leaders and followers enable the followers to determine if leader is still alive.

If a follower does not receive messages from the leader for a certain period, it can try to become a leader by soliciting votes.

If multiple leaders try to get elected at the same time, it is possible there is no majority. In such situations, the candidates try to get elected again after a random delay.

 In Raft, time is set of sequential terms. Term is time of certain length. Leadership is for a term.

2 Main RPC messages between leader and followers :

Request Vote : sent when a candidate solicits vote.
Append Entry : sent by leader to replicate a log entry

Scenario 1 : leader election cluster start up

let us say 5 servers s1 to s5.

Every server is a follower.

No one is getting messages from a leader.

s1 and s3 decide to become leaders (called candidates) and send message to other servers to solicit vote.
Servers always vote for themselves.
s2 and s4 respond to s1.
s1 is elected leader

Scenario 2 : log replication


A client connects to a leader s1 to set x = 3.

s1 writes x=3 to its log. But its state is unchanged.

At this point the change is uncommitted.

s1 sends appendEntry message to all followers that x =3. Each follower writes that entry to log.

Followers respond to s1 that change is written to log.

When majority of followers respond, s1 commits the change. It applies the change to its state so x is now 3.

Followers are told to commit by piggybacking the last committed entry, in the next appendEntry message. Followers commit the entry by applying the change to its state.

When a change is committed, all previous changes are considered committed.

The next appendEntry message from leader to followers will include the previous committed entry. The servers can they commit any previous entries they have not yet committed.

The cluster of servers has consensus.


Scenario 3 : Leader goes down


When a leader goes down, one or more of the followers can detect that there are no messages from the leader and decide to be come a candidate by soliciting votes.

But the leader that just went down has the accurate record of  committed entries, that some of the followers might not have.

If a follower that was behind on committed entries became a leader, it could force other servers with later committed entries to overwrite their entries. That should not be allowed. Committed entries should never change.

Raft prevents this situation by requiring candidate to send with the requestVote message the term and index of the latest message it accepted from the previous leader. Each Follower rejects requestVote with a term/index lower than its highest term/index.

Since a leader only commits entries accepted by a majority of servers and a majority of servers is required to get elected, it follows that a majority or a least half of the remaining servers have accepted that highest committed entry of the leader that went down.Thus a follower that does not have the highest committed entry from the previous leader can never get elected.

Scenario 4 : Catch up for servers that have been down


It is possible for a follower to miss committed entries either because it went down or did not receive the message.

To ensure followers catch up and stay consistent with leaders, RAFT has a consistency check.

 Every append entry message also includes term and index of previous message in leaders log. If it does not match in the follower, the follower rejects the new message. When AppendEntry is accepted by a server, it means leader and server have identical entries.


When a follower rejects an appendEntry, the server retries that follower with a previous entry. This continues until the follower accepts an entry. Once an entry is accepted, the leader will again send subsequent entries that will be accepted.

Leader maintains a nextIndex  for each follower. This is the index in the log that the leader will send to the follower next.

Scenario 5 : Cluster membership changes


Cluster membership changes refers to a bunch of servers being added or removed from the cluster.
May be even the current leader is no longer in new configuration.

This needs some attention because it is possible we end up with 2 leaders and 2 majorities.

Raft takes a two phase approach

First switch to joint consensus
   -- entries committed to servers in both configuration
  --  2 leaders one from each configuration
  -- majority from each configuration needs to approve stuff

Second switch to new configuration

Leader receives request to change configuration to new.
Leader uses appendEntry to send old,new config pair to followers
Once (old,new) is committed , we are in joint consensus period.
Leader then sends appendEntry for new configuration.
Once committed, new configuration is in effect.

Summary

Consensus is required when you need 100% consistency in a distributed environment that is also highly available. Raft simplifies distributed consensus by breaking the problem into leader election and log replication. Easier to understand means easier to implement and use to solve real world problems.

A future blog will go into an implementation.

References:

1. In Search of an Understandable Consensus Algorithm by Diego Ongaro and John Ousterhout Stanford University. https://raft.github.io/raft.pdf

Related Blogs: