pyrate_limiter.limiter module#

Limiter class implementation

class pyrate_limiter.limiter.Limiter(argument, clock=<pyrate_limiter.clocks.TimeClock object>, raise_when_fail=True, max_delay=None)#

Bases: object

This class's responsibility is to wrap all the underlying logic and make it easy to work with both sync and async functions.

__init__(argument, clock=<pyrate_limiter.clocks.TimeClock object>, raise_when_fail=True, max_delay=None)#

Initialize the Limiter from one of: a single bucket, a multiple-bucket factory, a single rate, or a list of rates.

as_decorator()#

Use the limiter as a decorator. Works with both sync and async functions.

Return type

Callable[[Callable[[Any], Tuple[str, int]]], Callable[[Callable[[Any], Any]], Callable[[Any], Any]]]
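The nested return type above can be read as: as_decorator() returns a function that takes a mapping callable (call arguments to a (name, weight) tuple) and returns the decorator itself. The following is a minimal sketch of that shape only. `ToyLimiter`, its `capacity` parameter, and the `RuntimeError` are hypothetical stand-ins, not the library's implementation.

```python
import asyncio
import inspect
from functools import wraps
from typing import Any, Callable, Tuple


class ToyLimiter:
    """Hypothetical stand-in: allows a fixed total weight, then refuses."""

    def __init__(self, capacity: int) -> None:
        self.remaining = capacity

    def try_acquire(self, name: str, weight: int = 1) -> bool:
        if weight > self.remaining:
            return False
        self.remaining -= weight
        return True

    def _check(self, name: str, weight: int) -> None:
        if not self.try_acquire(name, weight):
            raise RuntimeError(f"rate limit hit for {name!r}")

    def as_decorator(self) -> Callable:
        # Level 1: accept the mapping function (call args -> (name, weight)).
        def with_mapping(mapping: Callable[..., Tuple[str, int]]) -> Callable:
            # Level 2: the actual decorator applied to the user's function.
            def decorate(func: Callable[..., Any]) -> Callable[..., Any]:
                if inspect.iscoroutinefunction(func):
                    @wraps(func)
                    async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                        self._check(*mapping(*args, **kwargs))
                        return await func(*args, **kwargs)
                    return async_wrapper

                @wraps(func)
                def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
                    self._check(*mapping(*args, **kwargs))
                    return func(*args, **kwargs)
                return sync_wrapper
            return decorate
        return with_mapping


limiter = ToyLimiter(capacity=3)
limited = limiter.as_decorator()


def by_user(user: str) -> Tuple[str, int]:
    # Mapping: derive the item name and weight from the call arguments.
    return user, 1


@limited(by_user)
def greet(user: str) -> str:
    return f"hello {user}"


@limited(by_user)
async def greet_async(user: str) -> str:
    return f"hello {user}"
```

The mapping function receives the same arguments as the decorated function, so the item name and weight can be derived per call.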

bucket_factory#
buckets()#

Get the list of active buckets

Return type

List[AbstractBucket]

delay_or_raise(bucket, item)#

When try_acquire fails, either handle the delay or raise an error immediately

Return type

Union[bool, Awaitable[bool]]

dispose(bucket)#

Dispose of (remove) a specific bucket, given either its bucket ID or the bucket object

Return type

bool

handle_bucket_put(bucket, item)#

Put an item into a bucket

Return type

Union[bool, Awaitable[bool]]

lock#
max_delay = None#
raise_when_fail#
try_acquire(name, weight=1)#

Try to acquire an item with the given name and weight. Returns true on success, false on failure.

Return type

Union[bool, Awaitable[bool]]
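To illustrate how name, weight, and raise_when_fail might interact, here is a simplified model of a single windowed bucket. `ToyWindowLimiter`, its `limit`, `interval`, and `now` parameters, and the `RuntimeError` are assumptions made for this sketch. The real Limiter delegates to its buckets and may behave differently.

```python
import time
from collections import deque
from typing import Callable, Deque, Tuple


class ToyWindowLimiter:
    """Hypothetical model: at most `limit` total weight per `interval` seconds."""

    def __init__(
        self,
        limit: int,
        interval: float,
        now: Callable[[], float] = time.monotonic,
        raise_when_fail: bool = True,
    ) -> None:
        self.limit = limit
        self.interval = interval
        self.now = now
        self.raise_when_fail = raise_when_fail
        self._items: Deque[Tuple[float, int]] = deque()

    def try_acquire(self, name: str, weight: int = 1) -> bool:
        # `name` is ignored here because this toy has one bucket only.
        now = self.now()
        # Drop items that have left the time window.
        while self._items and now - self._items[0][0] >= self.interval:
            self._items.popleft()
        used = sum(w for _, w in self._items)
        if used + weight > self.limit:
            if self.raise_when_fail:
                raise RuntimeError(f"bucket full for {name!r}")
            return False
        self._items.append((now, weight))
        return True
```

Injecting `now` keeps the sketch deterministic under test. With `raise_when_fail=False`, a failed acquisition returns `False` instead of raising.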

class pyrate_limiter.limiter.SingleBucketFactory(bucket, clock)#

Bases: pyrate_limiter.abstracts.bucket.BucketFactory

Single-bucket factory for quick use with Limiter

bucket#
clock#
get(_)#

Get the bucket corresponding to this item

Return type

AbstractBucket

wrap_item(name, weight=1)#

Add the current timestamp to the received item using any clock backend, turning it into a RateItem. Can return either a coroutine or a RateItem instance.
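The "coroutine or RateItem" return can be sketched as follows, assuming the clock may be either sync or async. `ToyRateItem` and the `now` parameter are hypothetical names for this illustration and do not reflect the library's actual classes or signature.

```python
import asyncio
import inspect
from dataclasses import dataclass
from typing import Awaitable, Callable, Union


@dataclass
class ToyRateItem:
    """Hypothetical stand-in for a timestamped, weighted item."""
    name: str
    timestamp: int
    weight: int = 1


def wrap_item(
    name: str,
    weight: int = 1,
    *,
    now: Callable[[], Union[int, Awaitable[int]]],
) -> Union[ToyRateItem, Awaitable[ToyRateItem]]:
    timestamp = now()
    if inspect.isawaitable(timestamp):
        # Async clock: defer building the item until the timestamp resolves.
        async def _wrap() -> ToyRateItem:
            return ToyRateItem(name, await timestamp, weight)
        return _wrap()
    # Sync clock: the item can be built immediately.
    return ToyRateItem(name, timestamp, weight)
```

Callers that may receive either form can check the result with `inspect.isawaitable` and await it only when needed.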