pyrate_limiter.abstracts.bucket module
Implement this class to create a working bucket for the Limiter to use.
- class pyrate_limiter.abstracts.bucket.AbstractBucket
Bases:
abc.ABC
Base bucket interface. Assumption: len(rates) is always > 0. TODO: allow empty rates.
- failing_rate = None
- abstract flush()
Flush the whole bucket. Implementations must also clear failing_rate after flushing.
- abstract leak(current_timestamp=None)
Leak the bucket: remove items that are outdated.
- abstract peek(index)
Peek at the rate item at a specific index, in latest-to-earliest order. NOTE: we cannot peek from the start of the queue (earliest-to-latest) because we cannot tell how many outdated items are still in the queue.
- abstract put(item)
Put an item (typically the current time) in the bucket. Return True if successful, otherwise False.
- rates
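The interface above can be illustrated with a minimal in-memory sketch. It does not import pyrate_limiter: `Rate`, `SketchBucket` and `ListBucket` are hypothetical stand-ins written for this example, items are plain integer timestamps in milliseconds, and the integer returned by `leak` (the number of removed items) is an assumption rather than something this page states.

```python
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Rate:
    """Hypothetical stand-in: at most `limit` items per `interval` ms."""
    limit: int
    interval: int


class SketchBucket(ABC):
    """Stand-in mirroring the documented interface (not the library class)."""
    rates: List[Rate]
    failing_rate: Optional[Rate] = None

    @abstractmethod
    def put(self, item: int) -> bool: ...

    @abstractmethod
    def leak(self, current_timestamp: Optional[int] = None) -> int: ...

    @abstractmethod
    def flush(self) -> None: ...

    @abstractmethod
    def peek(self, index: int) -> Optional[int]: ...


class ListBucket(SketchBucket):
    """Keeps timestamps in a list, earliest first."""

    def __init__(self, rates: List[Rate]) -> None:
        assert len(rates) > 0  # same assumption as the documented interface
        self.rates = sorted(rates, key=lambda r: r.interval)
        self.items: List[int] = []

    def put(self, item: int) -> bool:
        # Reject the item if any rate is already saturated in its window.
        for rate in self.rates:
            window_start = item - rate.interval
            in_window = sum(1 for ts in self.items if ts > window_start)
            if in_window >= rate.limit:
                self.failing_rate = rate
                return False
        self.failing_rate = None
        self.items.append(item)
        return True

    def leak(self, current_timestamp: Optional[int] = None) -> int:
        # Drop items older than the longest interval; report how many went.
        if current_timestamp is None:
            current_timestamp = int(time.time() * 1000)
        cutoff = current_timestamp - max(r.interval for r in self.rates)
        before = len(self.items)
        self.items = [ts for ts in self.items if ts > cutoff]
        return before - len(self.items)

    def flush(self) -> None:
        self.items.clear()
        self.failing_rate = None  # flushing must also clear the failing rate

    def peek(self, index: int) -> Optional[int]:
        # Index 0 is the latest item, index 1 the one before it, and so on.
        if index < 0 or index >= len(self.items):
            return None
        return self.items[-1 - index]
```

Peeking from the latest end means the result does not depend on how many outdated items still sit at the front of the list, which is the concern raised in the NOTE for peek.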
- class pyrate_limiter.abstracts.bucket.BucketFactory
Bases:
abc.ABC
Abstract BucketFactory class. Users implement or override this class with their own bucket-routing and bucket-creation logic.
- create(clock, bucket_class, *args, **kwargs)
Create a bucket dynamically.
- Return type
- abstract get(item)
Get the bucket that corresponds to this item.
- Return type
- get_buckets()
Iterator over all buckets in the factory.
- Return type
- schedule_leak(new_bucket, associated_clock)
Schedule leaking for all the buckets and reset the bucket’s failing rate.
- Return type
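As a rough illustration of the bucket-routing idea behind get and get_buckets, the sketch below routes each item to a bucket keyed by the item's name and creates that bucket on first use. It is self-contained and does not subclass the real BucketFactory: `NamedItem`, `PerNameFactory` and the `make_bucket` callable are hypothetical names, and a plain list stands in for a bucket. The leak scheduling that create and schedule_leak provide is left out.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class NamedItem:
    """Hypothetical item: a routing key plus a timestamp in milliseconds."""
    name: str
    timestamp: int


class PerNameFactory:
    """Routes each item to one bucket per item name (illustrative only)."""

    def __init__(self, make_bucket: Callable[[], list]) -> None:
        self._make_bucket = make_bucket
        self._buckets: Dict[str, list] = {}

    def get(self, item: NamedItem) -> list:
        # Create the bucket lazily the first time a name is seen.
        if item.name not in self._buckets:
            self._buckets[item.name] = self._make_bucket()
        return self._buckets[item.name]

    def get_buckets(self) -> List[list]:
        return list(self._buckets.values())


factory = PerNameFactory(make_bucket=list)
alice_bucket = factory.get(NamedItem("alice", 1))
alice_bucket.append(1)  # a real bucket would receive the item through put()
```

Keying buckets by item name gives each caller its own limit; a factory that returns the same bucket for every item would instead enforce one shared limit.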
- class pyrate_limiter.abstracts.bucket.Leaker(leak_interval)
Bases:
threading.Thread
Responsible for scheduling the buckets’ leaking in the background, either as a daemon task (for sync buckets) or as an asyncio.Task.
- aio_leak_task = None
- async_buckets = None
- clocks = None
- daemon = True
- leak_async()
- leak_interval = 10000
- name = "PyrateLimiter's Leaker"
- register(bucket, clock)
Register a new bucket with its associated clock.
- sync_buckets = None
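The background-leaking idea for sync buckets can be sketched as a daemon thread that wakes up every leak_interval milliseconds and calls leak on each registered bucket with the time from its clock. This is not the library's implementation: `SketchLeaker`, `TimestampBucket`, `leak_once` and `stop` are hypothetical names, a clock is assumed to be a zero-argument callable returning milliseconds, and the asyncio path is omitted.

```python
import threading
from typing import Callable, Dict, List


class TimestampBucket:
    """Hypothetical bucket: drops timestamps older than `max_age` ms."""

    def __init__(self, max_age: int) -> None:
        self.max_age = max_age
        self.items: List[int] = []

    def leak(self, current_timestamp: int) -> None:
        cutoff = current_timestamp - self.max_age
        self.items = [ts for ts in self.items if ts > cutoff]


class SketchLeaker(threading.Thread):
    """Periodically leaks every registered bucket from a daemon thread."""

    def __init__(self, leak_interval: int = 10_000) -> None:
        super().__init__(name="SketchLeaker", daemon=True)
        self.leak_interval = leak_interval  # milliseconds between passes
        self.sync_buckets: Dict[int, TimestampBucket] = {}
        self.clocks: Dict[int, Callable[[], int]] = {}
        self._lock = threading.Lock()
        self._stop_event = threading.Event()

    def register(self, bucket: TimestampBucket, clock: Callable[[], int]) -> None:
        # Key both maps by the bucket's identity so each bucket keeps its clock.
        with self._lock:
            self.sync_buckets[id(bucket)] = bucket
            self.clocks[id(bucket)] = clock

    def leak_once(self) -> None:
        # Snapshot under the lock, then leak without holding it.
        with self._lock:
            pairs = [(b, self.clocks[k]) for k, b in self.sync_buckets.items()]
        for bucket, clock in pairs:
            bucket.leak(clock())

    def run(self) -> None:
        # wait() returns False on timeout, so the loop runs once per interval
        # until stop() sets the event.
        while not self._stop_event.wait(self.leak_interval / 1000):
            self.leak_once()

    def stop(self) -> None:
        self._stop_event.set()
```

Because the thread is a daemon, it does not keep the interpreter alive at exit. Calling `leak_once()` directly, without starting the thread, makes the leaking step easy to exercise in isolation.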