This document describes the current stable version of Kombu (5.0). For development docs, go here.
Source code for kombu.utils.limits
"""Token bucket implementation for rate limiting."""
from collections import deque
from time import monotonic
__all__ = ('TokenBucket',)
[docs]class TokenBucket:
"""Token Bucket Algorithm.
See Also:
https://en.wikipedia.org/wiki/Token_Bucket
Most of this code was stolen from an entry in the ASPN Python Cookbook:
https://code.activestate.com/recipes/511490/
Warning:
Thread Safety: This implementation is not thread safe.
Access to a `TokenBucket` instance should occur within the critical
section of any multithreaded code.
"""
#: The rate in tokens/second that the bucket will be refilled.
fill_rate = None
#: Maximum number of tokens in the bucket.
capacity = 1
#: Timestamp of the last time a token was taken out of the bucket.
timestamp = None
def __init__(self, fill_rate, capacity=1):
self.capacity = float(capacity)
self._tokens = capacity
self.fill_rate = float(fill_rate)
self.timestamp = monotonic()
self.contents = deque()
[docs] def can_consume(self, tokens=1):
"""Check if one or more tokens can be consumed.
Returns:
bool: true if the number of tokens can be consumed
from the bucket. If they can be consumed, a call will also
consume the requested number of tokens from the bucket.
Calls will only consume `tokens` (the number requested)
or zero tokens -- it will never consume a partial number
of tokens.
"""
if tokens <= self._get_tokens():
self._tokens -= tokens
return True
return False
[docs] def expected_time(self, tokens=1):
"""Return estimated time of token availability.
Returns:
float: the time in seconds.
"""
_tokens = self._get_tokens()
tokens = max(tokens, _tokens)
return (tokens - _tokens) / self.fill_rate
def _get_tokens(self):
if self._tokens < self.capacity:
now = monotonic()
delta = self.fill_rate * (now - self.timestamp)
self._tokens = min(self.capacity, self._tokens + delta)
self.timestamp = now
return self._tokens