Sunday, June 4, 2017

Implementing distributed throttling using Apache Zookeeper


In a distributed environment, a number of components provide services to clients [1]. One such component is a microservice, deployed in a cloud-based environment to provide high performance, scalability, and maintainability. But maintaining high performance and low latency at all times is challenging, because a few clients can sometimes churn through the available resources. To keep a service performant with limited resources, we need to limit its usage. This can be done with throttling: reducing the number of calls a client makes to the service.

Throttling is a way to limit the number of requests clients can make to a microservice. In a distributed environment, multiple processes of a job may call the same microservice, which can cause SLA violations. Throttling can be implemented on the client side when the clients use the same resources and call the same service. This type of throttling, where multiple processes on different nodes call the same service, is called distributed throttling.


In this article, I will explain how to throttle, i.e. rate limit the calls from, a distributed client. Suppose you are working on a Spark job that calls a microservice, and the microservice is protected by a proxy server. Sometimes it is difficult to rate limit on the service side, because the service may be a third-party service that cannot be modified, or other software limitations may apply. It is also possible to rate limit calls at the proxy server [2], but we are not going to explore that option in this article.

If your Spark job runs on a 10-node cluster and each node has 4 cores, Spark will create a total of 40 executors, assuming it is configured to use all cores of each node [3]. Suppose each executor, running a separate task, calls a microservice at a rate of 100 calls/sec. The cumulative calling rate of all the executors is then approximately 40 * 100 = 4,000 calls/sec. To limit this rate, the Spark job's executors, which run as separate processes on different nodes, require synchronization. To solve this problem, I have implemented distributed throttling using a centralized service, Apache Zookeeper, which helps synchronize the executors. In my example code, however, I recreate the same scenario on a single node with multiple threads.

Apache Zookeeper:

Apache Zookeeper is a centralized service for storing and maintaining configuration throughout the life cycle of a system. It is a distributed hierarchical key-value store, which runs on a cluster called an ensemble [4]. The nodes of a Zookeeper cluster coordinate with each other to maintain consistency based on a majority: a quorum of nodes must hold the same information. As mentioned, Zookeeper stores data in a hierarchical namespace (a tree data structure); each node in this namespace is called a znode. A znode carries information such as version numbers for data changes, ACL changes, and timestamps, which are used for cache validation and coordinated updates. There are several types of znodes, persistent, sequential, and ephemeral; I will not explain each of them here, because our purpose is to use the Zookeeper service to implement distributed throttling. In our example, I am going to use a Zookeeper client, Apache Curator, which uses ephemeral and sequential znodes internally. On top of this centralized service, distributed applications can implement the various recipes needed in a distributed environment, such as locking, pub-sub, leader election, and naming. These recipes save developers from having to implement them from scratch.

Apache Curator:

Apache Curator provides a simple and easy way to communicate with Zookeeper. It provides high-level APIs (recipes) such as leader election and semaphores, which address very common problems in distributed applications and are well tested. It also provides a framework for writing custom recipes [5].
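As a quick illustration, here is a minimal sketch that connects to Zookeeper with Curator and stores a piece of configuration in a znode. The connection string "localhost:2181" and the path "/config/rate" are placeholder values of my own, not part of the article's setup:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ZkConfigExample {
    public static void main(String[] args) throws Exception {
        // Connect to a Zookeeper ensemble; "localhost:2181" is a placeholder address.
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // Store and read back a piece of configuration in a znode.
        client.create().creatingParentsIfNeeded().forPath("/config/rate", "100".getBytes());
        byte[] rate = client.getData().forPath("/config/rate");
        System.out.println(new String(rate));

        client.close();
    }
}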

As we discussed earlier, the Spark job running on a 10-node cluster calls the microservice at approximately 4,000 calls/sec. Our target is to reduce this calling rate. The question is: how can we throttle the number of calls to the service? The solution is to synchronize the Spark job's executors, and for that synchronization we use Apache Zookeeper.

Solution:

In this example, we will not use an actual Spark job or microservice; instead, the solution is depicted with a similar scenario using multiple threads. If we can synchronize multiple threads calling a common method of another class (standing in for a resource or microservice), then the bigger problem can be synchronized the same way. There are many ways to synchronize threads in Java or any other language, but in our example we will solve it with Zookeeper.

Take an example: threads T1 and T2 of class A, running in parallel on a single machine, each create an instance of class C and call its method foo. Each thread calls foo at 2 calls/sec, so the total number of calls to the method by both threads is approximately 4 calls/sec. If you want to reduce this from 4 calls/sec to 2 calls/sec, the threads need to be synchronized. For synchronization, I have used a semaphore backed by Zookeeper. The Apache Curator library provides the InterProcessSemaphoreV2 class to implement distributed synchronization between multiple processes. Each thread uses an InterProcessSemaphoreV2 instance to acquire a lease before calling the method; until it succeeds, the thread has to wait. The lease information resides inside Zookeeper, and through the InterProcessSemaphoreV2 object the threads talk to Zookeeper to acquire it. The InterProcessSemaphoreV2 constructor takes the total number of leases as an argument, which is the key to throttling the number of calls: you can treat it as the parameter that tunes the rate limit. In our example, I passed one as the total number of leases, which reduced the combined rate to 2 calls/sec.
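Here is a minimal sketch of that setup. It assumes a Zookeeper server at localhost:2181, and class C with its method foo is a stand-in for the real resource; each call sleeps for ~500 ms to simulate a 2 calls/sec rate:

import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
import org.apache.curator.framework.recipes.locks.Lease;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ThrottleExample {

    // Placeholder for the shared resource (the microservice in the article).
    static class C {
        void foo() throws InterruptedException {
            TimeUnit.MILLISECONDS.sleep(500);  // each call takes ~0.5s => 2 calls/sec
        }
    }

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // One lease in total: only one caller at a time across all threads/processes.
        InterProcessSemaphoreV2 semaphore =
                new InterProcessSemaphoreV2(client, "/throttle/foo", 1);

        Runnable caller = () -> {
            C c = new C();
            while (true) {
                Lease lease = null;
                try {
                    lease = semaphore.acquire();  // wait until a lease is free
                    c.foo();                      // the throttled call
                } catch (Exception e) {
                    return;
                } finally {
                    if (lease != null) {
                        semaphore.returnLease(lease);
                    }
                }
            }
        };

        new Thread(caller, "T1").start();
        new Thread(caller, "T2").start();
    }
}

Increasing the lease count passed to the InterProcessSemaphoreV2 constructor raises the combined calling rate again, which is exactly the tuning knob described above.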





1. https://www.researchgate.net/publication/284169446_Using_Throttling_for_Resource_Utilization_Control_in_Component_Based_Distributed_Systems

2. http://blog.serverfault.com/2010/08/26/1016491873/

3. http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

4. https://zookeeper.apache.org/doc/trunk/zookeeperOver.html

5. http://curator.apache.org/

Wednesday, September 23, 2015

3R: Regression, Residual and R-Squared


This post aims to build a basic understanding of regression and its evaluation metrics. I will also explain a strategy for choosing between a linear and a non-linear regression equation when solving a problem.

Regression: In statistics, regression analysis is used to determine the relationship between independent variables, also known as predictors, and a dependent variable (the predicted value). It is widely used for predicting or forecasting continuous values, as opposed to classifiers, which predict discrete classes. Regression is classified into two types: linear and non-linear regression. The next question is: what are linear and non-linear equations?

Linear regression: In simple terms, if the data can be fitted by a straight line, it is called linear regression. Most of us are introduced to it through the simple linear regression equation:

Y = Θ(0) + Θ(1)X ......... for one variable

Y = Θ(0) + Θ(1)X(1) + Θ(2)X(2) + ..... + Θ(n)X(n) ...... for multiple variables


Furthermore, if you think only a straight line can represent linear regression, that is wrong. How about this equation:

Y = Θ(0) + Θ(1)X^2 ... the line is curved

Although the above equation contains the square of X, it is still a linear model, because the parameters of the equation (Θ(0), Θ(1)) still enter it linearly. Linearity depends on the parameters of the equation, not on the predictors. Please visit this link for more details: Linear vs non-linear.

Non-linear regression: While a linear equation has one basic form, a non-linear equation can take many forms (curves). Also, in a non-linear equation one predictor can appear with multiple parameters.

Example:
Y = Θ(1) + (Θ(2) - Θ(1)) * exp(-Θ(3) * X^Θ(4))



To determine the type of regression, we will look at residual plots. Similarly, to assess the fitness of a model (its "good fit"), we will use evaluation metrics. Although there are many evaluation metrics for judging a "good fit", we will discuss only R-squared (R^2) and Mean Squared Error (MSE).

Residual:

A residual is the difference between the actual (observed) value and the predicted value.

Here is the formula for a residual:
residual = actual value - predicted value

Yes, I know you may be confusing this with the error, so what is an error? Are error and residual the same? The answer is no: in theory both measure the deviation of an observed value from an expected value, but they are different. If you want to know more, please read the page "Errors and residuals".

How can these residuals help us recognize the type of regression? As a first step, fit the model using linear regression and collect the residual of each data point. Once you have collected the values, draw a scatter graph, also known as a residual plot, with the predicted values on the X-axis and the residuals on the Y-axis. If you observe a pattern in the residual plot, the problem calls for non-linear regression; if there is no pattern (randomness) in the graph, the problem is linear. Also, the estimator is considered a good fit if the data points lie close to zero in the graph.

Please visit this website (interpreting residual plots) for interpretations of residual graphs with various examples.
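Before moving on, here is a tiny Java sketch of the collection step; the actual and predicted arrays stand in for real model output:

// Residual of each data point: actual value minus predicted value.
static double[] residuals(double[] actual, double[] predicted) {
    double[] r = new double[actual.length];
    for (int i = 0; i < actual.length; i++) {
        r[i] = actual[i] - predicted[i];
    }
    return r;  // plot these on the Y-axis against the predicted values on the X-axis
}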

Mean Squared Error: I am assuming you have already read about the difference between an error and a residual. If not, that is all right; here is a definition: in statistics, an error is the amount of deviation of an observed value from an expected value, where the expected value is the mean of the entire population. That expected value is unobservable, because it is a mean rather than an actual value. In regression analysis, however, we work with deviations from predicted values, i.e. residuals.

In regression, what is the mean squared error (MSE)?
MSE is the average of the squares of the errors: the sum of the squared residuals divided by the number of data points. It measures the dispersion (variance) of the predictions around the true values. Oh, just a minute: "What is variance?" I know you are thinking about it. Here is a one-line definition: variance measures how far the data is dispersed. In other words, the MSE measures how much variance is not covered by the estimator (the error).

Notations:
E : the estimator, which predicts values
Θ : the actual values
Θ′ : the values predicted by the estimator, E(Θ)
N : the number of data points
SE : the sum of the squared errors

MSE formula:
SE = ∑ (Θ - Θ′)^2
MSE = SE / N
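In Java, the same computation might look like this sketch; actual and predicted are placeholder arrays, not real model output:

// SE = sum of (Θ - Θ′)^2, MSE = SE / N, following the formulas above.
static double mse(double[] actual, double[] predicted) {
    double se = 0.0;
    for (int i = 0; i < actual.length; i++) {
        double error = actual[i] - predicted[i];
        se += error * error;
    }
    return se / actual.length;
}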

How do we analyze the MSE value? If the MSE is high, the model is not a good fit; if it is low, the model fits well. However, a value very near zero can mean the estimator is overfitted. A good estimator's MSE typically lies between 20 and 30 percent of the total squared error.

Note: Variance and MSE are different; please read this article on mean squared error and variance.

R-Squared: The R-squared value can be considered a measure of a model's accuracy. Its value lies between 0 and 1. A higher value indicates a well-fitting ("good-fit") model, while a low value (near zero) indicates a badly fitted model. However, a higher value does not always mean a good fit. R-squared measures the amount of variance covered by the model.

Formula: R^2 = 1 - (u/v)

u: the mean squared error (MSE); it measures the variance not covered by the model.

v: the overall variance of the dependent variable.

u/v: the fraction of the variance not covered by the model.

If you want to calculate the amount of variance covered by the model, take the difference between 1 and u/v:

1 - u/v: the fraction of the total variance covered by the model.
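A sketch of this in Java, reusing the mse() helper from above; here v is computed as the population variance of the actual values, and the data is again a placeholder:

// R^2 = 1 - u/v, where u = MSE and v = variance of the dependent variable.
static double rSquared(double[] actual, double[] predicted) {
    double mean = 0.0;
    for (double y : actual) {
        mean += y;
    }
    mean /= actual.length;

    double v = 0.0;  // overall variance of the dependent variable
    for (double y : actual) {
        v += (y - mean) * (y - mean);
    }
    v /= actual.length;

    return 1.0 - mse(actual, predicted) / v;  // u/v = variance not covered by the model
}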


Thursday, August 1, 2013

Making Hadoop take input/output from MongoDB rather than HDFS


Introduction 


90% of the world's data has been generated in the past two years, and it keeps growing exponentially every year. About 80% of this data is unstructured, and it is useful only if you add meaning to it. So there is an intense need to analyze, store, and query this data in a distributed and scalable fashion. Hadoop is proven to achieve this, but with a limitation: all the data has to be stored in Hadoop HDFS beforehand (this has its own advantages, such as data locality and fault tolerance), so there is an overhead of transferring all the data to HDFS. As for databases that store humongous data, I don't want to discuss the advantages and disadvantages of using SQL or NoSQL, because that goes way beyond the scope of this article. NoSQL databases are gaining traction for storing and querying humongous data in a scalable and efficient way. One of the well-known NoSQL databases is MongoDB, a database that provides high availability of data with significant performance (latency and throughput).


So the question is: can we integrate Hadoop and MongoDB? Can we make Hadoop take its input directly from MongoDB and write or update its output back into MongoDB? This would make an application distributed and scalable in terms of both computation and storage, enabling large datasets (stored in a Mongo cluster) to be processed on a Hadoop cluster using the MapReduce programming model.



Article goal

As you know, Hadoop communicates with HDFS (the Hadoop Distributed File System) to get its input and store its output. In this article I am going to explain how to replace HDFS with MongoDB using the Mongo-Hadoop adapter and, most importantly, how to update a collection with the analyzed data from Hadoop.


Example


Driver class code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.MongoUpdateWritable;
import com.mongodb.hadoop.util.MongoConfigUtil;

Configuration conf = new Configuration();
MongoConfigUtil.setInputURI(conf, "mongodb://ip-address/databasename.collectionname");
MongoConfigUtil.setOutputURI(conf, "mongodb://ip-address/databasename.collectionname");

Job job = new Job(conf, "mongo-hadoop example");
job.setJarByClass(Driver.class);

/* Mapper configuration */
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

job.setInputFormatClass(MongoInputFormat.class);
job.setOutputFormatClass(MongoOutputFormat.class);

/* Reducer configuration */
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(MongoUpdateWritable.class);




Mongo database configuration

In order to replace HDFS with the Mongo database, you have to provide the input and output URIs:
MongoConfigUtil.setInputURI(conf, "mongodb://ip-address/databasename.collectionname");
MongoConfigUtil.setOutputURI(conf, "mongodb://ip-address/databasename.collectionname");


mongodb://<ip-address>/<database name>.<collection name>

ip-address: the location of the MongoDB server.

database name: the name of the database.

collection name: the name of the collection used for input and output.

 

job.setInputFormatClass(MongoInputFormat.class);

Set the input format class to MongoInputFormat.class. Hadoop will then pass MongoDB documents (BSON objects) as input to the mapper.

job.setOutputFormatClass(MongoOutputFormat.class);

Set the output format class to MongoOutputFormat.class. The reducer's output value should be an object of a type that MongoOutputFormat can store, such as the MongoUpdateWritable used below.


Mapper configuration

Provide the types of the map output key and value classes, which are passed to the reducer class.
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
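For illustration, a hypothetical mapper matching this configuration might look as follows; the class name MyMapper and the field "<attribute name>" are placeholders, and the Mongo-Hadoop adapter presents each document to the mapper as a BSONObject:

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.bson.BSONObject;

public class MyMapper extends Mapper<Object, BSONObject, Text, Text> {
    @Override
    protected void map(Object key, BSONObject value, Context context)
            throws IOException, InterruptedException {
        Object field = value.get("<attribute name>");  // placeholder field name
        if (field != null) {
            // Emit the field as the key and the whole document as the value.
            context.write(new Text(field.toString()), new Text(value.toString()));
        }
    }
}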


Reducer configuration

Provide the types of the key and value objects that are going to be stored in the database. In the output value class (MongoUpdateWritable) you define, inside the object, whether it will insert new fields (key-values) into a document or update fields that already exist in the document.

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(MongoUpdateWritable.class);

Update logic inside the Reducer class:

// Match documents whose "<attribute name>" field equals the reduce key.
BasicBSONObject query = new BasicBSONObject();
query.append("<attribute name>", key.toString());

// finalValues would be populated from the reduce input values.
List<String> finalValues = new ArrayList<String>();

// $set inserts the fields if missing, or updates them if they already exist.
BasicBSONObject update = new BasicBSONObject();
update.append("$set", new BasicBSONObject().append("<attribute name>", finalValues));

// upsert = false, update all matching documents (multi = true).
MongoUpdateWritable muw = new MongoUpdateWritable(query, update, false, true);

context.write(key, muw);
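Putting it together, a hypothetical reducer wrapping this update logic might look like the sketch below; here the reduce input values are collected into finalValues before being written with $set, and the class and field names are placeholders:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.BasicBSONObject;
import com.mongodb.hadoop.io.MongoUpdateWritable;

public class MyReducer extends Reducer<Text, Text, Text, MongoUpdateWritable> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Collect the values that arrived for this key.
        List<String> finalValues = new ArrayList<String>();
        for (Text value : values) {
            finalValues.add(value.toString());
        }

        // Match documents whose "<attribute name>" field equals the reduce key.
        BasicBSONObject query = new BasicBSONObject();
        query.append("<attribute name>", key.toString());

        // $set the collected values on the matched documents.
        BasicBSONObject update = new BasicBSONObject();
        update.append("$set", new BasicBSONObject().append("<attribute name>", finalValues));

        // upsert = false, update every matching document (multi = true).
        context.write(key, new MongoUpdateWritable(query, update, false, true));
    }
}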