Thursday, August 1, 2013

Making Hadoop take input/output from MongoDB rather than HDFS


Introduction 


It is often said that 90% of the world's data has been generated in the past two years, and that the volume keeps growing exponentially. About 80% of this data is unstructured, and it is useful only once you add meaning to it. So there is an intense need to store, query, and analyze this data in a distributed and scalable fashion. Hadoop has proven itself at this, with one limitation: all the data has to be stored in HDFS beforehand. (This has its advantages, such as data locality and fault tolerance.) So there is an overhead of transferring all the data to HDFS.

On the database side, I don't want to discuss the advantages and disadvantages of SQL versus NoSQL, because that goes well beyond the scope of this article. NoSQL databases are gaining traction for storing and querying very large data sets in a scalable and efficient way. One of the best-known NoSQL databases is MongoDB, which provides high availability of data along with good latency and throughput.


So the question is: can we integrate Hadoop and MongoDB? Can we make Hadoop take its input directly from MongoDB and write or update its output back into MongoDB? Doing so makes the application distributed and scalable in terms of both computation and storage: large datasets stored in a Mongo cluster can be processed in a Hadoop cluster using the MapReduce programming model.



Article goal

As you know, Hadoop communicates with HDFS (Hadoop Distributed File System) to get its input and store its output. In this article I explain how to replace HDFS with MongoDB using the Mongo-Hadoop adapter and, most importantly, how to update a collection with the analyzed data from Hadoop.


Example


Driver class code:

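import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.MongoUpdateWritable;
import com.mongodb.hadoop.util.MongoConfigUtil;

Configuration conf = new Configuration();
MongoConfigUtil.setInputURI(conf, "mongodb://ip-address/databasename.collectionname");
MongoConfigUtil.setOutputURI(conf, "mongodb://ip-address/databasename.collectionname");

/* Create the job after the URIs are set, because Job works on its own copy of conf */
Job job = Job.getInstance(conf);
job.setJarByClass(Driver.class);
/* Also register your own mapper and reducer with job.setMapperClass(...) and job.setReducerClass(...) */

/* Mapper configuration */
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

job.setInputFormatClass(MongoInputFormat.class);
job.setOutputFormatClass(MongoOutputFormat.class);

/* Reducer configuration */
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(MongoUpdateWritable.class);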




Mongo database configuration

To replace HDFS with a Mongo database, you have to give Hadoop an input URI and an output URI:
MongoConfigUtil.setInputURI(conf, "mongodb://ip-address/databasename.collectionname");
MongoConfigUtil.setOutputURI(conf, "mongodb://ip-address/databasename.collectionname");


mongodb://<ip-address>/<databasename>.<collectionname>

ip-address: location of the MongoDB server.

databasename: name of the database.

collectionname: name of the collection to read the input from or write the output to.
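
To make the URI layout concrete, here is a minimal sketch in plain Java that splits such a URI into the three parts described above. MongoUriParts is a hypothetical helper written for this article, not part of the Mongo-Hadoop adapter, and it only handles the simple host/database.collection form (no credentials, no options). It splits at the first dot because a MongoDB database name cannot contain a dot, while a collection name can.

```java
/** Hypothetical helper (not part of mongo-hadoop): splits a simple MongoDB URI into its parts. */
final class MongoUriParts {
    final String host;        // server address, optionally with a port
    final String database;    // text before the first dot of the namespace
    final String collection;  // text after the first dot of the namespace

    private MongoUriParts(String host, String database, String collection) {
        this.host = host;
        this.database = database;
        this.collection = collection;
    }

    /** Parses URIs of the form mongodb://host/database.collection and nothing more. */
    static MongoUriParts parse(String uri) {
        String prefix = "mongodb://";
        if (!uri.startsWith(prefix)) {
            throw new IllegalArgumentException("URI must start with " + prefix);
        }
        String rest = uri.substring(prefix.length());
        int slash = rest.indexOf('/');
        if (slash < 1) {
            throw new IllegalArgumentException("URI must contain host/database.collection");
        }
        String namespace = rest.substring(slash + 1);
        int dot = namespace.indexOf('.');
        if (dot < 1 || dot == namespace.length() - 1) {
            throw new IllegalArgumentException("Namespace must look like database.collection");
        }
        return new MongoUriParts(rest.substring(0, slash),
                namespace.substring(0, dot),
                namespace.substring(dot + 1));
    }

    public static void main(String[] args) {
        MongoUriParts parts = parse("mongodb://localhost:27017/databasename.collectionname");
        System.out.println(parts.host + " | " + parts.database + " | " + parts.collection);
    }
}
```

The same URI can be used for input and output when the job should update the collection it reads from.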

 

job.setInputFormatClass(MongoInputFormat.class);

Set the input format class to MongoInputFormat.class. Hadoop will then pass each MongoDB document to the mapper as a BSONObject.

job.setOutputFormatClass(MongoOutputFormat.class );

Set the output format class to MongoOutputFormat.class. The values the reducer emits are then written to the output collection instead of HDFS; to update existing documents, the reducer emits MongoUpdateWritable objects.


Mapper configuration

Provide the types of the key and value that the mapper emits and passes to the reducer.
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
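
To see what these key/value pairs mean for the reducer, the following plain-Java sketch imitates the data flow without Hadoop: each document produces one key/value pair, and the pairs are grouped by key before reduce would be called. ShuffleSketch and the field names user and page are made up for illustration; in a real job the framework does the grouping and the strings would be Text objects.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for the map and shuffle phases; it does not use Hadoop. */
final class ShuffleSketch {

    /** Imitates a mapper: emits one (key, value) pair taken from two fields of a document. */
    static Map.Entry<String, String> map(Map<String, Object> doc, String keyField, String valueField) {
        return new AbstractMap.SimpleEntry<>(
                String.valueOf(doc.get(keyField)),
                String.valueOf(doc.get(valueField)));
    }

    /** Imitates the shuffle: groups the emitted values by key, in emission order. */
    static Map<String, List<String>> shuffle(List<Map.Entry<String, String>> emitted) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (Map.Entry<String, String> pair : emitted) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> emitted = new ArrayList<>();
        String[][] rows = {{"alice", "/home"}, {"bob", "/home"}, {"alice", "/cart"}};
        for (String[] row : rows) {
            Map<String, Object> doc = new HashMap<>();
            doc.put("user", row[0]);   // hypothetical field names
            doc.put("page", row[1]);
            emitted.add(map(doc, "user", "page"));
        }
        // Each entry of the result corresponds to one reduce(key, values) call.
        System.out.println(shuffle(emitted));
    }
}
```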


Reducer configuration

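Provide the types of the key and value objects that will be stored in the database. The output value class here is MongoUpdateWritable, which carries a query that selects the documents to change and an update document that says what to change. With $set, a field is added to the document if it does not exist yet and overwritten if it does. The two boolean flags of the constructor control upsert (insert a new document when nothing matches the query) and multi-update (change every matching document rather than only the first).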

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(MongoUpdateWritable.class);

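Update logic inside the Reducer class:
/* key, values and context are the arguments of reduce(Text key, Iterable<Text> values, Context context) */
BasicBSONObject query = new BasicBSONObject();
/* Use the String form of the key: a Hadoop Text object cannot be stored in BSON */
query.append("<attribute name>", key.toString());

/* Collect the analyzed data; as an assumption, this simply gathers the reduced values as strings */
List<String> finalValues = new ArrayList<String>();
for (Text value : values) {
    finalValues.add(value.toString());
}

BasicBSONObject update = new BasicBSONObject();
update.append("$set", new BasicBSONObject().append("<attribute name>", finalValues));

/* Arguments: query, update, upsert = false, multi-update = true */
MongoUpdateWritable muw = new MongoUpdateWritable(query, update, false, true);
context.write(key, muw);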
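
The effect of the two boolean flags is easier to see on a toy collection. The sketch below is plain Java and does not use the MongoDB driver: UpdateSketch is a hypothetical stand-in that applies a $set-style update to a list of maps and supports only equality queries. It is meant to illustrate what upsert = false and multi-update = true mean, not to reproduce MongoDB's behavior in full.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical in-memory imitation of a query + $set update; not the MongoDB API. */
final class UpdateSketch {

    /** True when every field of the query is present in the document with an equal value. */
    static boolean matches(Map<String, Object> doc, Map<String, Object> query) {
        for (Map.Entry<String, Object> field : query.entrySet()) {
            if (!doc.containsKey(field.getKey())
                    || !Objects.equals(field.getValue(), doc.get(field.getKey()))) {
                return false;
            }
        }
        return true;
    }

    /** Applies setFields to matching documents; returns how many documents were changed or inserted. */
    static int update(List<Map<String, Object>> collection, Map<String, Object> query,
                      Map<String, Object> setFields, boolean upsert, boolean multi) {
        int touched = 0;
        for (Map<String, Object> doc : collection) {
            if (matches(doc, query)) {
                doc.putAll(setFields);   // adds missing fields, overwrites existing ones
                touched++;
                if (!multi) {
                    break;               // without multi, only the first match is changed
                }
            }
        }
        if (touched == 0 && upsert) {
            Map<String, Object> inserted = new HashMap<>(query);
            inserted.putAll(setFields);
            collection.add(inserted);    // upsert: create a document when nothing matched
            touched = 1;
        }
        return touched;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> collection = new ArrayList<>();
        for (String user : new String[] {"alice", "alice", "bob"}) {
            Map<String, Object> doc = new HashMap<>();
            doc.put("user", user);
            collection.add(doc);
        }
        Map<String, Object> query = new HashMap<>();
        query.put("user", "alice");
        Map<String, Object> setFields = new HashMap<>();
        setFields.put("pages", Arrays.asList("/home", "/cart"));
        // Same flags as in the reducer above: upsert = false, multi = true.
        System.out.println(update(collection, query, setFields, false, true) + " document(s) updated");
        System.out.println(collection);
    }
}
```

With upsert set to false, a key that matches no document leaves the collection unchanged, which is the safer choice when the job is only supposed to enrich documents that already exist.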