Monday, December 23, 2013

Streaming Big Data using Storm

Storm is an open source distributed system for processing streams of data. In the MapReduce and Hadoop tutorial we discussed using Hadoop MapReduce to process large quantities of data. MapReduce is a batch system: it reads input files, produces output files, and stops once the input is fully processed. When new input files arrive, the program must be run again. In the real world, however, data is produced continuously and results are often needed immediately. A batch processing system like MapReduce introduces delays that many applications cannot tolerate.

In Storm, the framework components and the data processing code you provide run continuously, processing streams of data and producing results in real time.

A Storm cluster has two types of nodes. The master node, called Nimbus, distributes code to the other nodes in the cluster, assigns tasks, and monitors their completion. Worker nodes execute the user code. Coordination between the master and the workers happens through ZooKeeper.

Data processing code that is to be executed on a Storm cluster is packaged into what is called a topology. A topology is a graph where each node has some code to be executed and the edges indicate the next node(s) that should process the data.

The basic abstraction for data to be processed is the stream, which is an unbounded sequence of tuples. A tuple is an ordered list of named values. A spout is a piece of code that produces tuples. A bolt consumes tuples produced by a spout, does some processing on them, and potentially emits new tuples that can be consumed by other bolts. Spouts and bolts are combined to form a topology, which is deployed to a Storm cluster for execution.
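For concreteness, a tuple's field names are represented by Storm's Fields class and its values by the Values class. Here is a minimal sketch, using hypothetical field names:

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class TupleSketch {

    public static void main(String[] args) {
        // the schema a spout or bolt declares for its output stream
        Fields schema = new Fields("word", "count");

        // one tuple's worth of values, in the same order as the schema
        Values values = new Values("storm", 42);

        System.out.println(schema.get(0) + " = " + values.get(0)); // word = storm
        System.out.println(schema.get(1) + " = " + values.get(1)); // count = 42
    }
}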

[Figure 1: A basic Storm topology. Spout A feeds Bolt1 and Bolt2; their outputs feed Bolt3 and Bolt4.]
Figure 1 shows a basic Storm topology. Spout A reads messages from a message queue and makes each message available to the topology as a tuple. The output of Spout A is sent to Bolt1 and Bolt2, which do some processing on the tuples. From the outputs of Bolt1 and Bolt2, some fields are sent to Bolt3 and others to Bolt4. Bolt3 and Bolt4 might do some aggregation and write values to a database or to another message queue.

Spouts and bolts execute in parallel. Their instances are distributed across worker nodes in the cluster.

Storm guarantees that a message produced by a spout will be completely processed through the entire topology. In Figure 1, Spout A reads a message from a message queue. Each bolt has to acknowledge the tuples it has processed, and the message is not deleted from the queue until Spout A has received all of the acknowledgements. If Spout A receives a failure notification instead of an acknowledgement, it redelivers the message. Bolt1 and Bolt2 may produce additional tuples that are delivered to Bolt3 and Bolt4; these new tuples must be acknowledged as well before processing of the original message is considered complete. Storm maintains a graph describing how the tuples are related (the tuple tree), so that acknowledgements are handled correctly.
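In code, this reliability mechanism appears as anchoring: when a bolt emits a new tuple, it passes the input tuple along so Storm can link the two in the tuple tree, and then acks (or fails) the input. Below is a minimal sketch of such a bolt; the single string field it reads and emits is a hypothetical example:

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class AnchoredBolt extends BaseRichBolt {

    private OutputCollector _collector;

    public void prepare(Map conf, TopologyContext ctx, OutputCollector c) {
        _collector = c;
    }

    public void execute(Tuple input) {
        try {
            // anchoring on input adds the new tuple to the tree
            // rooted at the original spout message
            _collector.emit(input, new Values(input.getString(0).toUpperCase()));
            _collector.ack(input);   // this branch of the tree succeeded
        } catch (Exception e) {
            _collector.fail(input);  // asks the spout to replay the message
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("upper"));
    }
}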

You can achieve real-time MapReduce functionality by coding some bolts to function as mappers and others as reducers. By using field groupings, you can tell Storm to send related tuples to the same bolt task, which can then do reducer work such as aggregation.
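For example, a hypothetical word-count topology could shuffle raw lines to "mapper" bolts that split them into words, and use a field grouping on the word field so that every occurrence of a given word reaches the same "reducer" task. A sketch of such a counting bolt, with the upstream component id ("splitter") and the field names assumed:

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Wire with: b.setBolt("counter", new WordCountBolt(), 10)
//                .fieldsGrouping("splitter", new Fields("word"));
public class WordCountBolt extends BaseRichBolt {

    private OutputCollector _collector;
    private Map<String, Long> counts;

    public void prepare(Map conf, TopologyContext ctx, OutputCollector c) {
        _collector = c;
        counts = new HashMap<String, Long>();
    }

    public void execute(Tuple input) {
        // fieldsGrouping guarantees all tuples for a given word reach
        // this task, so a task-local map of counts is sufficient
        String word = input.getStringByField("word");
        Long count = counts.get(word);
        count = (count == null) ? 1L : count + 1;
        counts.put(word, count);
        _collector.emit(input, new Values(word, count)); // anchored
        _collector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}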

To implement a spout, you can extend the class BaseRichSpout, overriding nextTuple to produce data and declareOutputFields to name the fields it emits. To emit tuples, you call the emit method on the collector.

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class SpoutA extends BaseRichSpout {

    private SpoutOutputCollector _collector;

    public void open(Map conf, TopologyContext ctx, SpoutOutputCollector c) {
        _collector = c;
    }

    public void nextTuple() {
        // get a message from a queue
        // convert it to a list of values (value1..value4 come from the message)
        _collector.emit(new Values(value1, value2, value3, value4));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // names for the values emitted above
        declarer.declare(new Fields("field1", "field2", "field3", "field4"));
    }
}

To implement a bolt, you can extend the class BaseRichBolt and override the execute method:

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class Bolt1 extends BaseRichBolt {

    private OutputCollector _collector;

    public void prepare(Map conf, TopologyContext ctx, OutputCollector c) {
        _collector = c;
    }

    public void execute(Tuple input) {
        // process values in input

        // write to a queue or database, or emit values;
        // anchoring on input links the new tuple to the original message
        _collector.emit(input, new Values(value1, value2));

        _collector.ack(input); // acknowledge the input tuple
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("field1", "field2"));
    }
}

Finally, you build the topology and submit it from a main program:


import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;

public class SimpleTopology {

    public static void main(String[] args) {

        TopologyBuilder b = new TopologyBuilder();
        b.setSpout("source", new SpoutA(), 10);

        // shuffleGrouping: output from source is randomly distributed
        // across the bolt's tasks
        b.setBolt("bolt1", new Bolt1(), 10).shuffleGrouping("source");
        b.setBolt("bolt2", new Bolt2(), 10).shuffleGrouping("source");

        // fieldsGrouping: tuples with the same value for the named field
        // always go to the same bolt3 task
        b.setBolt("bolt3", new Bolt3(), 10)
                .fieldsGrouping("bolt1", new Fields("field1"))
                .fieldsGrouping("bolt2", new Fields("field2"));

        StormTopology st = b.createTopology();
        Config conf = new Config();

        // run in-process for development and testing
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("sample", conf, st);
        Utils.sleep(1200000); // let the topology run for 20 minutes
        cluster.killTopology("sample");
        cluster.shutdown();
    }
}
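LocalCluster runs the topology in-process, which is convenient for development and testing. To run on a real cluster, you would package the classes into a jar and submit it with StormSubmitter instead. A minimal sketch, reusing the same builder calls as above:

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class ClusterMain {

    public static void main(String[] args) throws Exception {
        TopologyBuilder b = new TopologyBuilder();
        b.setSpout("source", new SpoutA(), 10);
        b.setBolt("bolt1", new Bolt1(), 10).shuffleGrouping("source");

        Config conf = new Config();
        conf.setNumWorkers(4); // worker JVMs assigned by Nimbus

        // submits to the cluster whose Nimbus is configured in storm.yaml;
        // the topology runs until it is explicitly killed
        StormSubmitter.submitTopology("sample", conf, b.createTopology());
    }
}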

If you need tuples that share the same value for certain fields to always be processed by the same bolt task, use fieldsGrouping instead of shuffleGrouping.

To conclude, Storm is a framework that enables real-time processing of big data in a distributed environment. If you are currently using MapReduce, Storm might enable you to do some of that processing in real time.