PyrateLimiter¶
A fast, async-friendly rate limiter for Python — Leaky-Bucket algorithm with pluggable backends.
[!NOTE]
Upgrading from v3.x? See the Migration Guide for breaking changes.
Features¶
🪣 Leaky-bucket algorithm — smooth, well-understood rate limiting.
⏱️ Multiple rates at once — e.g. 5/second and 1000/hour on the same key.
🔑 Per-key limits — track different services, users, or resources independently.
🧩 Pluggable backends — in-memory, SQLite, Redis (sync and async), Postgres, and multiprocessing.
⚡ Sync & async — the same API works in both; async paths never block the event loop.
🎀 Direct calls or decorators — limiter.try_acquire(...) or @limiter.as_decorator(...).
🚦 Blocking or non-blocking — wait for a permit (with optional timeout) or fail fast.
Installation¶
PyrateLimiter requires Python 3.10+.
pip install pyrate-limiter
# or
conda install --channel conda-forge pyrate-limiter
Optional backends pull in their own drivers:
pip install "pyrate-limiter[all]" # redis + psycopg (Postgres) + filelock
Quickstart¶
Limit to 5 requests per 2 seconds:
from pyrate_limiter import Duration, Rate, Limiter
# A Limiter with a single rate, backed by an in-memory bucket
limiter = Limiter(Rate(5, Duration.SECOND * 2))
# Blocking (default): waits until a permit is available
for i in range(6):
    limiter.try_acquire("my-resource")
    print(f"acquired {i}")
# Non-blocking: returns False immediately when the bucket is full
if not limiter.try_acquire("my-resource", blocking=False):
    print("rate limited!")
Prefer a one-liner? The limiter_factory covers the common cases:
from pyrate_limiter import Duration, limiter_factory
limiter = limiter_factory.create_inmemory_limiter(rate_per_duration=5, duration=Duration.SECOND)
limiter.try_acquire("my-resource")
How it works¶
flowchart TB
user([Your application]):::ext
user -->|"limit a key: try_acquire · try_acquire_async · @as_decorator"| limiter
subgraph pyrate["PyrateLimiter"]
direction TB
limiter["Limiter<br/>public API"]:::api
factory["BucketFactory<br/>routes each key to its bucket"]:::core
leaker["Leaker<br/>background cleanup"]:::leak
clock(["Clock<br/>time source"]):::clk
limiter --> factory
factory -->|routes to| backends
leaker -.->|periodic leak| backends
backends -->|reads time from| clock
subgraph backends["Bucket backend — choose one"]
direction LR
mem["InMemory"]:::bkt
sqlite["SQLite"]:::bkt
redis["Redis<br/>sync · async"]:::bkt
pg["Postgres"]:::bkt
mp["Multiprocess"]:::bkt
end
end
classDef api fill:#E5484D,color:#ffffff,stroke:#E5484D;
classDef core fill:#242A33,color:#ffffff,stroke:#242A33;
classDef bkt fill:#EEF2F6,color:#242A33,stroke:#CBD5E1;
classDef clk fill:#ffffff,color:#242A33,stroke:#242A33;
classDef leak fill:#F2C94C,color:#242A33,stroke:#E0B53C;
classDef ext fill:#ffffff,color:#242A33,stroke:#9AA5B1;
The bucket analogy — this library implements the Leaky Bucket algorithm:
A bucket represents a fixed capacity (a service, an API quota, …).
The bucket fills as requests arrive and leaks at a constant rate — the permitted request rate.
When the bucket is full, new requests are delayed (blocking) or rejected (non-blocking).
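To make the analogy concrete, here is a minimal standard-library sketch of a single-rate bucket that keeps a sliding log of timestamps. It illustrates the idea described above and is not PyrateLimiter's implementation; the class name TinyBucket and its methods are hypothetical.

```python
from collections import deque


class TinyBucket:
    """Toy single-rate bucket: at most `limit` items per `interval_ms` window."""

    def __init__(self, limit: int, interval_ms: int) -> None:
        self.limit = limit
        self.interval_ms = interval_ms
        self.items = deque()  # timestamps (ms) of accepted requests, oldest first

    def leak(self, now_ms: int) -> int:
        """Drop timestamps that have fallen out of the window; return how many."""
        dropped = 0
        while self.items and self.items[0] <= now_ms - self.interval_ms:
            self.items.popleft()
            dropped += 1
        return dropped

    def try_put(self, now_ms: int) -> bool:
        """Accept the request if the bucket has room, otherwise reject it."""
        self.leak(now_ms)
        if len(self.items) >= self.limit:
            return False  # bucket is full
        self.items.append(now_ms)
        return True


# Two requests per second: the third is rejected until an old one leaks out.
bucket = TinyBucket(limit=2, interval_ms=1000)
results = [bucket.try_put(t) for t in (0, 100, 200, 1000, 1050)]
print(results)
```

A blocking caller would sleep and retry where this sketch returns False; a non-blocking caller would report the rejection immediately.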
Core concepts¶
| Component | Role |
|---|---|
| Clock | Timestamps incoming items. Only needs a now() method. |
| Bucket | Stores timestamped items; enforces the rates. |
| BucketFactory | Timestamps & routes each item to the right bucket; schedules background leaking. |
| Limiter | The friendly façade. Sync/async, blocking/non-blocking, direct call or decorator; thread-safe via an RLock. |
For simple cases you only ever touch Limiter and Rate — the rest is wired up for you.
Defining rates & buckets¶
An API might allow 500/hour, 1000/day, and 10000/month. Express each as a Rate(limit, interval):
from pyrate_limiter import Duration, Rate
rates = [
    Rate(500, Duration.HOUR),        # 500 requests per hour
    Rate(1000, Duration.DAY),        # 1000 requests per day
    Rate(10000, Duration.WEEK * 4),  # ~10000 requests per month
]
[!IMPORTANT] Rates must be ordered generous-to-tight: increasing interval, increasing limit, and a non-increasing limit/interval ratio. Ill-formed lists raise ValueError at construction. Check a list yourself with validate_rate_list(rates).
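The ordering rule can be checked mechanically. The sketch below is an illustrative re-implementation using plain (limit, interval_ms) tuples, not the library's validate_rate_list. The helper name is_well_ordered is hypothetical, and treating equal intervals, equal limits, or an empty list as ill-formed is an assumption.

```python
from typing import List, Tuple

# Hypothetical stand-in for Rate: (limit, interval in milliseconds)
RateSpec = Tuple[int, int]


def is_well_ordered(rates: List[RateSpec]) -> bool:
    """Return True if the list follows the ordering rule described above."""
    if not rates:
        return False  # assumption: an empty list counts as ill-formed
    for (prev_limit, prev_interval), (limit, interval) in zip(rates, rates[1:]):
        if interval <= prev_interval:
            return False  # intervals must increase
        if limit <= prev_limit:
            return False  # limits must increase
        # limit/interval must not grow; cross-multiply to avoid float division
        if limit * prev_interval > prev_limit * interval:
            return False
    return True


HOUR = 3_600_000
DAY = 24 * HOUR
WEEK = 7 * DAY

good = [(500, HOUR), (1000, DAY), (10000, 4 * WEEK)]
bad_order = [(1000, DAY), (500, HOUR)]   # interval shrinks
bad_ratio = [(5, 1000), (100, 2000)]     # longer window allows a higher rate

print(is_well_ordered(good), is_well_ordered(bad_order), is_well_ordered(bad_ratio))
```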
Pass the rates straight to a Limiter (uses an in-memory bucket), or build a specific bucket:
from pyrate_limiter import InMemoryBucket, Limiter
limiter = Limiter(rates) # shortcut: in-memory bucket
# equivalent to:
limiter = Limiter(InMemoryBucket(rates))
limiter.try_acquire("hello world")
See Backends for Redis, SQLite, Postgres, and multiprocessing.
Everyday usage¶
Blocking, non-blocking & timeout¶
try_acquire blocks by default until a permit frees up:
from pyrate_limiter import Rate, Limiter, Duration
limiter = Limiter(Rate(3, Duration.SECOND))
for i in range(5):
    limiter.try_acquire("item")  # blocks when the bucket is full
Fail fast instead with blocking=False:
if not limiter.try_acquire("item", blocking=False):
    print("rate limited!")
In async code use try_acquire_async, optionally with a timeout (seconds):
acquired = await limiter.try_acquire_async("item", timeout=5)
if not acquired:
    print("timed out waiting for a permit")
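The timeout behaviour can be pictured as a retry loop with a deadline. The following is a sketch under stated assumptions, not how the Limiter actually waits: acquire_with_timeout, make_flaky, and the fixed poll_s interval are all hypothetical, and try_once stands in for a single non-blocking acquire attempt.

```python
import asyncio
import time
from typing import Callable


async def acquire_with_timeout(
    try_once: Callable[[], bool], timeout_s: float, poll_s: float = 0.01
) -> bool:
    """Retry a non-blocking attempt until it succeeds or the deadline passes."""
    deadline = time.monotonic() + timeout_s
    while True:
        if try_once():
            return True
        if time.monotonic() >= deadline:
            return False  # gave up: the caller sees a falsy result
        await asyncio.sleep(poll_s)  # yield to the event loop while waiting


def make_flaky(failures: int) -> Callable[[], bool]:
    """Build an attempt function that fails `failures` times, then succeeds."""
    calls = {"n": 0}

    def attempt() -> bool:
        calls["n"] += 1
        return calls["n"] > failures

    return attempt


async def demo():
    got_permit = await acquire_with_timeout(make_flaky(2), timeout_s=1.0)
    timed_out = await acquire_with_timeout(lambda: False, timeout_s=0.05)
    return got_permit, timed_out


got_permit, timed_out = asyncio.run(demo())
print(got_permit, timed_out)
```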
The buffer_ms Limiter parameter (default 50) adds a small slack to absorb clock drift:
from pyrate_limiter import Rate, Duration, InMemoryBucket, Limiter
bucket = InMemoryBucket([Rate(5, Duration.SECOND)])
limiter = Limiter(bucket, buffer_ms=100)
Weight¶
Items can carry weight (default 1). An item of weight W consumes W unit-slots atomically — either all W fit or none do:
BigItem(weight=5, name="item") → 5 × item(weight=1, name="item", same timestamp)
limiter.try_acquire("the-sun", weight=10)
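The all-or-nothing rule for weighted items can be sketched with a plain list of timestamps. This is an illustration only: try_put_weighted is a hypothetical helper, and it ignores expiry so that the atomicity is easy to see.

```python
from typing import List


def try_put_weighted(items: List[int], limit: int, now_ms: int, weight: int = 1) -> bool:
    """Add `weight` unit items sharing one timestamp, or add nothing at all."""
    if len(items) + weight > limit:
        return False  # not enough room: leave the bucket untouched
    items.extend([now_ms] * weight)
    return True


slots: List[int] = []
first = try_put_weighted(slots, limit=10, now_ms=0, weight=5)   # fits: 5 of 10 used
second = try_put_weighted(slots, limit=10, now_ms=1, weight=6)  # 5 + 6 > 10: rejected whole
third = try_put_weighted(slots, limit=10, now_ms=2, weight=5)   # fits exactly
print(first, second, third, len(slots))
```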
Decorator¶
as_decorator wraps any sync or async function:
from pyrate_limiter import Rate, Duration, Limiter
limiter = Limiter(Rate(5, Duration.SECOND))
@limiter.as_decorator(name="api_call", weight=1)
def handle_something(*args, **kwargs):
    ...

@limiter.as_decorator(name="background_job", weight=2)
async def handle_something_async(*args, **kwargs):
    ...
Context manager¶
Limiter releases its resources (background leak tasks, connections) on exit:
from pyrate_limiter import Rate, Duration, Limiter
with Limiter(Rate(5, Duration.SECOND)) as limiter:
    limiter.try_acquire("item")
# resources released here
# …or close manually
limiter = Limiter(Rate(5, Duration.SECOND))
try:
    limiter.try_acquire("item")
finally:
    limiter.close()
asyncio & event loops¶
Use try_acquire_async so waiting uses asyncio.sleep instead of blocking the loop. With a sync bucket, wrap it in BucketAsyncWrapper:
from pyrate_limiter import BucketAsyncWrapper, InMemoryBucket, Rate, Duration, Limiter
limiter = Limiter(BucketAsyncWrapper(InMemoryBucket([Rate(5, Duration.SECOND)])))
await limiter.try_acquire_async("item")
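Why this matters: a coroutine that waits with asyncio.sleep hands control back to the event loop, so other tasks keep running while it waits for a permit. The standard-library sketch below shows only that scheduling effect; wait_for_permit and other_work are hypothetical names and no rate limiting is performed.

```python
import asyncio
from typing import List


async def wait_for_permit(delay_s: float, events: List[str]) -> None:
    """Simulate waiting for a permit without blocking the event loop."""
    events.append("waiter: start")
    await asyncio.sleep(delay_s)  # control returns to the loop here
    events.append("waiter: acquired")


async def other_work(events: List[str]) -> None:
    """Unrelated task that should not be held up by the waiter."""
    events.append("other: ran")


async def main() -> List[str]:
    events: List[str] = []
    await asyncio.gather(wait_for_permit(0.05, events), other_work(events))
    return events


events = asyncio.run(main())
print(events)
```

Had the waiter used time.sleep instead, the second task could not have run until the wait was over.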
Backends¶
| Backend | Sync | Async | Persistent | Multi-process | Best for |
|---|---|---|---|---|---|
| InMemoryBucket | ✅ | (wrap) | ❌ | ❌ | single process, fastest |
| SQLiteBucket | ✅ | ❌ | ✅ | ✅ (file lock) | persistence / one host, many processes |
| RedisBucket | ✅ | ✅ | ✅ | ✅ | distributed across hosts |
| PostgresBucket | ✅ | ❌ | ✅ | ✅ | distributed, already on Postgres |
| MultiprocessBucket | ✅ | (wrap) | ❌ | ✅ | a single multiprocessing pool / ProcessPoolExecutor |
| BucketAsyncWrapper | — | ✅ | — | — | make any sync bucket async-safe |
Every bucket takes a List[Rate].
InMemoryBucket¶
from pyrate_limiter import InMemoryBucket, Rate, Duration
bucket = InMemoryBucket([Rate(5, Duration.MINUTE * 2)])
RedisBucket¶
Stores items in a sorted set (key = item name, score = timestamp). Use the init classmethod — it works for sync and async clients (just await it for async):
from pyrate_limiter import RedisBucket, Rate, Duration
rates = [Rate(5, Duration.MINUTE * 2)]
# sync
from redis import ConnectionPool, Redis
redis_db = Redis(connection_pool=ConnectionPool.from_url("redis://localhost:6379"))
bucket = RedisBucket.init(rates, redis_db, "bucket-key")
# async
from redis.asyncio import ConnectionPool as AsyncPool, Redis as AsyncRedis
redis_db = AsyncRedis(connection_pool=AsyncPool.from_url("redis://localhost:6379"))
bucket = await RedisBucket.init(rates, redis_db, "bucket-key")
SQLiteBucket¶
Persists state to SQLite (sync only):
from pyrate_limiter import SQLiteBucket, Rate, Duration, Limiter
rate = Rate(5, Duration.MINUTE)
# set use_file_lock=True to share one DB file across processes on a host
bucket = SQLiteBucket.init_from_file([rate], use_file_lock=False)
limiter = Limiter(bucket)
init_from_file(rates, table="rate_bucket", db_path=None, create_new_table=True, use_file_lock=False) — db_path=None uses a temp file; use_file_lock=True uses filelock for multi-process access on a single host.
PostgresBucket¶
Requires psycopg[pool] (install via the [all] extra). Sync only. Use the built-in PostgresClock, or a custom time source:
from pyrate_limiter import PostgresBucket, Rate, PostgresClock
from psycopg_pool import ConnectionPool
pool = ConnectionPool("postgresql://postgres:postgres@localhost:5432")
bucket = PostgresBucket(pool, "my_bucket_table", [Rate(3, 1000), Rate(4, 1500)])
MultiprocessBucket¶
Shares a ListProxy across a multiprocessing pool / ProcessPoolExecutor, guarded by a multiprocessing lock. See in_memory_multiprocess.py.
Under contention, bucket.waiting estimates can be off, so prefer try_acquire(..., blocking=True) (the default) — the item keeps retrying instead of returning False on a transient miss.
BucketAsyncWrapper¶
Wraps a sync bucket so every method returns an awaitable, letting the Limiter use asyncio.sleep during delays. See asyncio & event loops.
Web request rate limiting¶
Drop-in helpers for the popular HTTP clients live in pyrate_limiter.extras:
AIOHTTP
from pyrate_limiter import Duration, limiter_factory
from pyrate_limiter.extras.aiohttp_limiter import RateLimitedSession
limiter = limiter_factory.create_inmemory_limiter(rate_per_duration=2, duration=Duration.SECOND)
session = RateLimitedSession(limiter)
HTTPX
import httpx
from pyrate_limiter import Duration, limiter_factory
from pyrate_limiter.extras.httpx_limiter import AsyncRateLimiterTransport, RateLimiterTransport
limiter = limiter_factory.create_inmemory_limiter(rate_per_duration=1, duration=Duration.SECOND)
with httpx.Client(transport=RateLimiterTransport(limiter=limiter)) as client:
    client.get("https://example.com")
async with httpx.AsyncClient(transport=AsyncRateLimiterTransport(limiter=limiter)) as client:
    await client.get("https://example.com")
Requests
from pyrate_limiter import Duration, limiter_factory
from pyrate_limiter.extras.requests_limiter import RateLimitedRequestsSession
limiter = limiter_factory.create_inmemory_limiter(rate_per_duration=2, duration=Duration.SECOND)
session = RateLimitedRequestsSession(limiter)
Advanced usage¶
Custom routing with BucketFactory¶
When items must be routed to different buckets (per user, per endpoint, …), implement a BucketFactory. At minimum, define wrap_item and get:
from pyrate_limiter import (
    AbstractBucket, BucketFactory, RateItem, MonotonicClock,
    InMemoryBucket, Rate, Duration, Limiter,
)

class SingleRouteFactory(BucketFactory):
    def __init__(self, clock, bucket):
        self.clock = clock
        self.bucket = bucket
        self.schedule_leak(bucket)  # run background leaking for this bucket

    def wrap_item(self, name: str, weight: int = 1) -> RateItem:
        return RateItem(name, self.clock.now(), weight=weight)

    def get(self, _item: RateItem) -> AbstractBucket:
        return self.bucket
To create buckets on demand, use self.create(bucket_class, *args, **kwargs) — it builds the bucket and schedules its leak:
class PerNameFactory(BucketFactory):
    def __init__(self, clock):
        self.clock = clock
        self.buckets = {}

    def wrap_item(self, name: str, weight: int = 1) -> RateItem:
        return RateItem(name, self.clock.now(), weight=weight)

    def get(self, item: RateItem) -> AbstractBucket:
        if item.name not in self.buckets:
            self.buckets[item.name] = self.create(InMemoryBucket, [Rate(5, Duration.SECOND)])
        return self.buckets[item.name]
Then hand the factory to a Limiter:
limiter = Limiter(SingleRouteFactory(MonotonicClock(), InMemoryBucket([Rate(5, Duration.SECOND)])))
limiter.try_acquire("the-earth")
limiter.try_acquire("the-sun", weight=100)
Custom & distributed clocks¶
In v4 each bucket owns its time source via bucket.now() — the Limiter no longer takes a clock= parameter. To make distributed workers agree on “now” (e.g. a shared Redis/DB clock), either override now() on a bucket subclass (works on every backend, keeps leak consistent), or assign a clock to buckets that delegate to self._clock (e.g. InMemoryBucket, PostgresBucket):
from pyrate_limiter import AbstractClock, InMemoryBucket, RedisBucket, Rate, Duration
class RedisClock(AbstractClock):
    def __init__(self, redis):
        self.redis = redis

    def now(self) -> int:
        # Redis TIME returns (seconds, microseconds); convert to milliseconds
        seconds, microseconds = self.redis.time()
        return seconds * 1000 + microseconds // 1000

# Option A — override now() (recommended)
class RedisTimeBucket(RedisBucket):
    def now(self) -> int:
        seconds, microseconds = self.redis.time()
        return seconds * 1000 + microseconds // 1000

# Option B — inject a clock into a bucket that uses self._clock
# redis_client: an existing Redis connection (not shown here)
bucket = InMemoryBucket([Rate(5, Duration.SECOND)])
bucket._clock = RedisClock(redis_client)
Built-in clocks: MonotonicClock (default), MonotonicAsyncClock, PostgresClock, SQLiteClock.
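The arithmetic in the now() methods above converts a (seconds, microseconds) pair into integer milliseconds. The sketch below checks that conversion without a server; FakeRedis is a hypothetical stand-in whose time() returns a fixed pair, and redis_time_ms is an illustrative helper name.

```python
class FakeRedis:
    """Hypothetical stand-in for a Redis client with a fixed TIME reply."""

    def __init__(self, seconds: int, microseconds: int) -> None:
        self._reply = (seconds, microseconds)

    def time(self):
        return self._reply


def redis_time_ms(client) -> int:
    """Convert a (seconds, microseconds) reply into whole milliseconds."""
    seconds, microseconds = client.time()
    return seconds * 1000 + microseconds // 1000


now_ms = redis_time_ms(FakeRedis(1_700_000_000, 123_456))
print(now_ms)
```

Integer division truncates the sub-millisecond part, so the result stays an int, matching the now() -> int signature used above.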
Leaking¶
Buckets shouldn’t hold items forever. Each bucket implements leak(current_timestamp=None) to drop expired items, and BucketFactory.schedule_leak(bucket) runs that in the background (default interval 10 s):
factory.schedule_leak(bucket) # background leak for this bucket
Change the interval (in milliseconds) via the leak_interval property:
class MyFactory(BucketFactory):
    def __init__(self, clock, buckets):
        self.clock = clock
        self.leak_interval = 5000  # leak every 5s
        for bucket in buckets:
            self.schedule_leak(bucket)
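What a single leak pass might do can be sketched in a few lines. This is an illustration, not the library's leak(): the free function leak and its arguments are hypothetical, and it assumes an item may be dropped only once it is older than the longest interval among the bucket's rates, since the widest rate still needs to count it until then.

```python
from typing import List


def leak(items: List[int], intervals_ms: List[int], now_ms: int) -> int:
    """Remove timestamps older than the longest interval; return how many were dropped."""
    horizon = now_ms - max(intervals_ms)
    kept = [ts for ts in items if ts > horizon]
    dropped = len(items) - len(kept)
    items[:] = kept  # mutate in place so callers holding the list see the change
    return dropped


# Rates of "per second" and "per minute": the minute window decides what can go.
items = [0, 400, 900, 1500]
dropped = leak(items, intervals_ms=[1000, 60_000], now_ms=61_000)
print(dropped, items)
```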
Concurrency¶
Locking is handled at the Limiter level. try_acquire takes a thread RLock; try_acquire_async takes a loop-local asyncio.Lock in front of the RLock; MultiprocessBucket adds a multiprocessing lock on top. (SQLiteBucket manages its own locking.)
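The reason for a lock around the acquire path is that "check for room, then add" must happen as one step. The sketch below demonstrates this with threads and a threading.RLock; LockedCounterBucket is a hypothetical toy and does not reflect how the Limiter is structured internally.

```python
import threading
from typing import List


class LockedCounterBucket:
    """Toy bucket whose check-and-increment is guarded by a re-entrant lock."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.count = 0
        self.lock = threading.RLock()

    def try_acquire(self) -> bool:
        with self.lock:  # no other thread can interleave between check and update
            if self.count >= self.limit:
                return False
            self.count += 1
            return True


bucket = LockedCounterBucket(limit=10)
outcomes: List[bool] = []
outcomes_lock = threading.Lock()


def worker() -> None:
    ok = bucket.try_acquire()
    with outcomes_lock:
        outcomes.append(ok)


threads = [threading.Thread(target=worker) for _ in range(50)]
for t in threads:
    t.start()
for t in threads:
    t.join()

granted = sum(outcomes)
print(granted, len(outcomes))
```

With the lock in place, exactly ten of the fifty threads are granted a permit regardless of scheduling order.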
Custom backends¶
Implement pyrate_limiter.AbstractBucket to add your own backend. The test suite doubles as a conformance spec:
Fork the repo.
Implement your bucket against AbstractBucket.
Add a create_bucket to tests/conftest.py and wire it into the create_bucket fixture.
Run the suite — if it passes, your backend is good to go.
Examples¶
asyncio_ratelimit.py — rate-limiting asyncio tasks
asyncio_decorator.py — the decorator with async functions
httpx_ratelimiter.py — HTTPX, sync / async / multiprocess
in_memory_multiprocess.py — multiprocessing with an in-memory bucket
sqlite_filelock_multiprocess.py — multiprocessing with SQLite + a file lock
Full docs at pyratelimiter.readthedocs.io · Contributing · Changelog