Rate limiting actor on Ray via Leaky Bucket
This post is part of a series to show the use of Ray for data ingestion and data analysis.
We will implement a rate-limiting actor based on the leaky bucket algorithm, originally used in packet-switched telecommunications networks. The actor will be implemented in Ray - a distributed Python runtime - so that we can issue REST API requests concurrently while the rate limiter throttles requests once we have exhausted the API's rate limit. Likewise, it will let requests resume when new request capacity becomes available.
Before we start implementing the algorithm, let’s talk briefly about the function and use cases of limits and quotas in the context of REST API. We will also introduce Ray as a platform for distributed data processing and the role of actors in a distributed system.
Rate limits and Quotas
Publicly exposed REST APIs are nowadays often protected by rate limits and quotas, which deal with short-term request volume and long-term service usage respectively. Rate limits restrict the number of calls that can be issued against an API within a short time slot, while quotas do the same over long time intervals. Typical rate limits are expressed in calls per second or calls per minute; for quotas, we usually talk about monthly usage. Both mechanisms ensure a service's availability and fair use among a group of users. The mapping between users and rate limits is usually not a one-to-one relation: a user could hold different endpoint tokens, or the rate limit may not be associated with a token at all and the requesters' IP addresses are used for rate-limiting and quota policies instead. In some scenarios, the user of a service might also impose a rate limit or quota on their own side, for example, to limit infrastructure costs.
If an API's client issues too many requests, the service will respond with a 429 HTTP status code [1], indicating a rate limit exhaustion. In addition, the service can indicate a time interval after which new requests are permitted again. Some rate limiters use this information to discover the rate limit of the service and adapt themselves in the case of a rate limit exhaustion. In this post, we will take a different approach and avoid a 429 response in the first place. For this to work, we need to know the rate limit of a service explicitly. This information can often be found in the service-level agreement (SLA) of a service.
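Even when we throttle proactively, a robust client still benefits from reading the Retry-After header a service may send along with a 429 response. A minimal, hypothetical helper for this could look as follows; the function name and the default wait time are assumptions for illustration, not part of the actor we build below:

```python
# Hypothetical sketch: extract a retry delay from the headers of a 429
# response. `headers` is a plain dict, as a real client would obtain
# from its HTTP response object.
def retry_after_seconds(headers, default=1.0):
    """Return how long to wait before retrying, in seconds."""
    value = headers.get("Retry-After")
    if value is None:
        return default  # service gave no hint; fall back to a default
    try:
        return float(value)  # delay-seconds form, e.g. "120"
    except ValueError:
        return default  # HTTP-date form not handled in this sketch
```

A client would call this after receiving a 429 and sleep for the returned number of seconds before retrying.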
Ray and Actors
Ray is a general-purpose distributed Python runtime. With it, Python programs otherwise constrained by the global interpreter lock (GIL) can be scaled from single-core performance to multi-core, multi-node parallel and concurrent execution. Even newer async/await thread-level concurrent Python programs profit from multi-node distribution. Ray's core is written in C++ and can also be targeted from that language. Initially used mainly for distributed hyperparameter tuning in machine learning (ML), Ray has expanded into other areas: recent versions added workflow processing and data processing frameworks, implemented in Python on top of Ray core. Like many recent frameworks, Ray uses distributed Apache Arrow for fast data sharing and exchange.
The actor model was introduced in 1973 by Carl Hewitt as a new model of computation. Similar to the central position of an object in the more familiar object-oriented programming (OOP) model, an actor represents every entity of a program: numbers, constants, functions, and so on are all represented as actors. Actors have a few key properties:
- an actor can create other actors
- an actor can send messages to other actors
- an actor can choose a behavior to react to the next message it receives
Languages with built-in support for the actor model include Erlang, Io, Pony, and Gleam. Besides that, libraries exist for many languages that provide actor functionality on top of an existing language runtime. The message-passing paradigm abstracts away the mechanics of function call conventions in traditional languages and lends itself easily to concurrency, parallelism, and distributed systems.
Leaky Bucket Algorithm
The leaky bucket algorithm we are about to use to implement our rate limiter is relatively easy to understand. It works like a water bucket with a small hole leaking water. We can fill the bucket up to its capacity; this state corresponds to having reached the rate limit. If we pour in more water, the bucket overflows and new requests are rejected. Over time, the bucket leaks a certain amount of water, and we can fill in more. As long as we never reach the capacity of the bucket, we never hit the rate limit.
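Before moving the algorithm into a Ray actor, here is a minimal single-process sketch of the leaky bucket. The attribute names mirror the actor we build in the next section; the non-blocking `try_acquire` method is invented for illustration:

```python
import time


# Single-process sketch of the leaky bucket, for illustration only.
class LeakyBucket:
    def __init__(self, capacity=60, rate=1, interval=1):
        self.capacity = capacity           # bucket size = burst limit
        self.rate = rate                   # requests leaking per interval
        self.interval = interval           # leak interval in seconds
        self.amount = 0                    # current water level
        self.last_leak = time.monotonic()

    def try_acquire(self):
        now = time.monotonic()
        # water that leaked out since the last successful request
        leaked = int((now - self.last_leak) / self.interval) * self.rate
        self.amount = max(self.amount - leaked, 0)
        if self.amount + 1 > self.capacity:
            return False                   # bucket full: caller must wait
        self.last_leak = now
        self.amount += 1
        return True
```

With a capacity of 2, two immediate calls succeed and a third is rejected until enough time has passed for a leak.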
Implementation
In Ray, we can create an actor by annotating a Python class with `@ray.remote`. Access to actor methods from different remote functions is serialized, so we do not need to worry about concurrent data access by different remote functions. Ray actors are backed by a running process; if the process dies, the actor's state is lost. The Ray workflow framework provides persistent actors called virtual actors, but for our use case we stick to the default actor type.
Let us now start with the creation of an actor in Ray and add some local state to it.
```python
import ray


@ray.remote
class LeakyBucketActor:
    capacity: int
    rate: int
    interval: int

    def __init__(self, capacity=60, rate=1, interval=1) -> None:
        self.capacity = capacity
        self.rate = rate
        self.interval = interval

    def acquire(self) -> None:
        return
```
The `LeakyBucketActor` class now has a constructor that will create a new actor instance. The constructor is parameterized to allow setting up different rate limits and capacities for each actor. `capacity` specifies the number of requests we can make before the API will start throttling requests. With the parameters `rate` and `interval`, we can customize the ratio of regeneration: per `interval` seconds, the actor can make `rate` new requests.
The actor needs to be aware of the last time a request was made to the API, so it can calculate the number of requests that were possible in that period of time but went by unused. This unused capacity helps us rebuild our request budget if we reached the rate limit before.
So let's add a variable holding the timestamp of the last call to the actor's `acquire` method. The `acquire` method will be used as a guard before we make a request to an API; it will enforce rate limiting on our side. For now, the method will loop and recalculate the elapsed time since the last REST API request.
```python
import time


@ray.remote
class LeakyBucketActor:
    last_leak: float

    def __init__(self) -> None:
        self.last_leak = time.monotonic()

    def acquire(self) -> None:
        while True:
            now = time.monotonic()
            elapsed = now - self.last_leak
```
We can now calculate the regeneration we got since the last call to the REST API and add the logic to let the method return if no rate limit needs to be enforced. Before we return to the actor's caller, we update the last leak time and the bucket's fill amount to their new values.
```python
import asyncio


@ray.remote
class LeakyBucketActor:
    amount: int

    def __init__(self) -> None:
        self.amount = 0

    async def acquire(self) -> None:
        while True:
            now = time.monotonic()
            elapsed = now - self.last_leak
            # requests that leaked out of the bucket since the last call
            leaked = int(elapsed / self.interval) * self.rate
            self.amount = max(self.amount - leaked, 0)
            if self.amount + 1 > self.capacity:
                # bucket is full: wait one interval, then re-check
                await asyncio.sleep(self.interval)
            else:
                self.last_leak = now
                self.amount += 1
                return
```

Note that `acquire` is an async method, so we must use `asyncio.sleep` instead of `time.sleep`; a blocking sleep would stall the actor's event loop and with it every other caller.
The complete code can be found on GitHub. We now have a simple actor that functions as a rate limiter. To use it in our program, we need to create an actor with the rate limits we require and call its `acquire` method before we make a REST API request.
```python
import requests


@ray.remote
def api_call(rate_limiter):
    # block until the rate limiter grants us a slot
    ray.get(rate_limiter.acquire.remote())
    requests.get("")


if __name__ == "__main__":
    capacity = 200
    rate_limiter = LeakyBucketActor.remote(
        capacity=capacity,
        rate=2,
    )
    calls = [api_call.remote(rate_limiter) for _ in range(capacity * 2)]
    ray.wait(calls, num_returns=len(calls))
```
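As a back-of-envelope check, ignoring network latency and Ray scheduling overhead, we can estimate how long the 400 calls above should take: the first 200 pass through the bucket immediately, and the remaining 200 leak out at 2 requests per second. The helper below is a hypothetical sketch of that arithmetic, not part of the actor:

```python
# Rough lower bound on wall-clock time for a batch of rate-limited calls.
def drain_time(total_calls, capacity, rate, interval):
    # the burst absorbs up to `capacity` calls at once; the rest must
    # wait for the bucket to leak at `rate` requests per `interval`
    queued = max(total_calls - capacity, 0)
    return queued * interval / rate


print(drain_time(400, 200, 2, 1))  # → 100.0 seconds
```

So even in the best case, the example run above needs about 100 seconds to complete.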
In the next post in this series, we will extend the actor to allow for different request weights and tags. This allows the reuse of the same rate-limit actor for different APIs that exhibit the same rate limit, which is often the case if you use different services from the same provider. We will also add metrics to the actor and start testing it.
[1] https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/429