Sunday, December 2, 2018

Apache Kafka Streams

Apache Kafka is a popular open source distributed messaging and streaming system. A key differentiator for Kafka is that its distributed broker architecture makes it highly scalable. Earlier versions of Kafka were more about messaging. I have a number of blogs on Kafka messaging, some of which are listed below in the related blogs section.

This blog introduces Kafka Streams, which builds on messaging.

1.0 Introduction


In a traditional Kafka producer/consumer application, producers write messages to a topic and consumers consume the messages. A consumer may process a message and then write it to a database or filesystem, or even discard it. For a consumer to write the message back to another topic, it has to create a producer.
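As a rough sketch (the topic names, configuration properties, and the process() helper are hypothetical), such an application looks like this:

// a plain consumer that forwards results must also create a producer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
consumer.subscribe(Collections.singletonList("inputTopic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> record : records) {
        String result = process(record.value());   // application-specific processing
        producer.send(new ProducerRecord<>("outputTopic", record.key(), result));
    }
}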

Kafka Streams is a higher-level library that lets you build a processing pipeline on streams of messages, where each stream processor reads a message, does some analytics such as counting, categorizing, or aggregating, and then potentially writes a result back to another topic.

2.0 Use Cases


Analytics from e-commerce site usage.

Analytics from any distributed application.

Distributed processing of any kind of event or data streams.

Transitioning from a monolithic to a microservices architecture.

Moving away from database-intensive architectures.

3.0 When to use Kafka Streams?


If yours is a traditional messaging application, where you need the broker to hold on to messages until they get processed by a consumer, then the producer/consumer framework might be suitable. Here Kafka competes with ActiveMQ, WebSphere MQ, and other traditional message brokers, and the processing of each message is independent of other messages.

If yours is an analytics-style application, where you have to do different forms of counting, aggregation, and slicing/dicing on a stream of data, then Kafka Streams might be an appropriate library. Here the processing is over a set of messages in the stream. In this space, Kafka competes with analytics frameworks like Apache Spark, Storm, Splunk, etc.

If you were to use producers/consumers for an analytics-style application, you would end up creating many producers and consumers, you would probably have to read and write a database several times, and you would need to maintain in-memory state and probably use a third-party library for analytics primitives. The Kafka Streams library makes all of this easier.

4.0 Features


Some key features of Kafka Streams are:

Provides an API for stream processing primitives such as counting, aggregation, and categorization. The API supports time windows (a windowed-count sketch follows this list).

Messages are processed one at a time. In producer/consumer applications, messages are generally processed in batches.

Fault-tolerant local state is provided by the library. With plain consumers, any state has to be managed by the application.

Supports exactly-once (once and only once) message delivery. In producer/consumer, delivery is typically at least once.

No need to deal with lower-level messaging concepts like partitions, producers, consumers, and polling.

A self-contained, complete library that handles both messaging and processing. No need for other third-party libraries like Spark.
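As an example of the windowing support mentioned above, here is a minimal sketch (the topic name and the getProductId() helper are assumptions, and the exact TimeWindows overloads vary slightly across Kafka versions) that counts page views per product over one-minute windows:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> views = builder.stream("pageViewTopic");

// count page views per product over 1-minute tumbling windows
KTable<Windowed<String>, Long> countsPerMinute = views
        .groupBy((key, value) -> getProductId(value))   // re-key each message by product id
        .windowedBy(TimeWindows.of(60 * 1000L))         // newer clients accept a Duration here
        .count();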

5.0 Concepts 


A stream is an unbounded sequence of Kafka messages on a topic.

A stream processor is a piece of code that gets a message, does some processing on it, perhaps stores some in-memory state, and then writes to another topic for processing by another processor. This is also known as a node.

A stream application is a set of processors where the output of one processor is further processed by one or more other processors. A stream application can be depicted as a graph with the processors as vertices and streams/topics as edges.

A source processor is the first node in the topology. It has no upstream processors and gets messages from a topic. A sink processor has no downstream processors and will typically write a result somewhere.

The figure below shows a sample application topology




6.0 Programming model


There are two core programming models: the Streams DSL and the Processor API. Below are some sample code snippets.

6.1 Streams DSL

This is a higher-level API built on top of the Processor API. Great for beginners.

KStream models the stream of messages. KTable is the table abstraction backed by local state. You can convert from a stream to a table and vice versa.

Example: simple analytics on a stream of page views from an e-commerce site.

// consume from a topic

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> pageViewLines = builder.stream("someTopic");

// from each line extract the product id(s) and build a table of key=productid, value=count
// this gives us the page view count by product

KTable<String, Long> productCounts = pageViewLines
        .flatMapValues(value -> getProduct(value))   // getProduct extracts product id(s) from a line
        .groupBy((key, value) -> value)
        .count();

// write the running counts to another topic or storage

productCounts.toStream().to("productCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

6.2 Processor API


The same example using the Processor API:

public class ProductFromPageViewProcessor implements Processor<String, String> {

    private KeyValueStore<String, Long> pCountStore;

    // Do any initialization here,
    // such as loading stores or scheduling punctuations
    public void init(ProcessorContext context) {

        // get the store that will hold the counts
        pCountStore = (KeyValueStore<String, Long>) context.getStateStore("pcounts");

        // schedule a punctuation to periodically send the product counts to a downstream processor,
        // every 5 secs
        context.schedule(5000, PunctuationType.STREAM_TIME, timestamp -> {

            // iterate over all values in the pCountStore and forward them
            KeyValueIterator<String, Long> iter = pCountStore.all();
            while (iter.hasNext()) {
                KeyValue<String, Long> val = iter.next();
                context.forward(val.key, val.value);
            }
            iter.close();

            context.commit();
        });
    }

    // Called once for every line or message on the consumed topic
    public void process(String key, String line) {
        String productId = getProductId(line);
        Long count = pCountStore.get(productId);
        if (count == null) {
            pCountStore.put(productId, 1L);
        } else {
            pCountStore.put(productId, count + 1);
        }
    }

    public void close() {
        // nothing to clean up
    }
}
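For completeness, here is a minimal sketch (the processor name, topic names, and configuration properties are assumptions for illustration) of how such a processor could be wired into a topology together with the "pcounts" state store it expects, and then started:

// build the state store the processor looks up in init()
StoreBuilder<KeyValueStore<String, Long>> countStoreBuilder =
        Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("pcounts"),
                                    Serdes.String(), Serdes.Long());

// wire source -> processor -> sink and attach the store to the processor
Topology topology = new Topology();
topology.addSource("PageViews", "pageViewTopic");
topology.addProcessor("ProductCounts", ProductFromPageViewProcessor::new, "PageViews");
topology.addStateStore(countStoreBuilder, "ProductCounts");
topology.addSink("Output", "productCountsTopic", "ProductCounts");

// props holds the application id, bootstrap servers, default serdes, etc.
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();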


7.0 Conclusion


As you can see from both API examples, it is about processing streams of data, doing some analytics, and producing results. There is no need to poll or deal with lower-level details like partitions and consumers. The streams model moves you away from legacy database-intensive architectures, where data is written to a database first and then slow, inefficient queries try to do analytics.


Some disadvantages of Kafka Streams are:

You are tied to Kafka and have to go through a Kafka topic. Other streaming libraries like Spark are more generic and might have more analytics features.

There is no way to pause and resume a stream. If load suddenly increases or you want to pause the system for maintenance, there is no clean mechanism. In producer/consumer, there is an explicit pause/resume API. Other streaming libraries also have flow-control mechanisms.

8.0 Related Blogs












Sunday, August 26, 2018

ElasticSearch Tutorial

ElasticSearch is a distributed, scalable search and analytics engine.

It is similar to Apache Solr, with the difference that it is built to be scalable from the ground up.

Like Solr, ElasticSearch is built on top of Apache Lucene, which is a full-text search library.

What is the difference between a database and a search engine? Read this blog.

1.0 Key features


Based on the very successful search library Apache Lucene.
Provides the ability to store and search documents.
Supports full-text search.
Schema free.
Ability to analyze data - count, summarize, aggregate, etc.
Horizontally scalable and distributed architecture.
REST API support.
Easy to install and operate.
API support for several languages.

2.0 Concepts

An ElasticSearch server process, called a node, is a single instance of a Java process.

A key differentiator for ElasticSearch is that it was built to be horizontally scalable from the ground up.

In a production environment, you generally run multiple nodes. A cluster is a collection of nodes that store your data.

A document is a unit of data that can be stored in ElasticSearch. JSON is the format.

An index is a collection of documents of a particular type. For example, you might have one index for customer documents and another for product information. The index is the data structure that helps the search engine find documents fast. The document being stored is analyzed and broken into tokens based on rules. Each token is indexed, meaning that given the token, there is a pointer back to the document, just like the index at the back of a book. Full-text search, the ability to search on any token or partial token in the document, is what differentiates a search engine from a more traditional database.

The ElasticSearch documentation sometimes uses the term inverted index to refer to its indexes. This author believes that the term "inverted index" is just confusing and that this is nothing but an index.

In the real world, you never use just one node. You will use an ElasticSearch cluster with multiple nodes. To scale horizontally, ElasticSearch partitions the index into shards that get assigned to nodes. For redundancy, the shards are also replicated so that they are available on multiple nodes.

3.0 Install ElasticSearch

Download the latest version of ElasticSearch from https://www.elastic.co/downloads/elasticsearch. You will download elasticsearch-version.tar.gz.

Untar it to a directory of your choice.

4.0 Start ElasticSearch


For this tutorial we will use just a single node. The rest of the tutorial uses curl to send HTTP requests to an ElasticSearch node to demonstrate basic functions. Most of it is self-explanatory.

To start ElasticSearch, type:

install_dir/bin/elasticsearch

To confirm it is running

curl -X GET "localhost:9200/_cat/health?v"

5.0 Create an index


Let us create an index person to store person information such as name, sex, age, interests, etc.

curl -X PUT "localhost:9200/person"

{"acknowledged":true,"shards_acknowledged":true,"index":"person"}

List the indexes created so far

curl -X GET "localhost:9200/_cat/indices?v"

health status index    uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   person   AJCSCg0gTXaX6N5g6malnA   5   1          0            0      1.1kb          1.1kb

6.0 Add Documents


Let us add a few documents to the person index.
In the URL, _doc is the type of the document. It is a way to group documents of a particular type.
In /person/_doc/1, the number 1 is the id of the document we provided. If we do not provide an id, ElasticSearch will generate one.
You will notice that the data ElasticSearch accepts is JSON.

curl -X PUT "localhost:9200/person/_doc/1" -H 'Content-Type: application/json' -d'
{
  "name": "Big Stalk",
  "sex":"male",
  "age":41,
  "interests":"Hiking Cooking Reading"
}
'
curl -X PUT "localhost:9200/person/_doc/2" -H 'Content-Type: application/json' -d'
{
  "name": "Kelly Kidney",
  "sex":"female",
  "age":35,
  "interests":"Dancing Cooking Painting"
}
'

curl -X PUT "localhost:9200/person/_doc/3" -H 'Content-Type: application/json' -d'
{
  "name": "Marco Dill",
  "sex":"male",
  "age":26,
  "interests":"Sports Reading Painting"
}
'

curl -X PUT "localhost:9200/person/_doc/4" -H 'Content-Type: application/json' -d'
{
  "name": "Missy Ketchat",
  "sex":"female",
  "age":22,
  "interests":"Singing Cooking Dancing"
}
'

curl -X PUT "localhost:9200/person/_doc/5" -H 'Content-Type: application/json' -d'
{
  "name": "Hal Spito",
  "sex":"male",
  "age":31,
  "interests":"Sports Singing Hiking"
}

'

7.0 Search or Query

The query can be provided either as a query parameter or in the body of a GET. Yes, Elasticsearch accepts query data in the body of a GET request. 


7.1 Query string example


To retrieve all documents:

curl -X GET "localhost:9200/person/_search?q=*"

Response is not shown to save space.

Exact match search as query string:

curl -X GET "localhost:9200/person/_search?q=sex:female"

{"took":14,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":2,"max_score":0.18232156,"hits":[{"_index":"person","_type":"_doc","_id":"2","_score":0.18232156,"_source":
{
  "name": "Kelly Kidney",
  "sex":"female",
  "age":35,
  "interests":"Dancing Cooking Painting"
}
},{"_index":"person","_type":"_doc","_id":"4","_score":0.18232156,"_source":
{
  "name": "Missy Ketchat",
  "sex":"female",
  "age":22,
  "interests":"Singing Cooking Dancing"
}
}]}}


7.2 GET body examples


The query syntax when sent as a body is much more expressive and rich; it merits a blog of its own.
This query finds persons with singing or dancing in the interests field. This is full-text search on a field.

curl -X GET "localhost:9200/person/_search" -H 'Content-Type: application/json' -d'
{
  "query": {
    "bool": {
      "should": [
        { "match": { "interests": "singing" } },
        { "match": { "interests": "dancing" } }
      ]
    }
  }
}'

{"took":15,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":3,"max_score":0.87546873,"hits":[{"_index":"person","_type":"_doc","_id":"4","_score":0.87546873,"_source":
{
  "name": "Missy Ketchat",
  "sex":"female",
  "age":22,
  "interests":"Singing Cooking Dancing"
}
},{"_index":"person","_type":"_doc","_id":"5","_score":0.2876821,"_source":
{
  "name": "Hal Spito",
  "sex":"male",
  "age":31,
  "interests":"Sports Singing Hiking"
}
},{"_index":"person","_type":"_doc","_id":"2","_score":0.18232156,"_source":
{
  "name": "Kelly Kidney",
  "sex":"female",
  "age":35,
  "interests":"Dancing Cooking Painting"
}
}]}}

Below is a range query on a field.

curl -X GET "localhost:9200/person/_search" -H 'Content-Type: application/json' -d'
{
  "query": {
    "range": {
      "age": { "gte": 30, "lte": 40 }
    }
  }
}'

{"took":1,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":2,"max_score":1.0,"hits":[{"_index":"person","_type":"_doc","_id":"5","_score":1.0,"_source":
{
  "name": "Hal Spito",
  "sex":"male",
  "age":31,
  "interests":"Sports Singing Hiking"
}
},{"_index":"person","_type":"_doc","_id":"2","_score":1.0,"_source":
{
  "name": "Kelly Kidney",
  "sex":"female",
  "age":35,
  "interests":"Dancing Cooking Painting"
}
}]}}

8.0 Update a document



curl -X POST "localhost:9200/person/_doc/5/_update" -H 'Content-Type: application/json' -d'
{
  "doc": { "name": "Hal Spito Jr" }
}

'

After executing the above update, do a search for "Jr". The above document will be returned.


9.0 Delete a document



curl -X DELETE "localhost:9200/person/_doc/1"

This will delete the document with id 1. Searches will no longer return this document.

10.0 Delete Index

curl -X DELETE "localhost:9200/person"
{"acknowledged":true}

That deletes the index we created.


11.0 Conclusion


This has been a brief introduction to ElasticSearch, just enough to get you started. There are a lot more details in each category of APIs. We will explore them in subsequent blogs.













Saturday, June 23, 2018

Search vs Database : Do I need a search engine ?

Since the beginning of time, applications have been developed with a database at the backend to store application data.

Relational databases like Oracle, MySQL, etc. took databases to the next level with the relational model, transactions, and SQL. They have been hugely successful for the last 30+ years.

In the last 10+ years, big data databases like HBase, Cassandra, MongoDB, etc. arrived to solve data-at-scale issues that were not handled by relational databases. These databases handle scale, high availability, and replication better than relational databases.

In the last 10 years, search engines like Apache Solr and ElasticSearch have also become available; they store your data like a database but offer much better search and analytics than a traditional database.

So when do you use a database and when do you use a search engine? Or do you need both? That is what is discussed in this blog.

Some differences between a database and a search engine are:

1.0 Indexes


In a database, to search efficiently, you define indexes. But then you are required to search based on the index key. If you search on other fields, the index cannot be used and the search is inefficient.

A search engine by default will index all fields. This gives tremendous flexibility. If you add a new type of search to your application, you do not need a new index.

2.0 Full text search


A search engine excels at full text search.

Say you have document one with line "Hello from england".
And another document with line "Hello from england and europe".

A search for the term "england" will return both documents. A search for the term "europe" will return only the second document.

Databases, on the other hand, are more convenient for exact-value search.

3.0 Flexible document format


Databases are limited in the structure of data - such as rows and columns or key/value pairs.

Search engines generally consume a wider variety of documents. While JSON is the most popular format for documents that a search engine consumes, third-party libraries are available to parse Word docs, PDFs, etc. for consumption by search engines.

4.0 Analysis and Mapping


Every document stored in a search engine goes through a process of analysis and mapping.

So if you store a document "the Hello 21 from England on 2018-06-15 *", it may get tokenized based on whitespace, certain tokens like * or "the" could get discarded, all the other tokens made lowercase, 21 recognized as an integer, and 2018-06-15 recognized as a date.

When you search, the search query goes through a similar process.

The benefit of this process is that whether you search for Hello, hello, or hElLo, the document is found. With synonyms configured, a search for england, UK, or Britain can still find the document. Similarly, whether you search for 2018-06-15 or 15 June 2018, the document can still be found.
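To make the analysis step concrete, here is a small sketch using the Apache Lucene library (which both ElasticSearch and Solr build on). The field name and sample text are just for illustration, and the exact tokens produced depend on the analyzer and its stop-word configuration:

import java.io.IOException;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

public class AnalyzeExample {
    public static void main(String[] args) throws IOException {
        try (StandardAnalyzer analyzer = new StandardAnalyzer()) {
            // run the sample text through the analyzer and print each token it emits
            TokenStream stream = analyzer.tokenStream("body", "the Hello 21 from England on 2018-06-15 *");
            CharTermAttribute term = stream.addAttribute(CharTermAttribute.class);
            stream.reset();
            while (stream.incrementToken()) {
                System.out.println(term.toString());   // typically lowercased tokens with punctuation dropped
            }
            stream.end();
            stream.close();
        }
    }
}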

5.0 Write once read many times


As mentioned above, a search engine is very efficient for search, or in other words, better for reading.

However, the analysis, indexing, and storage process for a search engine can be expensive. An update to a document could lead to reindexing.

For this reason, search engines are better suited when your documents are written once, updated rarely, but need to be searched and read many times.


6.0 Database better at OLTP


For the reasons mentioned above, search engines become inefficient if the documents they store are updated frequently, as would be done in an online transaction processing system.

A traditional database is more suited for such usage scenarios.

Another place where a traditional database is better is where ACID, or even weaker, transactional integrity is important.

7.0 Analytics


The popular open source search engines ElasticSearch and Apache Solr have done a great job making it easy to do analytics - from basic counting, aggregation, and summarization to faceting.

Analytics on data is much easier and more powerful in a search engine than in a database.

8.0 Summary


If

your queries change frequently
you need to search on fields that change
you need to search on a large variety of fields
you have variety of document formats
you need full text search
you need analytics
your data access pattern is write/update few times but read many many times

then, a search engine in your architecture will certainly help.

Note that it does not have to be one or the other. Most modern architectures use both a database and a search engine. Depending on the use case, you may choose to store some data in a database and other data in a search engine. Or you may choose to store your data in both: a search engine for better querying and a database for transactional integrity.

Tuesday, April 3, 2018

Tomcat vs Dropwizard

For the last 15 years, Apache Tomcat has been the gold standard web application server for Java web applications.

More recently, for cloud and microservices architectures that require deployment of a large number of services, a number of newer frameworks are replacing traditional application servers like Tomcat.

One such framework is Dropwizard. Instead of giving your application to a complex application server, Dropwizard brings an embedded HTTP server, Jetty, into your plain Java application and significantly simplifies the development model.

While both enable you to achieve the same end goal of building Java web services and applications, they are different in many ways.

1. Infrastructure


With Tomcat, the web container infrastructure is separate from the application. Tomcat is packaged separately and runs as its own process. The application is developed and packaged separately as a war. It is then deployed to Tomcat.

Dropwizard, on the other hand, is like a library that you add as a dependency to your application. Dropwizard bundles the web server Jetty, which is embedded in your application.

2. Operating system processes


With Tomcat, there is one Java process for many applications. It is more difficult to tune the JVM for production for issues like garbage collection, since they depend on application characteristics.

With Dropwizard, there is one Java process for one application. It is easier to tune the JVM, and the process can be managed easily using Linux tools.

3. Development model


With Tomcat, you code classes as per the Servlet or JAX-RS specifications, but in the end, you produce a war file.

With Dropwizard, the application you write is a normal Java application that starts from the main method. You still code JAX-RS web resource classes or Servlets (rarely). But in the end, you produce a simple jar and run the application by invoking the class that has the main method.
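Here is a minimal sketch (class names, the resource path, and the greeting text are made up for illustration) of what such an application looks like with Dropwizard:

import io.dropwizard.Application;
import io.dropwizard.Configuration;
import io.dropwizard.setup.Environment;
import javax.ws.rs.GET;
import javax.ws.rs.Path;

// HelloApplication.java - a plain Java application with a main method
public class HelloApplication extends Application<Configuration> {

    public static void main(String[] args) throws Exception {
        // started like any other Java program, e.g. java -jar app.jar server config.yml
        new HelloApplication().run(args);
    }

    @Override
    public void run(Configuration config, Environment env) {
        env.jersey().register(new HelloResource());   // register the JAX-RS resource below
    }
}

// HelloResource.java - an ordinary JAX-RS resource class
@Path("/hello")
class HelloResource {
    @GET
    public String hello() {
        return "hello from dropwizard";
    }
}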

4. Monolithic vs Micro services


With Tomcat, you can deploy multiple application wars to the same JVM. This can lead to a monolithic process running multiple applications, which is harder to manage in production as application characteristics vary.

With Dropwizard, the model is suited to building microservices: one process for one application or service. Since running it is as simple as running a Java class with a main method, you run one process for each microservice. This is easier to manage in production.

5. Class loading


In addition to the JVM-provided bootstrap, extension, and system class loaders, Tomcat has application class loaders to load classes from application wars and provide isolation between applications. While many Tomcat developers never deal with this, it does sometimes lead to class loading issues.

Dropwizard-based applications have only the JVM-provided class loaders, unless the developer writes additional class loaders. This reduces complexity.

6. Debugging and integration with IDEs


Some IDEs claim to be able to run Tomcat in place, but given the resources Tomcat takes, debugging by running Tomcat in the IDE is a real pain. Remote debugging is the only real option.

With Dropwizard, you are developing just a plain Java application, so it is really easy to run and debug the application from within the IDE.

7. Fringe benefits


In addition to Jetty, Dropwizard bundles a number of other libraries like Jersey, Jackson, Guava, and Logback that are necessary for web services development. It also provides a very simple YAML-based configuration model for your application.

For the reasons mentioned above, application-server-based technologies have been dying for the last few years, and Tomcat is not immune to the paradigm shift. If you are developing REST-based microservices for the cloud, Dropwizard is a compelling choice.