Skip to content

Commit be7ac47

Browse files
authored
Add Public Website documentation for RateLimiter (#37949)
* Add Documentation for RateLimiter * Add Autoscaler integration * fix command * fix run command * update overview * fix spotless * fix documentation * remove naming
1 parent 5a65f6e commit be7ac47

7 files changed

Lines changed: 135 additions & 0 deletions

File tree

examples/java/build.gradle

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,3 +158,29 @@ task wordCount(type:JavaExec) {
158158
systemProperties = System.getProperties()
159159
args = ["--output=/tmp/output.txt"]
160160
}
161+
162+
// Run any example by using class name
163+
// this task defines class path based on the runner argument
164+
task exec (type:JavaExec) {
165+
mainClass = System.getProperty("mainClass")
166+
def execArgs = System.getProperty("exec.args")
167+
String runner
168+
if (execArgs) {
169+
def runnerPattern = /runner[ =]([A-Za-z]+)/
170+
def matcher = execArgs =~ runnerPattern
171+
if (matcher) {
172+
runner = matcher[0][1]
173+
runner = runner.substring(0, 1).toLowerCase() + runner.substring(1);
174+
if (!(runner in (preCommitRunners + nonPreCommitRunners))) {
175+
throw new GradleException("Unsupported runner: " + runner)
176+
}
177+
}
178+
}
179+
if (runner) {
180+
classpath = sourceSets.main.runtimeClasspath + configurations."${runner}PreCommit"
181+
} else {
182+
classpath = sourceSets.main.runtimeClasspath
183+
}
184+
systemProperties System.getProperties()
185+
args execArgs ? execArgs.split() : []
186+
}

examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public interface Options extends PipelineOptions {
5959
void setRateLimiterDomain(String value);
6060
}
6161

62+
// [START RateLimiterSimpleJava]
6263
static class CallExternalServiceFn extends DoFn<String, String> {
6364
private final String rlsAddress;
6465
private final String rlsDomain;
@@ -111,6 +112,7 @@ public void processElement(ProcessContext c) throws Exception {
111112
c.output("Processed: " + element);
112113
}
113114
}
115+
// [END RateLimiterSimpleJava]
114116

115117
public static void main(String[] args) {
116118
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

sdks/python/apache_beam/examples/inference/rate_limiter_vertex_ai.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ def run(argv=None):
5252
pipeline_options = PipelineOptions(pipeline_args)
5353
pipeline_options.view_as(SetupOptions).save_main_session = True
5454

55+
# [START RateLimiterVertexPy]
5556
# Initialize the EnvoyRateLimiter
5657
rate_limiter = EnvoyRateLimiter(
5758
service_address=known_args.rls_address,
@@ -67,6 +68,7 @@ def run(argv=None):
6768
project=known_args.project,
6869
location=known_args.location,
6970
rate_limiter=rate_limiter)
71+
# [END RateLimiterVertexPy]
7072

7173
# Input features for the model
7274
features = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0],

sdks/python/apache_beam/examples/rate_limiter_simple.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from apache_beam.utils import shared
3030

3131

32+
# [START RateLimiterSimplePython]
3233
class SampleApiDoFn(beam.DoFn):
3334
"""A DoFn that simulates calling an external API with rate limiting."""
3435
def __init__(self, rls_address, domain, descriptors):
@@ -61,6 +62,9 @@ def process(self, element):
6162
yield element
6263

6364

65+
# [END RateLimiterSimplePython]
66+
67+
6468
def parse_known_args(argv):
6569
"""Parses args for the workflow."""
6670
parser = argparse.ArgumentParser()

website/www/site/content/en/documentation/patterns/overview.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ Pipeline patterns demonstrate common Beam use cases. Pipeline patterns are based
5959
* [Create a cache on a batch pipeline](/documentation/patterns/shared-class/#create-a-cache-on-a-batch-pipeline)
6060
* [Create a cache and update it regularly on a streaming pipeline](/documentation/patterns/shared-class/#create-a-cache-and-update-it-regularly-on-a-streaming-pipeline)
6161

62+
**Rate limiting patterns** - Patterns for rate limiting DoFns and Transforms in Beam pipelines
63+
* [Rate limiting DoFns and Transforms](/documentation/patterns/rate-limiting)
64+
6265
## Contributing a pattern
6366

6467
To contribute a new pipeline pattern, create [a feature request](https://github.com/apache/beam/issues/new?labels=new+feature%2Cawaiting+triage&template=feature.yml&title=%5BFeature+Request%5D%3A+) and add details to the issue description. See [Get started contributing](/contribute/) for more information.
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
---
2+
title: "Rate limiting patterns"
3+
---
4+
<!--
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
-->
17+
18+
# Rate limiting patterns
19+
20+
Apache Beam is built to maximize throughput by scaling workloads across thousands of workers. However, this massive parallelism requires coordination when pipelines interact with external systems that enforce strict quotas, such as 3rd-party REST APIs, databases, or internal microservices. Without a centralized rate limiting mechanism, independent workers might exceed the capacity of these systems, resulting in service degradation or broad IP blocking.
21+
22+
## Centralized Rate Limit Service
23+
24+
The recommended approach for global rate limiting in Beam is using a centralized Rate Limit Service (RLS).
25+
26+
A production-ready Terraform module to deploy this service on GKE is available in the beam repository:
27+
[`envoy-ratelimiter`](https://github.com/apache/beam/tree/master/examples/terraform/envoy-ratelimiter)
28+
29+
To deploy the rate-limiting infrastructure on GKE:
30+
31+
1. Update `terraform.tfvars` with your project variables to adjust rules and domains.
32+
2. Run the helper deploy script: `./deploy.sh`
33+
34+
This script automates deployment and, upon completion, returns the Internal Load Balancer IP address for your deployment that you will use in your pipeline.
35+
36+
---
37+
38+
{{< language-switcher java py >}}
39+
40+
## Using RateLimiter
41+
42+
To rate limit requests in your pipeline, you can create a RateLimiter client in your `DoFn`'s setup phase and acquire permits before making calls in the process phase.
43+
44+
{{< paragraph class="language-java" >}}
45+
In Java, use the `RateLimiter` interface and `EnvoyRateLimiterFactory` implementation to coordinate with the Envoy service. Create `RateLimiterOptions` with your service address, initialize the client in @Setup using `EnvoyRateLimiterFactory`, and call `rateLimiter.allow(batchSize)` in @ProcessElement to acquire a batch of permits.
46+
{{< /paragraph >}}
47+
48+
{{< highlight java >}}
49+
{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java" RateLimiterSimpleJava >}}
50+
{{< /highlight >}}
51+
52+
{{< paragraph class="language-py" >}}
53+
In Python, use the `EnvoyRateLimiter` and <a href="/documentation/patterns/shared-class/" style="text-decoration: underline;">Shared</a> to coordinate a single client instance shared across threads. Initialize client in `setup()` using `shared`, and call `self.rate_limiter.allow()` in `process()` to acquire rate-limiting permits before executing API calls.
54+
{{< /paragraph >}}
55+
56+
{{< highlight py >}}
57+
{{< code_sample "sdks/python/apache_beam/examples/rate_limiter_simple.py" RateLimiterSimplePython >}}
58+
{{< /highlight >}}
59+
60+
{{< paragraph class="language-py" >}}
61+
If you are using **RunInference** for remote model inference (e.g., Vertex AI), you can pass the `EnvoyRateLimiter` directly to the `ModelHandler`. The model handler coordinates the rate limit internally across your distributed workers.
62+
{{< /paragraph >}}
63+
64+
{{< highlight py >}}
65+
{{< code_sample "sdks/python/apache_beam/examples/inference/rate_limiter_vertex_ai.py" RateLimiterVertexPy >}}
66+
{{< /highlight >}}
67+
68+
---
69+
70+
## Running Example Pipelines with RateLimiter
71+
72+
Once your Rate Limiter Service is deployed and has an Internal IP, you can run your pipeline pointing to that address.
73+
74+
{{< highlight java >}}
75+
# Get the IP from your RLS deployment
76+
export RLS_ADDRESS="<INTERNAL_IP>:8081"
77+
78+
./gradlew :examples:java:exec -DmainClass=org.apache.beam.examples.RateLimiterSimple \
79+
-Dexec.args="--runner=<RUNNER> \
80+
--rateLimiterAddress=${RLS_ADDRESS} \
81+
--rateLimiterDomain=mongo_cps"
82+
{{< /highlight >}}
83+
84+
{{< highlight py >}}
85+
# Get the IP from your RLS deployment
86+
export RLS_ADDRESS="<INTERNAL_IP>:8081"
87+
88+
python -m apache_beam.examples.rate_limiter_simple \
89+
--runner=<RUNNER> \
90+
--rls_address=${RLS_ADDRESS}
91+
{{< /highlight >}}
92+
93+
## AutoScaler Integration
94+
95+
The throttling time and signals from the RateLimiter has to be picked up by the autoscaler. This allows the autoscaler to scale down the workers when the pipeline is being throttled by the external service, preventing unnecessary resource usage.
96+
97+
**Dataflow** currently supports this AutoScaler integration for **Batch RunnerV2**. Note that AutoScaler integration for Streaming mode is a known limitation.

website/www/site/layouts/partials/section-menu/en/documentation.html

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@
213213
<li><a href="/documentation/patterns/schema/">Schema</a></li>
214214
<li><a href="/documentation/patterns/bqml/">BigQuery ML</a></li>
215215
<li><a href="/documentation/patterns/grouping-elements-for-efficient-external-service-calls/">Grouping elements for efficient external service calls</a></li>
216+
<li><a href="/documentation/patterns/rate-limiting/">Rate limiting DoFns and Transforms</a></li>
216217
<li><a href="/documentation/patterns/shared-class/">Cache using a shared object</a></li>
217218
</ul>
218219
</li>

0 commit comments

Comments
 (0)