February 13, 2022 | 3 min read

Adding more features to the rate limiting actor

This post is part of a series to show the use of Ray for data ingestion and data analysis.

In the last post in this series, we implemented an actor in Ray to enforce a rate limit on REST API calls. Today we will extend the actor to give requests different weights and labels so we can use the actor for different API endpoints and model different kinds of requests.

To gain more insights into the performance and progress of a workload running on the rate limit actor, we will amend the actor with a metric and observe the progress of a workload.

At the end of this post, we will show how to monkey patch the Alpaca client SDK and test our rate limiter with Nginx.

Weights and Labels

We can extend the actor's acquire method with an integer weight parameter to account for different kinds of requests. This feature is helpful for modeling requests that cost more than others. A request with a higher weight fills the bucket faster, so fewer such requests fit within the same rate limit.

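import asyncio

import ray

@ray.remote
class LeakyBucketActor:
    async def acquire(self, weight=1) -> None:
            # Excerpt: the surrounding retry loop, which sets `now`, is unchanged from the previous post.
            if self.amount + weight > self.capacity:
                # Not enough room: wait without blocking the actor's event loop.
                await asyncio.sleep(self.interval)
            else:
                self.last_leak = now
                self.amount += weight
                return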
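
To make the effect of weights concrete, here is a minimal plain-Python sketch that runs without Ray. It is not the actor from this series: the class name WeightedLeakyBucket, the try_acquire method, and the injectable clock are assumptions made for illustration, and the method returns immediately instead of sleeping.

```python
import time


class WeightedLeakyBucket:
    """Plain-Python sketch of a leaky bucket whose requests carry weights."""

    def __init__(self, capacity=200, rate=3, clock=time.monotonic):
        self.capacity = capacity  # maximum number of units the bucket holds
        self.rate = rate          # units leaked per second
        self.amount = 0.0         # units currently in the bucket
        self._clock = clock       # injectable so the demo is deterministic
        self.last_leak = clock()

    def _leak(self):
        # Drain the bucket in proportion to the time since the last leak.
        now = self._clock()
        leaked = (now - self.last_leak) * self.rate
        self.amount = max(0.0, self.amount - leaked)
        self.last_leak = now

    def try_acquire(self, weight=1):
        """Add `weight` units and return True if they fit, else return False."""
        self._leak()
        if self.amount + weight > self.capacity:
            return False
        self.amount += weight
        return True


# A fake clock stands in for real time.
fake_now = [0.0]
bucket = WeightedLeakyBucket(capacity=10, rate=1, clock=lambda: fake_now[0])

# With weight 1, ten requests fit before the bucket is full.
light = sum(bucket.try_acquire(weight=1) for _ in range(20))

# Ten seconds later the bucket has drained; with weight 5 only two requests fit.
fake_now[0] = 10.0
heavy = sum(bucket.try_acquire(weight=5) for _ in range(20))
```

In this sketch a request with weight 5 uses up the same capacity as five requests with weight 1, which is how a higher weight lowers the effective request rate.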

As a second extension, we can label each request we make. Labeling allows us to use the same rate limiter for different APIs: each label gets its own bucket, so one rate limit actor can hold and calculate the limits for multiple buckets. The only condition is that the APIs share an identical request rate, or that the required rate limits can be emulated by adding weights to the requests.

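import asyncio

import ray

@ray.remote
class LeakyBucketActor:
    async def acquire(self, label="default", weight=1) -> None:
            # Excerpt: the surrounding retry loop, which sets `now`, is unchanged from the previous post.
            # self.amount is now a dict that maps each label to its fill level.
            amount = self.amount.get(label, 0)
            if amount + weight > self.capacity:
                # Not enough room: wait without blocking the actor's event loop.
                await asyncio.sleep(self.interval)
            else:
                self.last_leak = now
                self.amount[label] = amount + weight
                return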
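
The following sketch illustrates how weights can emulate different rate limits on top of labeled buckets. It is plain Python and deliberately omits the leak step. The helper weight_for, the class LabeledBuckets, and the labels "fast" and "slow" are hypothetical names chosen for this example, not part of the actor.

```python
import math


def weight_for(api_rate, bucket_rate):
    """Weight that lets a bucket leaking `bucket_rate` units/s admit at most
    about `api_rate` requests/s. Rounding up errs on the conservative side."""
    return math.ceil(bucket_rate / api_rate)


class LabeledBuckets:
    """One fill level per label, all sharing the same capacity."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.amount = {}  # label -> units currently in that label's bucket

    def try_acquire(self, label="default", weight=1):
        amount = self.amount.get(label, 0)
        if amount + weight > self.capacity:
            return False
        self.amount[label] = amount + weight
        return True


# Assume every bucket leaks 6 units per second.
BUCKET_RATE = 6
fast_weight = weight_for(api_rate=6, bucket_rate=BUCKET_RATE)  # 6 requests/s API
slow_weight = weight_for(api_rate=2, bucket_rate=BUCKET_RATE)  # 2 requests/s API

buckets = LabeledBuckets(capacity=12)
fast = sum(buckets.try_acquire("fast", fast_weight) for _ in range(20))
slow = sum(buckets.try_acquire("slow", slow_weight) for _ in range(20))
```

Filling the "fast" bucket does not affect the "slow" one, and the slower API is admitted a third as many requests because each of its requests weighs three units.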

Monitoring

Especially when executing long-running processes, it is helpful to have insights into the performance and progress of a workload. Ray has out-of-the-box support for custom metrics. When the Ray head node starts with the metrics exporter enabled, it writes a JSON service discovery file that lists the endpoints serving the metrics in Prometheus format. Using this file, you can configure Prometheus to scrape the metrics.

ray start --head --metrics-export-port=8080
cat <<EOF > /tmp/prometheus.yml
global:
  scrape_interval:     2s
  evaluation_interval: 2s

scrape_configs:
- job_name: 'ray'
  file_sd_configs:
  - files:
    - '/tmp/ray/prom_metrics_service_discovery.json'
EOF
prometheus --config.file=/tmp/prometheus.yml

To add the metric itself, use the helper functions from ray.util.metrics.

import ray
from ray.util.metrics import Counter

@ray.remote
class LeakyBucketActor:
    def __init__(self, capacity=200, rate=3) -> None:
        self._counter = Counter(
            "number_of_calls",
            description="Number of calls to actor methods",
            tag_keys=("actor_name",),
        )
        self._counter.set_default_tags({"actor_name": "LeakyBucketActor"})

    async def acquire(self, label="default", weight=1) -> None:
        self._counter.inc()

Monkey patching

Often we need to integrate Ray workloads with existing Python libraries. While some libraries support plugging into their functionality via hooks or middleware, others do not provide many opportunities for customization. The last resort is to patch the source code of the library in question. If the code changes are minor, this can also be done by overwriting or amending functions at run time.
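
As a minimal sketch of patching at run time, the snippet below wraps a method of a class so that a throttle function runs before every call. ThirdPartyClient, throttle, and patch_with_throttle are hypothetical names for illustration only; in a real workload, throttle would be the blocking call to the rate limit actor.

```python
import functools


class ThirdPartyClient:
    """Hypothetical stand-in for a library class that offers no hooks."""

    def _one_request(self, url):
        return f"GET {url}"


calls = []


def throttle():
    """Placeholder for the blocking call to the rate limit actor."""
    calls.append("acquire")


def patch_with_throttle(cls, method_name, before):
    """Replace cls.method_name with a wrapper that calls `before` first."""
    original = getattr(cls, method_name)

    @functools.wraps(original)
    def wrapper(self, *args, **kwargs):
        before()
        return original(self, *args, **kwargs)

    setattr(cls, method_name, wrapper)
    return original  # keep a handle so the patch can be undone later


original = patch_with_throttle(ThirdPartyClient, "_one_request", throttle)
response = ThirdPartyClient()._one_request("/v2/quotes")
```

Patching the class affects every instance, including ones the library creates internally. Subclassing and overriding the method is the more contained alternative when we control where the client is instantiated.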

In the following example, we will patch the Alpaca client SDK to call our actor's acquire method, so requests are throttled before we hit the rate limit of the Alpaca REST API. Alpaca is a trading platform that also provides REST APIs to download historical trading data for various stocks. Free tier accounts have a rate limit of 200 requests per minute, or about 3.3 requests per second. To stay safely below that, we will assume a rate limit of 3 requests per second. The full source code of this example can be found on GitHub.

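import ray
from alpaca_trade_api.common import URL
from alpaca_trade_api.rest import REST

class CustomREST(REST):
    def __init__(self, actor, key_id=None, secret_key=None):
        # Without explicit credentials, the SDK falls back to the
        # APCA_API_KEY_ID and APCA_API_SECRET_KEY environment variables.
        super().__init__(key_id, secret_key)
        self._actor = actor

    def _one_request(self, method: str, url: URL, opts: dict, retry: int):
        # Block until the rate limit actor admits this request.
        ray.get(self._actor.acquire.remote())
        retry_codes = self._retry_codes
        ...

if __name__ == "__main__":
    actor = LeakyBucketActor.remote()
    api = CustomREST(actor=actor)
    df = api.get_quotes("AAPL", "2022-01-03", "2022-01-04").df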

Testing

Nginx

[1] https://docs.ray.io/en/latest/ray-metrics.html
