Sunday, December 2, 2018

Apache Kafka Streams

Apache Kafka is a popular open-source distributed messaging and streaming system. A key differentiator for Kafka is that its distributed broker architecture makes it highly scalable. Earlier versions of Kafka were more about messaging. I have a number of blogs on Kafka messaging, some of which are listed below in the related blogs section.

This blog introduces Kafka Streams, which builds on messaging.

1.0 Introduction


In a traditional Kafka producer/consumer application, producers write messages to a topic and consumers consume the messages. The consumer may process the message and then write it to a database or filesystem, or even discard it. For a consumer to write the message back to another topic, it has to create a producer.
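To make the contrast with streams concrete, below is a minimal sketch of this consumer-then-producer pattern. The topic names, group id, and broker address are placeholders, and imports are omitted for brevity as in the other snippets in this blog.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "pageview-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// one client to read and a separate client to write the processed result back out
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
consumer.subscribe(Collections.singletonList("someTopic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        // process the message, then write the result to another topic via the producer
        producer.send(new ProducerRecord<>("resultTopic", record.key(), record.value()));
    }
}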

Kafka Streams is a higher-level library that lets you build a processing pipeline on streams of messages, where each stream processor reads a message, does some analytics such as counting, categorizing, or aggregation, and then potentially writes a result back to another topic.

2.0 Use Cases


Analytics from e-commerce site usage.

Analytics from any distributed application.

Distributed processing of any kind of event or data streams.

Transforming from a monolithic to a microservices architecture.

Moving away from database intensive architectures.

3.0 When to use Kafka Streams?


If yours is a traditional messaging application, where you need the broker to hold on to messages till they get processed by a consumer, then the producer/consumer framework might be more suitable. Here Kafka competes with ActiveMQ, WebSphere MQ, and other traditional message brokers, and the processing of each message is independent of other messages.

If yours is an analytics-style application, where you do different forms of counting, aggregation, and slicing/dicing on a stream of data, then Kafka Streams might be an appropriate library. Here the processing is over a set of messages in the stream. In this space, Kafka competes with analytics frameworks like Apache Spark, Storm, Splunk, etc.

If you were to use producers/consumers for an analytics-style application, you would end up creating many producers and consumers, you would probably have to read and write a database several times, and you would need to maintain in-memory state and probably use a third-party library for analytics primitives. The Kafka Streams library makes all of this easier for you.

4.0 Features


Some key features of Kafka Streams are:

Provides an API for stream processing primitives such as counting, aggregation, categorization, etc. The API also supports time windows (see the windowing sketch after this list).

Messages are processed one at a time. In producer/consumer, messages are generally processed in batches.

Fault-tolerant local state is provided by the library. With plain consumers, any state has to be managed by the application.

Supports exactly-once (once and only once) message delivery. In producer/consumer, it is at-least-once delivery.

No need to deal with lower-level messaging concepts like partitions, producers, consumers, or polling.

Self-contained, complete library that handles both messaging and processing. No need for other third-party libraries like Spark.
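As an illustration of the windowing support mentioned above, here is a minimal sketch that counts page views per key over tumbling 5-minute windows. It assumes a StreamsBuilder named builder as in the DSL example later in this blog; the topic name and window size are just examples.

KStream<String, String> views = builder.stream("pageViews");

// count page views per key over tumbling 5 minute windows
KTable<Windowed<String>, Long> viewsPerWindow = views
    .groupByKey()
    .windowedBy(TimeWindows.of(5 * 60 * 1000L))
    .count();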

5.0 Concepts 


A stream is an unbounded sequence of Kafka messages on a topic.

A stream processor is a piece of code that gets a message, does some processing on it, perhaps stores some in-memory state, and then writes to another topic for processing by another processor. This is also known as a node.

A stream application is a set of processors where the output of one processor is further processed by one or more other processors. A stream application can be depicted as a graph with the processors as vertices and streams/topics as edges.

A source processor is the first node in the topology. It has no upstream processors and gets messages from a topic. A sink processor has no downstream processors and will typically write a result somewhere.

The figure below shows a sample application topology.




6.0 Programming model


There are two core programming models. Below are some sample code snippets.

6.1 Streams DSL

This is a higher-level API built on top of the Processor API. Great for beginners.

KStream models the stream of messages. KTable is the in-memory store. You can convert from a stream to a table and vice versa.

Example: simple analytics on a stream of page views from an e-commerce site

// consume from a topic
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> pageViewLines = builder.stream("someTopic");

// From each line extract the product id and create a table key=productId, value=count
// We get the page view count by product
KTable<String, Long> productCounts = pageViewLines
    .flatMapValues(value -> getProduct(value))   // getProduct returns the product ids in the line as an Iterable
    .groupBy((key, value) -> value)
    .count();

// write the running counts to another topic or storage
productCounts.toStream().to("productCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
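The DSL snippet above only defines the topology. Below is a minimal sketch of wiring it up and starting the application; the application id and broker address are placeholders.

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pageview-counts-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// optionally turn on the exactly-once processing mentioned in the features section
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();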

6.2 Processor API


The same example using the Processor API:

public class ProductFromPageViewProcessor implements Processor<String, String> {

  private KeyValueStore<String, Long> pCountStore;

  // Do any initialization here
  // such as loading stores or scheduling punctuate
  @Override
  public void init(ProcessorContext context) {

      // get the store that will hold the counts
      pCountStore = (KeyValueStore<String, Long>) context.getStateStore("pcounts");

      // schedule a punctuate to periodically send the product counts to a downstream processor
      // every 5 secs
      context.schedule(5000, PunctuationType.STREAM_TIME, (timestamp) -> {

          // iterate over all values in the pCountStore and forward them downstream
          KeyValueIterator<String, Long> iter = pCountStore.all();
          while (iter.hasNext()) {
              KeyValue<String, Long> val = iter.next();
              context.forward(val.key, val.value);
          }
          iter.close();

          context.commit();
      });
  }

  // Called once for every line or message on the consumed topic
  @Override
  public void process(String key, String line) {
      String productId = getProductId(line);
      Long count = pCountStore.get(productId);
      if (count == null) {
          pCountStore.put(productId, 1L);
      } else {
          pCountStore.put(productId, count + 1);
      }
  }

  @Override
  public void close() {
  }
}
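The processor above expects a state store named "pcounts" to exist. Below is a minimal sketch, under the same assumptions as the example, of registering the store and the processor in a topology; the node names are illustrative.

StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("pcounts"),
                                Serdes.String(), Serdes.Long());

Topology topology = new Topology();
topology.addSource("PageViewSource", "someTopic");
topology.addProcessor("ProductCounter", ProductFromPageViewProcessor::new, "PageViewSource");
topology.addStateStore(storeBuilder, "ProductCounter");
topology.addSink("CountSink", "productCountsTopic", "ProductCounter");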


7.0 Conclusion


As you can see from both API examples, it is all about processing streams of data, doing some analytics, and producing results. There is no need to poll or deal with lower-level details like partitions and consumers. The streams model moves you away from legacy database-intensive architectures, where data is written to a database first and slow, inefficient queries then try to do analytics.


Some disadvantages of Kafka Streams are:

You are tied to Kafka and have to go through a Kafka topic. Other streaming libraries like Spark are more generic and might have more analytics features.

There is no way to pause and resume a stream. If load suddenly increases or you want to pause the system for maintenance, there is no clean mechanism. In producer/consumer, there is an explicit pause/resume API. Other streaming libraries also have some flow control mechanisms.

8.0 Related Blogs