Thursday, January 26, 2017

Apache Storm - Custom Bolt

Apache Storm is an open-source distributed real-time computation system. Storm can be used for streaming data, real-time data processing, distributed messaging, and similar workloads.

In one of my assignments I am using Storm for streaming data (one of Storm's main features). The requirement was to stream CDR (Call Data Record) file data to HDFS. As calls originate from different regions, each data record has to go to the HDFS directory corresponding to the region of call origination.
e.g.: call records from Karnataka (KA) will be written to the directory <PARENT_DIR>/state=NAME in HDFS.

Later I can define a Hive table partitioned by state and fetch the desired records.
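For illustration, such a table could be declared along these lines; the table name, column, delimiter, and location are assumptions, not taken from the actual setup:

```sql
-- Hypothetical external Hive table over the streamed CDR files,
-- partitioned by state so queries can prune to one region.
CREATE EXTERNAL TABLE cdr_records (
    record STRING
)
PARTITIONED BY (state STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/prac/storm/callrcds';

-- Register the directory written by the bolt as a partition.
ALTER TABLE cdr_records ADD PARTITION (state='NAME');
```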

To stream data to a specific directory based on record content, I chose to create a custom CDRHDFSBolt by extending the HdfsBolt API provided by Storm. Except for the writeTuple() method, I kept all methods untouched. I overrode writeTuple() to inspect the tuple content: if it has a value corresponding to the desired state, the tuple is written to HDFS; otherwise it is discarded (but still acked).

Bolt Class:
import java.io.IOException;

import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.tuple.Tuple;

public class CDRHDFSBolt extends HdfsBolt {

    @Override
    public void writeTuple(Tuple tuple) throws IOException {
        String record = tuple.getString(0);     // raw CDR line
        String stateName = tuple.getString(1);  // state code extracted upstream

        // Write only records for the desired state; everything else is discarded.
        if (stateName.equals("NAME"))
            super.writeTuple(tuple);

        // Ack the tuple in both cases so discarded tuples are not replayed.
        this.collector.ack(tuple);
    }
}

Topology Class:


     HdfsBolt bolt = new CDRHDFSBolt();
     FileNameFormat fileNameFormat = new DefaultFileNameFormat()
            .withPath("/prac/storm/callrcds/statename=NAME")
            .withExtension(".txt");
     bolt.withFileNameFormat(fileNameFormat);

Note: the usual HDFS details (file system URL, record format, sync and rotation policies) also have to be set on the bolt.
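A minimal sketch of that wiring, assuming the storm-hdfs dependency (Storm 1.x package names) and an illustrative NameNode URL:

```java
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

public class CDRTopologyConfig {

    public static HdfsBolt buildBolt() {
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/prac/storm/callrcds/statename=NAME")
                .withExtension(".txt");

        // Comma-delimited output; sync to HDFS every 1000 tuples,
        // rotate files at 5 MB (both values are assumptions).
        RecordFormat recordFormat = new DelimitedRecordFormat().withFieldDelimiter(",");
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

        return new CDRHDFSBolt()
                .withFsUrl("hdfs://namenode:8020") // NameNode URL is an assumption
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(recordFormat)
                .withSyncPolicy(syncPolicy)
                .withRotationPolicy(rotationPolicy);
    }
}
```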