Sunday, June 4, 2017

Implementing distributed throttling using Apache Zookeeper


In a distributed environment, there are a number of components that provide services to clients [1]. One such component is a microservice, deployed in a cloud-based environment to provide high performance, scalability, and maintainability. But maintaining high performance and low latency at all times is challenging, because a few clients sometimes consume most of the resources. To keep the service performant with limited resources, we need to limit its usage. This can be done with throttling: reducing the number of calls clients make to the service.

Throttling is a way to limit the number of requests clients can make to a microservice. In a distributed environment, multiple processes of a single job may call a microservice, which can drive up its response time and break its SLA. Throttling can be implemented on the client side when the clients share the same resources and call the same service. This type of throttling, where multiple processes on different nodes call the same service, is called distributed throttling.


In this article, I will explain how to throttle (rate limit the number of calls from) a distributed client. Suppose you are working on a Spark job that calls a microservice, and the microservice is protected by a proxy server. Sometimes it is difficult to rate limit on the service side, because the service may be a third-party service that cannot be modified, or there may be other software limitations. Rate limiting can also be implemented at the proxy server [2], but in this article we are not going to explore that option.

If your Spark job is running on a 10-node cluster and each node has 4 cores, Spark will create a total of 40 executors, assuming it is configured to use all cores of every node [3]. For example, suppose each executor, running a separate task, calls a microservice at a rate of 100 calls/sec. The cumulative calling rate of all the executors would then be approximately 40 * 100 = 4,000 calls/sec. To limit this rate, the Spark job's executors, which run as separate processes on different nodes, require synchronization. To solve this problem, I have implemented distributed throttling using a centralized service called Apache Zookeeper, which helps synchronize the executors. In my example code, however, I have recreated the same scenario on a single node with multiple threads.
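The cumulative-rate arithmetic above can be sketched in a few lines. This is only an illustration of the back-of-the-envelope calculation, not Spark code; the class and method names are made up for this example.

```java
public class CumulativeRate {
    // Cumulative call rate across all executors, assuming each executor
    // calls the service at a fixed per-executor rate.
    static long cumulativeRate(int nodes, int coresPerNode, int callsPerSecPerExecutor) {
        int executors = nodes * coresPerNode;             // 10 nodes * 4 cores = 40 executors
        return (long) executors * callsPerSecPerExecutor; // 40 * 100 = 4000 calls/sec
    }

    public static void main(String[] args) {
        System.out.println(cumulativeRate(10, 4, 100)); // prints 4000
    }
}
```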

Apache Zookeeper:

Apache Zookeeper is a centralized service for storing and maintaining configuration throughout the life cycle of a system. It is a distributed hierarchical key-value store that runs on a cluster called an ensemble [4]. In a Zookeeper cluster, nodes coordinate with each other to maintain consistency based on a majority: a quorum of nodes must hold the same information. As mentioned, Zookeeper stores data in a hierarchical namespace (a tree data structure); the nodes of this namespace are called znodes. Each znode carries information such as version numbers for data changes, ACL changes, and timestamps, which support cache validation and coordinated updates. There are several types of znodes: persistent, sequential, and ephemeral. I will not explain each type, because our purpose is to use the Zookeeper service to implement distributed throttling; in our example, I am going to use the Zookeeper client Apache Curator, which uses ephemeral and sequential znodes internally. Using this centralized service, distributed applications can implement the various recipes required in a distributed environment, such as locking, pub-sub, leader election, and naming. These recipes save developers from having to implement them from scratch.
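To make the znode types concrete, here is a hedged sketch of creating a persistent and an ephemeral znode with Curator. It assumes Curator is on the classpath and a ZooKeeper server is reachable; the connect string and paths are assumptions for illustration.

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class ZnodeExample {
    public static void main(String[] args) throws Exception {
        // "localhost:2181" is an assumed connect string for a local ensemble.
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // A persistent znode survives the client session.
        client.create().creatingParentsIfNeeded()
              .forPath("/config/app", "v1".getBytes());

        // An ephemeral znode is deleted automatically when the session ends;
        // recipes such as locks and semaphores build on this behavior.
        client.create().withMode(CreateMode.EPHEMERAL)
              .forPath("/workers/worker-1", new byte[0]);

        byte[] data = client.getData().forPath("/config/app");
        System.out.println(new String(data));
        client.close();
    }
}
```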

Apache Curator:

Apache Curator provides a simple and easy way to communicate with Zookeeper. It offers high-level recipes such as leader election and semaphores, which address very common problems in distributed applications and are well tested. It also provides a framework for writing custom recipes [5].
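Bootstrapping a Curator client typically looks like the following. This is a minimal sketch: the connect string and retry parameters are assumptions, and a real application would reuse one client instance.

```java
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ZkClientExample {
    public static void main(String[] args) {
        // Retry with exponentially increasing sleep between attempts
        // (base sleep 1000 ms, at most 3 retries -- assumed values).
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // "localhost:2181" is an assumed connect string for a local ensemble.
        CuratorFramework client =
                CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
        client.start(); // begins the connection to ZooKeeper
        // ... use the client here, e.g. with a recipe such as a semaphore ...
        client.close();
    }
}
```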

As we discussed earlier, the Spark job running on a 10-node cluster calls the microservice at a rate of approximately 4,000 calls/sec. Our target is to reduce this calling rate. The question is: how can we throttle the number of calls to the service? A solution is to synchronize the Spark job's executors, and for that synchronization we use Apache Zookeeper.

Solution:

In this example, we will not use an actual Spark job or microservice; instead, the solution is depicted with a similar scenario using a multithreaded approach. If we can synchronize multiple threads calling a common method of another class (standing in for a resource or microservice), then the larger problem can be synchronized in the same way. There are many ways to synchronize threads in Java or any other language, but in this example we will do it with Zookeeper.

Take threads T1 and T2 of class A, running in parallel on a single machine, each creating an instance of class C to call a method foo. If the calling rate of each thread to foo is 2 calls/sec, the total number of calls to the method by both threads is approximately 4 calls/sec. To reduce this from 4 calls/sec to 2 calls/sec, the threads need to be synchronized. For synchronization, I have used a semaphore backed by Zookeeper: the Apache Curator library provides the InterProcessSemaphoreV2 class to implement distributed synchronization between multiple processes.

Each thread uses an InterProcessSemaphoreV2 instance to acquire a lease before calling the method; while no lease is available, the thread has to wait. The lock information resides inside Zookeeper, and through the InterProcessSemaphoreV2 object the threads communicate with Zookeeper to acquire leases. The InterProcessSemaphoreV2 constructor takes the maximum number of leases, which is the key parameter for throttling: you can think of this argument as the knob for tuning the rate limit. In our example, I passed one as the total number of leases, which reduced the rate to 2 calls/sec.
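The scheme above can be sketched as follows. This is a hedged illustration, assuming Curator on the classpath and a reachable ZooKeeper server; the semaphore path, connect string, and callFoo placeholder are assumptions, not part of a fixed API.

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
import org.apache.curator.framework.recipes.locks.Lease;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ThrottledCaller implements Runnable {
    // The znode path is an assumption; all participants must use the same path.
    private static final String SEMAPHORE_PATH = "/throttle/foo";
    private final CuratorFramework client;

    ThrottledCaller(CuratorFramework client) { this.client = client; }

    @Override
    public void run() {
        // One lease total: only one thread (or process, in the distributed
        // case) may be inside callFoo() at a time.
        InterProcessSemaphoreV2 semaphore =
                new InterProcessSemaphoreV2(client, SEMAPHORE_PATH, 1);
        for (int i = 0; i < 10; i++) {
            Lease lease = null;
            try {
                lease = semaphore.acquire(); // blocks until a lease is free
                callFoo();                   // the rate-limited call
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                if (lease != null) semaphore.returnLease(lease);
            }
        }
    }

    // Placeholder for the call to C.foo() from the scenario above.
    private void callFoo() { }

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        Thread t1 = new Thread(new ThrottledCaller(client), "T1");
        Thread t2 = new Thread(new ThrottledCaller(client), "T2");
        t1.start(); t2.start();
        t1.join(); t2.join();
        client.close();
    }
}
```

Because the semaphore lives in Zookeeper rather than in local memory, the same code throttles threads in one JVM and processes spread across a cluster alike; raising the lease count in the constructor raises the permitted calling rate.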





1. https://www.researchgate.net/publication/284169446_Using_Throttling_for_Resource_Utilization_Control_in_Component_Based_Distributed_Systems

2. http://blog.serverfault.com/2010/08/26/1016491873/

3. http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

4. https://zookeeper.apache.org/doc/trunk/zookeeperOver.html

5. http://curator.apache.org/
