February 10, 2022 | 7 min Read

Rate limiting actor on Ray via Leaky Bucket

This post is part of a series to show the use of Ray for data ingestion and data analysis.

We will implement a rate-limiting actor based on the leaky bucket algorithm, which was originally used in packet-switched telecommunications networks. The actor will be implemented in Ray, a distributed Python runtime. It lets us issue REST API requests concurrently while throttling them once we have exhausted the API's rate limit, and it lets requests resume when new request capacity becomes available.

Before we start implementing the algorithm, let’s talk briefly about the function and use cases of limits and quotas in the context of REST APIs. We will also introduce Ray as a platform for distributed data processing and the role of actors in a distributed system.

Rate limits and Quotas

Publicly exposed REST APIs are nowadays often protected by rate limits or quotas to deal with short-term request volume and long-term service usage, respectively. Rate limits restrict the number of calls that can be issued against an API within a short time slot, while quotas do the same for long time intervals. A typical rate limit is expressed in calls per second or calls per minute; quotas usually refer to monthly use. Both methods are used to ensure a service’s availability and fair use among a group of users. The mapping between users and rate limits is usually not a one-to-one relation. A user could have several endpoint tokens, or the rate limit might not be associated with a token at all, in which case the requester’s IP address is used for rate-limiting and quota policies instead. In some scenarios, the users of a service might impose a rate limit or quota on their own side, for example, to limit their infrastructure cost.

If an API’s client issues too many requests, the service will respond with a 429 HTTP status code [1], indicating that the rate limit is exhausted. In addition, the service can indicate a time interval after which new requests are permitted again. Some rate limiters use this information to discover the rate limit of the service and adapt themselves when the rate limit is exhausted. In this post, we will use a different approach and avoid a 429 response in the first place. For this to work, we need to know the rate limit of a service explicitly. This information can often be found in the service-level agreement (SLA) of a service.
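For contrast, the reactive approach described above could be sketched roughly as follows. The helper name `retry_delay` and the one-second fallback are assumptions made for illustration. The sketch only handles the delay-in-seconds form of the standard `Retry-After` header and expects the header name in exactly that spelling.

```python
from typing import Mapping, Optional


def retry_delay(status_code: int, headers: Mapping[str, str],
                default: float = 1.0) -> Optional[float]:
    """Return the seconds to wait before retrying, or None if no retry is needed."""
    if status_code != 429:
        return None
    value = headers.get("Retry-After")
    if value is None:
        # The service gave no hint, so fall back to an assumed default delay.
        return default
    try:
        return max(float(value), 0.0)
    except ValueError:
        # Retry-After may also carry an HTTP date; this sketch does not parse it.
        return default
```

A client would sleep for the returned number of seconds before repeating the request. The rate limiter built in this post aims to make that round trip unnecessary.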

Ray and Actors

Ray is a general-purpose distributed Python runtime. With it, Python programs that are constrained by the global interpreter lock (GIL) can be scaled from single-core performance to parallel and concurrent execution across multiple cores and nodes. Even newer async/await-based concurrent Python programs profit from multi-node distribution. Ray’s core is written in C++ and can also be targeted from that language. Initially used mainly for distributed hyperparameter tuning in machine learning (ML), Ray has since expanded into other areas. In recent versions, workflow processing and data processing frameworks were added, implemented in Python on top of Ray core. Like many recent frameworks, Ray uses Apache Arrow for fast data sharing and exchange.

The actor model was introduced in 1973 by Carl Hewitt as a new model of computation. Similar to the central position of the object in the more familiar object-oriented programming (OOP) model, the actor represents every entity of a program. That is, numbers, constants, functions, and so on are all represented as actors. Actors have a few properties:

  • an actor can create other actors
  • an actor can send messages to other actors
  • an actor can choose a behavior to react to the next message it receives

Languages with built-in support for the actor model include Erlang, Io, Pony, and Gleam. Beyond that, libraries exist for many languages that provide actor functionality on top of an existing language runtime. The message-passing paradigm abstracts away the mechanics of function call conventions in traditional languages. It lends itself easily to concurrency, parallelism, and distributed systems.
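To make the message-passing idea more concrete, here is a toy sketch in plain Python. It does not use Ray, and the class name `CounterActor` and the message names are invented for illustration. It shows only one aspect of the model: private state that is changed solely by messages taken one at a time from a mailbox.

```python
import queue
import threading
from typing import Optional


class CounterActor:
    """A toy actor: private state, a mailbox, and one thread processing messages."""

    def __init__(self) -> None:
        self._mailbox: queue.Queue = queue.Queue()
        self._count = 0  # touched only by the actor's own thread
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def send(self, message: str, reply_to: Optional[queue.Queue] = None) -> None:
        """Deliver a message asynchronously; the sender does not wait."""
        self._mailbox.put((message, reply_to))

    def _run(self) -> None:
        # Messages are handled strictly one after another, in arrival order.
        while True:
            message, reply_to = self._mailbox.get()
            if message == "increment":
                self._count += 1
            elif message == "get" and reply_to is not None:
                reply_to.put(self._count)
            elif message == "stop":
                break


actor = CounterActor()
for _ in range(3):
    actor.send("increment")

reply: queue.Queue = queue.Queue()
actor.send("get", reply_to=reply)
result = reply.get(timeout=5)  # the three increments were processed first
actor.send("stop")
```

Because the mailbox is processed serially, the counter needs no lock even if many threads send messages at the same time.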

Leaky Bucket Algorithm

The leaky bucket algorithm we are about to use to implement our rate limiter is relatively easy to understand. It works like a water bucket with a small hole that leaks water. We can fill the bucket to its capacity. This state is analogous to having reached a rate limit. If we fill in more water, the bucket overflows, which corresponds to new requests being rejected. Over time, the bucket leaks a certain amount of water, and we can fill in more. As long as we never reach the capacity of the bucket, we never hit the rate limit.
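The bucket analogy can be sketched in a few lines of plain Python, independent of Ray. The names `LeakyBucket`, `try_add`, and `leak_per_second` are assumptions chosen for this illustration, and the injectable `clock` exists only to make the demo deterministic.

```python
import time
from typing import Callable


class LeakyBucket:
    def __init__(self, capacity: float, leak_per_second: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.capacity = capacity
        self.leak_per_second = leak_per_second
        self.clock = clock
        self.level = 0.0  # current amount of "water" in the bucket
        self.last_leak = clock()

    def try_add(self, amount: float = 1.0) -> bool:
        """Add water if it fits; return False if the bucket would overflow."""
        now = self.clock()
        # Drain whatever leaked out since the last update.
        leaked = (now - self.last_leak) * self.leak_per_second
        self.level = max(self.level - leaked, 0.0)
        self.last_leak = now
        if self.level + amount > self.capacity:
            return False
        self.level += amount
        return True


# Demo with a fake clock so that the outcome does not depend on real time.
fake_now = [0.0]
bucket = LeakyBucket(capacity=2, leak_per_second=1, clock=lambda: fake_now[0])
results = [bucket.try_add() for _ in range(3)]  # the third request overflows
fake_now[0] = 1.0                               # one second later, one unit has leaked
after_wait = bucket.try_add()
```

Here `results` is `[True, True, False]` and `after_wait` is `True`: the bucket rejects the third request and accepts a new one after a second of leaking.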

Implementation

In Ray, we can create an actor by decorating a Python class with @ray.remote. Access to actor methods from different remote functions is serial, so we do not need to worry about concurrent data access by different remote functions. Ray actors are backed by a running process. If the process dies, the actor’s state is lost. The Ray workflow framework provides persistent actors called virtual actors, but for our use case we stick to the default actor type.

Let us now start with the creation of an actor in Ray and add some local state to it.

import ray

@ray.remote
class LeakyBucketActor:
    capacity: int
    rate: int
    interval: int

    def __init__(self, capacity=60, rate=1, interval=1) -> None:
        self.capacity = capacity
        self.rate = rate
        self.interval = interval

    def acquire(self) -> None:
        return

The LeakyBucketActor class now has a constructor that creates a new actor instance. The constructor is parameterized so that different rate limits and capacities can be set up for each actor. The capacity parameter specifies the number of requests we can make before requests get throttled. With the parameters rate and interval, we can customize the rate of regeneration: per interval (in seconds), the actor regains capacity for rate new requests.

The actor needs to know when the last request was made to the API. From this timestamp it can calculate how many requests have become possible again in the meantime, given the request rate of the API. This regenerated capacity lets us rebuild our budget after we have reached the rate limit.
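To get a feel for these parameters, a back-of-the-envelope helper can estimate how long a batch of calls has to wait in total. The function name `estimated_wait` is hypothetical. The estimate assumes that the bucket starts empty and that all calls arrive at once, and it ignores network latency and the granularity of any sleep intervals.

```python
def estimated_wait(n_calls: int, capacity: int = 60,
                   rate: int = 1, interval: int = 1) -> float:
    """Rough number of seconds until the last of n_calls is admitted."""
    # The first `capacity` calls fit into the empty bucket immediately.
    excess = max(n_calls - capacity, 0)
    # Every further call has to wait for regenerated capacity.
    return excess * interval / rate


print(estimated_wait(400, capacity=200, rate=2))  # 100.0
```

With a capacity of 200 and two regenerated requests per second, 400 simultaneous calls would take roughly 100 seconds under these assumptions.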

So let’s add a variable holding the timestamp of the time we last called the actor’s acquire method. The acquire method will be used as a guard before we make a request to an API. It will enforce rate limiting on our side. The acquire method will loop for now and will recalculate the elapsed time since the last REST API request.

import time

@ray.remote
class LeakyBucketActor:
    last_leak: float

    def __init__(self) -> None:
        self.last_leak = time.monotonic()

    def acquire(self) -> None:
        while True:
            now = time.monotonic()
            elapsed = now - self.last_leak

We can now calculate the capacity that has regenerated since the last update and add the logic that lets the method return when no rate limit needs to be enforced. Before returning to the caller, the actor stores the new timestamp and the new amount.

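@ray.remote
class LeakyBucketActor:
    amount: float

    def __init__(self) -> None:
        self.amount = 0.0

    def acquire(self) -> None:
        while True:
            now = time.monotonic()
            elapsed = now - self.last_leak
            # Requests that have leaked out of the bucket since the last update.
            decrement = elapsed * self.rate / self.interval
            self.amount = max(self.amount - decrement, 0.0)
            # Record the update time on every pass so no elapsed time is counted twice.
            self.last_leak = now

            if self.amount + 1 > self.capacity:
                # The bucket is full: block until some capacity has regenerated.
                time.sleep(self.interval)
            else:
                self.amount += 1
                return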

The complete code can be found on GitHub. We now have a simple actor that functions as a rate limiter. To use it in our program we need to create an actor with the rate limits we require and call its acquire method before we make a REST API request.

import requests

@ray.remote
def api_call(rate_limiter):
    ray.get(rate_limiter.acquire.remote())
    requests.get("")

if __name__ == "__main__":
    capacity = 200
    rate_limiter = LeakyBucketActor.remote(
        capacity=capacity,
        rate=2
    )

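    # Pass the actor handle to every task so that all of them share one rate limiter.
    calls = [api_call.remote(rate_limiter) for _ in range(capacity * 2)]
    # Block until all tasks are done; ray.wait returns after the first one by default.
    ray.get(calls)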

In the next post in this series, we will extend the actor to allow for different request weights and tags. This allows the reuse of the same rate limit actor for different APIs which exhibit the same rate limit. This is often the case if you use different services from the same provider. We will also add metrics to the actor and start testing it.

[1] https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/429
