quantpylib.throttler.rate_semaphore

quantpylib.throttler.rate_semaphore is our core rate-limiting and throttling module, and exposes two synchronization tools, the quantpylib.throttler.rate_semaphore.AsyncRateSemaphore and quantpylib.throttler.rate_semaphore.RateSemaphore classes.

Instead of writing server-dependent rate-limiting code that reacts to error messages and response headers (say, of an HTTP 429 response), the rate semaphores make client-side rate limiting seamless by wrapping native Python objects and synchronization primitives. Any Python function can be rate-limited with RateSemaphore, and any Python coroutine with AsyncRateSemaphore, by submitting transactions to the semaphore through the transact method. The module is flexible enough to handle both fixed-cost endpoints (specs such as '20 API requests/min') and variable-cost endpoints (specs such as '5 API credits/getTick' and '15 API credits/getOHLCV' with '100 credits/min per client').
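To make the variable-cost case concrete, the following is a small arithmetic sketch of the credit accounting. The endpoint names and costs come from the example spec above; the `COSTS` table and the `credits_needed` helper are hypothetical and are not part of quantpylib.

```python
# Hypothetical cost table built from the example spec above:
# 5 credits per getTick, 15 credits per getOHLCV, 100 credits/min per client.
COSTS = {"getTick": 5, "getOHLCV": 15}
POOL_CREDITS = 100     # size of the shared credit pool
REFUND_SECONDS = 60    # a charge returns to the pool one minute later


def credits_needed(calls):
    """Total credits consumed by a batch of endpoint calls."""
    return sum(COSTS[name] for name in calls)


batch = ["getTick"] * 8 + ["getOHLCV"] * 4     # 8*5 + 4*15 credits
print(credits_needed(batch))                   # 100
print(credits_needed(batch) <= POOL_CREDITS)   # True
print(POOL_CREDITS // COSTS["getOHLCV"])       # 6
```

Under this reading, each request would be submitted to transact with credits set to its endpoint cost and refund_time set to the length of the rate-limit window in seconds.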

In most cases, users only need the transact method of either class to use the synchronization features. Alternatively, instance methods that make requests can be decorated with the decorators in quantpylib.throttler.decorators, which makes interacting with the semaphores a one-liner.

Examples of decorator usage and of the synchronous and asynchronous semaphores are given here.

AsyncRateSemaphore

An asynchronous semaphore implementation that limits access to coroutines (known as transactions) based on available credits, and manages credit-debit accounting of a common resource pool. Coroutines to be executed are submitted to the AsyncRateSemaphore.transact method along with the number of credits they consume and the time it takes for those credits to be refunded to the resource pool. It can be used to throttle asynchronous work such as non-blocking API requests or database operations. In most cases, users only need to interact with the AsyncRateSemaphore.transact method.

__init__(credits=1, greedy_entry=False, greedy_exit=True)

Initialize the AsyncRateSemaphore with an initial number of credits.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| credits | float | The initial number of credits the semaphore starts with. Defaults to 1. | 1 |
| greedy_entry | bool | Whether pending coroutines are admitted greedily (True) or in FIFO order (False) on semaphore entry. Defaults to False. | False |
| greedy_exit | bool | Whether pending coroutines are woken greedily (True) or in FIFO order (False) on credit refund. Defaults to True. | True |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If credits is less than 0. |

acquire(require_credits, refund_time) async

Acquire the semaphore, decrementing the resource pool by the specified number of credits. If the resource pool holds enough credits, decrement the credits and return immediately. If there are not enough credits on entry, do a non-blocking wait until enough resources are freed up. Additionally, if greedy_entry=False, the entering transaction waits behind earlier pending transactions regardless of resource pool availability.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| require_credits | float | The number of credits required to enter the semaphore. | required |
| refund_time | float | The time in seconds after which the credits should be refunded. | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True when the credits were successfully acquired. |
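The entry rule described for acquire can be sketched as a pure decision function. This is an illustration only: `can_enter_now` and its arguments are hypothetical names, and the sketch assumes that a pool exactly equal to the requirement is sufficient.

```python
def can_enter_now(pool, required, n_pending, greedy_entry):
    """Return True if a new transaction may take its credits immediately."""
    has_credits = pool >= required   # assumption: an exact fit is allowed
    if greedy_entry:
        # Greedy entry: may jump ahead of transactions that are already waiting.
        return has_credits
    # FIFO entry: queue behind earlier waiters even if credits are available.
    return has_credits and n_pending == 0


print(can_enter_now(pool=10, required=3, n_pending=2, greedy_entry=True))   # True
print(can_enter_now(pool=10, required=3, n_pending=2, greedy_entry=False))  # False
print(can_enter_now(pool=10, required=3, n_pending=0, greedy_entry=False))  # True
```

With FIFO entry a small request cannot overtake a large one that arrived earlier, which avoids starving expensive transactions at the cost of some idle credits.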

release(taken_credits)

Refund the specified number of credits and wake up pending transactions that are able to execute given the state of the resource pool. Additionally, if greedy_exit=False, pending transactions are woken in FIFO order until the resource pool is insufficient for the earliest remaining transaction.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| taken_credits | float | The number of credits to release. | required |
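The difference between the two wake-up policies on refund can be sketched as follows. The `select_wakeups` helper is hypothetical, and the greedy branch (skip a waiter that cannot be served and keep scanning) is an assumption about what a greedy policy means here, not a statement about the library's internals.

```python
def select_wakeups(pool, pending, greedy_exit):
    """Pick which pending credit requests to wake after a refund.

    pending lists the credit requirement of each waiter, oldest first.
    Returns (indices_to_wake, remaining_pool).
    """
    woken = []
    for i, need in enumerate(pending):
        if need <= pool:
            pool -= need
            woken.append(i)
        elif not greedy_exit:
            break   # FIFO: stop at the first waiter that cannot be served
        # Greedy: skip this waiter and keep looking for smaller requests.
    return woken, pool


pending = [3, 8, 2, 1]   # oldest request first
print(select_wakeups(5, pending, greedy_exit=False))  # ([0], 2)
print(select_wakeups(5, pending, greedy_exit=True))   # ([0, 2], 0)
```

In the FIFO case two credits stay idle behind the 8-credit request, while the greedy case spends them on a later, cheaper request.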

transact(coroutine, credits, refund_time, transaction_id=None, verbose=False) async

Execute an asynchronous coroutine object using the semaphore for synchronization, utilizing the AsyncRateSemaphore.acquire and AsyncRateSemaphore.release methods for proper synchronization. The refund mechanism is scheduled on the running event loop and the transact method returns without waiting for credits to be refunded. Failed transactions (those that raise an exception) consume and refund credits in the same way as successful transactions.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| coroutine | coroutine function | The coroutine to execute as part of the transaction. | required |
| credits | float | The number of credits required for the transaction. | required |
| refund_time | float | The time in seconds after which the credits should be refunded. | required |
| transaction_id | str | Identifier for the transaction. Defaults to None. | None |
| verbose | bool | Whether to print verbose transaction information. Defaults to False. | False |

Returns:

| Type | Description |
| --- | --- |
| Any | The result of the executed coroutine. |

Raises:

| Type | Description |
| --- | --- |
| Exception | If the coroutine raises any exception. |
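To illustrate the acquire, run, schedule-refund flow described for transact, here is a minimal stand-in built on asyncio.Condition. It is a sketch, not quantpylib's implementation: the class `SketchAsyncRateSemaphore` is hypothetical, it ignores the greedy_entry and greedy_exit policies, its signatures are simplified, and it assumes the refund clock starts when the transaction finishes.

```python
import asyncio
import time


class SketchAsyncRateSemaphore:
    """Illustrative stand-in only; NOT quantpylib's implementation."""

    def __init__(self, credits=1):
        if credits < 0:
            raise ValueError("credits must not be negative")
        self._credits = credits
        self._cond = asyncio.Condition()
        self._refunds = set()   # keep references so refund tasks are not garbage-collected

    async def acquire(self, require_credits):
        async with self._cond:
            # Non-blocking wait: other coroutines keep running while we wait.
            await self._cond.wait_for(lambda: self._credits >= require_credits)
            self._credits -= require_credits
        return True

    async def _refund_later(self, taken_credits, refund_time):
        await asyncio.sleep(refund_time)
        async with self._cond:
            self._credits += taken_credits
            self._cond.notify_all()   # let waiters re-check the pool

    async def transact(self, coroutine, credits, refund_time):
        await self.acquire(credits)
        try:
            return await coroutine
        finally:
            # Schedule the refund on the running loop and return without
            # waiting for it. This path also runs when the coroutine raises.
            task = asyncio.create_task(self._refund_later(credits, refund_time))
            self._refunds.add(task)
            task.add_done_callback(self._refunds.discard)


async def fetch(i):
    await asyncio.sleep(0.01)   # stands in for a non-blocking API request
    return i


async def main():
    sem = SketchAsyncRateSemaphore(credits=2)
    start = time.monotonic()
    # Four 1-credit transactions against a 2-credit pool: two run at once,
    # the other two wait for the first refunds about 0.2 s later.
    results = await asyncio.gather(
        *(sem.transact(fetch(i), credits=1, refund_time=0.2) for i in range(4))
    )
    return results, time.monotonic() - start


results, elapsed = asyncio.run(main())
print(results)                     # [0, 1, 2, 3]
print(f"{elapsed:.2f} s elapsed")
```

Note that fetch(i) is passed as a bare coroutine object, so it does not start running until the sketch has taken its credits.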

Notes

The coroutine argument should NOT be an asyncio.Task object: a Task is scheduled on the event loop as soon as it is created, so any await statement can trigger its execution before the semaphore is acquired.
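The difference between a Task and a bare coroutine object can be seen with plain asyncio, without any semaphore. In this sketch, the `request` coroutine and the "semaphore acquired" marker are hypothetical stand-ins for a real request and for the point at which credits would be taken.

```python
import asyncio

events = []


async def request(name):
    events.append(f"{name} started")
    return name


async def main():
    coro = request("coroutine")                  # nothing runs yet
    task = asyncio.create_task(request("task"))  # scheduled on the loop right away
    await asyncio.sleep(0)                       # any await lets the task start
    events.append("semaphore acquired")          # stand-in for acquiring credits
    await coro                                   # the coroutine object runs only now
    await task


asyncio.run(main())
print(events)
# ['task started', 'semaphore acquired', 'coroutine started']
```

The task has already run by the time the credits would have been taken, so it would escape throttling; the bare coroutine object waits until it is explicitly awaited.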

RateSemaphore

A semaphore implementation that limits access to functions or requests (known as transactions) based on available credits, and manages credit-debit accounting of a common resource pool. Functions to be executed are submitted to the RateSemaphore.transact method along with the number of credits they consume and the time it takes for those credits to be refunded to the resource pool. It can be used to throttle API requests, database operations and so on. In most cases, users only need to interact with the RateSemaphore.transact method.

__init__(credits=1)

Initialize the RateSemaphore with an initial number of credits.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| credits | float | The initial number of credits the semaphore starts with. Defaults to 1. | 1 |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If credits is less than 0. |

acquire(require_credits)

Acquire the semaphore, decrementing the resource pool by the specified number of credits. If the resource pool holds enough credits, decrement the credits and return immediately. If there are not enough credits on entry, block until other threads have called release() and enough resources are freed up. This is done with proper interlocking so that if multiple acquire() calls are blocked, release() will wake exactly one of them up. The implementation may pick one at random, so the order in which blocked threads are awakened should not be relied on and is OS-scheduler dependent.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| require_credits | float | The number of credits required to enter the semaphore. | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True when the credits were successfully acquired. |

release(taken_credits)

Refund the specified number of credits.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| taken_credits | float | The number of credits to release. | required |

transact(lambda_func, credits, refund_time, transaction_id=None, verbose=False)

Execute a parameter-less function using the semaphore for synchronization, using the RateSemaphore.acquire and RateSemaphore.release methods for proper synchronization. The refund mechanism is scheduled in a worker thread and the transact method returns without waiting for credits to be refunded. Failed transactions (those that raise an exception) consume and refund credits in the same way as successful transactions.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| lambda_func | callable | The function to execute as part of the transaction. Should not take in any parameters. | required |
| credits | float | The number of credits required for the transaction. | required |
| refund_time | float | The time in seconds after which the credits should be refunded. | required |
| transaction_id | str | Identifier for the transaction. Defaults to None. | None |
| verbose | bool | Whether to print verbose transaction information. Defaults to False. | False |

Returns:

| Type | Description |
| --- | --- |
| Any | The result of the transaction function. |

Raises:

| Type | Description |
| --- | --- |
| Exception | If the transaction function raises any exception. |
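The blocking counterpart of the same flow can be sketched with threading.Condition and threading.Timer. As before, this is an illustration under assumptions rather than quantpylib's implementation: `SketchRateSemaphore` is a hypothetical class, it wakes all waiters on release so each can re-check the pool (a simplification of the wake-one behaviour described above), and it starts the refund clock when the function returns.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class SketchRateSemaphore:
    """Illustrative stand-in only; NOT quantpylib's implementation."""

    def __init__(self, credits=1):
        if credits < 0:
            raise ValueError("credits must not be negative")
        self._credits = credits
        self._cond = threading.Condition()

    def acquire(self, require_credits):
        with self._cond:
            # Block this thread until the pool can cover the request.
            self._cond.wait_for(lambda: self._credits >= require_credits)
            self._credits -= require_credits
        return True

    def release(self, taken_credits):
        with self._cond:
            self._credits += taken_credits
            self._cond.notify_all()   # waiters re-check the pool themselves

    def transact(self, lambda_func, credits, refund_time):
        self.acquire(credits)
        try:
            return lambda_func()
        finally:
            # Refund from a worker thread so transact can return immediately;
            # the refund is scheduled for failed calls as well.
            timer = threading.Timer(refund_time, self.release, args=(credits,))
            timer.daemon = True
            timer.start()


sem = SketchRateSemaphore(credits=2)
start = time.monotonic()
with ThreadPoolExecutor(max_workers=4) as pool:
    # Four 1-credit calls against a 2-credit pool with a 0.2 s refund time.
    futures = [
        pool.submit(sem.transact, lambda i=i: i * i, 1, 0.2)
        for i in range(4)
    ]
    results = [f.result() for f in futures]
elapsed = time.monotonic() - start
print(results)                     # [0, 1, 4, 9]
print(f"{elapsed:.2f} s elapsed")
```

Two calls run immediately and the remaining two block until the first refunds arrive, so the batch takes a little over one refund period.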

Notes

Any function func that takes *args, **kwargs can be used with the semaphore by passing lambda_func = lambda: func(*args, **kwargs).
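A short sketch of the wrapping pattern follows. The `get_ohlcv` function is a hypothetical stand-in for a real request function; `functools.partial` is shown as an equivalent standard-library alternative to the lambda.

```python
from functools import partial


def get_ohlcv(ticker, interval="1d", limit=100):
    """Hypothetical request function standing in for a real API call."""
    return f"{ticker}:{interval}:{limit}"


# Wrap the call so that it takes no parameters, as lambda_func requires.
lambda_func = lambda: get_ohlcv("AAPL", interval="1h", limit=10)
print(lambda_func())    # AAPL:1h:10

# functools.partial gives an equivalent parameter-less callable.
partial_func = partial(get_ohlcv, "AAPL", interval="1h", limit=10)
print(partial_func())   # AAPL:1h:10

# In a loop, bind the loop variable as a default argument; otherwise every
# lambda would see the final value of `ticker` when it is eventually called.
jobs = [lambda ticker=ticker: get_ohlcv(ticker) for ticker in ("AAPL", "MSFT")]
print([job() for job in jobs])   # ['AAPL:1d:100', 'MSFT:1d:100']
```

The default-argument binding matters here because the semaphore may call the function some time after it was created, once credits become available.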