Skip to content

throttler

This page describes how you may use our quantpylib.throttler module and the functionalities exposed by our APIs. This module provides both synchronous and asynchronous support for rate-limiting access to function calls via a credit-based semaphore synchronization tool. A simple but common use would be to maximise throughput of API requests to an external server that places rate-limits on the number of API requests per minute/hour with request credits charged against a resource pool. This is made available by the quantpylib.throttler.rate_semaphore module through the RateSemaphore and AsyncRateSemaphore classes - and can be seen as an extension of the synchronization primitives threading.Semaphore and asyncio.locks.Semaphore in the Python standard library respectively. The quantpylib.throttler.decorators module provides decorators for instance methods that wraps threads and coroutines to provide seamless integration with the synchronization features.

A high-level walkthrough of the individual quant packages are presented in this page. Comprehensive documentation may be found in the respective pages. To follow along, make sure you have installed the necessary dependencies. Code example scripts are also provided in the repo.

The module is flexible enough to handle both fixed-cost endpoints (specs such as '20 API requests/min') or variable-cost endpoints (specs such as `5 API credits/getTick', '15 API credits/getOHLCV' with '100 credits/min per client').

Examples

Suppose we have a financial API endpoint that has rate limits as such:

Period Credits Awarded / app
10 seconds 40

The API gives us 40 credits to use every 10 seconds (capped at 40 credits max). Different endpoints can have variable credit costs, depending on the server load.

Suppose we have the following endpoints and their respective costs:

Endpoint Cost (credits / req)
getTick 20
getOHLCV 5..30
getPrice 12.5
... ...

Both synchronous and asynchronous semaphores expose their synchronization features through the semaphore.transact(...) method, which have the same function signature, except that the first parameter to a synchronous transaction is a parameter-less function, and the first parameter to the asynchronous transaction is a coroutine. Their detailed documentations are defined here.

For the demonstration, we require the following imports:

import time
import random
import asyncio
import threading

from datetime import datetime
from quantpylib.throttler.decorators import wrap_in_thread, aconsume_credits
from quantpylib.throttler.rate_semaphore import RateSemaphore,AsyncRateSemaphore

Synchronous Example

Let's begin with the example for the synchronous semaphore, given by the RateSemaphore class. Our objective is to maximise the throughput to a bunch of blocking requests that consume credits on some external server. To simulate blocking requests, such as a requests.get method, we use the time.sleep method. Note that this generalizes to any blocking method.

def getTick(work, id):
    print(f"{datetime.now()}:: getTick processing {id} takes {work} seconds")
    time.sleep(work)
    print(f"{datetime.now()}:: getTick processed {id}")
    return True

def getOHLCV(work, id):
    print(f"{datetime.now()}:: getOHLCV processing {id} takes {work} seconds")
    time.sleep(work)
    print(f"{datetime.now()}:: getOHLCV processed {id}")
    return True

Since we want to maximise throughput, and the function calls block the main thread, we want to create the requests in multiple threads. But since sending all of these requests simultaneously in different threads would overload the market data server (giving us errors/downgraded/blacklisted), we would wrap them in rate-limited transactions and submit them to the semaphore. Let's create some script to run some requests through the semaphore with the transact method :

def sync_example():
    print("-----------------------sync with thread-----------------------")
    sem = RateSemaphore(40)
    tick_req = lambda x: getTick(random.randint(1, 5), x)
    ohlcv_req = lambda x: getOHLCV(random.randint(1, 5), x)

    threads = [
        threading.Thread(target=sem.transact,kwargs={
            "lambda_func":lambda: tick_req(1), 
            "credits":20, 
            "refund_time":10, 
            "transaction_id":1, #optional
            "verbose":True #optional
        }),
        threading.Thread(target=sem.transact,kwargs={
            "lambda_func":lambda: ohlcv_req(2), 
            "credits":30, 
            "refund_time":10, 
            "transaction_id":2, #optional
            "verbose":True #optional
        }),
        threading.Thread(target=sem.transact,kwargs={
            "lambda_func":lambda: ohlcv_req(3), 
            "credits":5, 
            "refund_time":10, 
            "transaction_id":3, 
            "verbose":True
        }),
        threading.Thread(target=sem.transact,kwargs={
            "lambda_func":lambda: tick_req(4), 
            "credits":20, 
            "refund_time":10, 
            "transaction_id":4, 
            "verbose":True
        }),
    ]
    [thread.start() for thread in threads]
    [thread.join() for thread in threads]

if __name__ == "__main__":
    sync_example()
We set the verbosity flag to give us information on when the semaphore executed the transactions, so let us observe the printed information log:
-----------------------sync with thread-----------------------
2024-03-28 19:16:27.105953:: TXN 1 acquiring CreditSemaphore
2024-03-28 19:16:27.106051:: TXN 2 acquiring CreditSemaphore
2024-03-28 19:16:27.106083:: TXN 1 entered CreditSemaphore...
2024-03-28 19:16:27.106161:: TXN 3 acquiring CreditSemaphore
2024-03-28 19:16:27.106303:: TXN 4 acquiring CreditSemaphore
2024-03-28 19:16:27.106367:: TXN 3 entered CreditSemaphore...
2024-03-28 19:16:27.106426:: getOHLCV processing 3 takes 4 seconds
2024-03-28 19:16:27.106449:: getTick processing 1 takes 5 seconds
2024-03-28 19:16:31.106689:: getOHLCV processed 3
2024-03-28 19:16:31.106877:: TXN 3 exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:16:32.107982:: getTick processed 1
2024-03-28 19:16:32.108390:: TXN 1 exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:16:41.109162:: TXN 4 entered CreditSemaphore...
2024-03-28 19:16:41.109260:: getTick processing 4 takes 2 seconds
2024-03-28 19:16:43.110862:: getTick processed 4
2024-03-28 19:16:43.111243:: TXN 4 exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:16:53.113011:: TXN 2 entered CreditSemaphore...
2024-03-28 19:16:53.113060:: getOHLCV processing 2 takes 3 seconds
2024-03-28 19:16:56.115643:: getOHLCV processed 2
2024-03-28 19:16:56.115815:: TXN 2 exits CreditSemaphore, schedule refund in 10...
It is not difficult to reason with the output. For instance, transactions 1 and 3 entered the semaphore first, while 2, 4 was placed on hold, leaving 40 - 20 - 5 = 15 credits. No other transaction can enter the semaphore. Transaction 3 processes and completes at 31 seconds, but the credit is only refunded 10 seconds later. We can see that at 41 seconds, the (5) credits from transaction 3 is refunded, giving us 15 + 5 = 20 credits, enough for transaction 4 to enter the semaphore but not 2. The rest of printed log statements are easy to rationalize.

Easy with Decorators

The example given is nice but somewhat unwiedly due to having to wrap the method/function of interest first in a semaphore transaction, then followed by a threading.Thread object. In many software applications, we have a data-service layer with a poller object/SDK for API calls made to external servers. A simulated example looks something like this:

class _Throttler():
    def __init__(self):
        self.rate_semaphore=RateSemaphore(31)

    @wrap_in_thread(costs=20,refund_in=10)#,attrname="rate_semaphore",verbose=True (optional-defaults)
    def _getTick(self, work, id):
        print(f"{datetime.now()}:: getTick processing {id} takes {work} seconds")
        time.sleep(work)
        print(f"{datetime.now()}:: getTick processed {id}")
        return True

    @wrap_in_thread(costs=10,refund_in=10,attrname="rate_semaphore",verbose=True)
    def _getOHLCV(self, work, id):
        print(f"{datetime.now()}:: getOHLCV processing {id} takes {work} seconds")
        time.sleep(work)
        print(f"{datetime.now()}:: getOHLCV processed {id}")
        return True
The data-access object is given one or more RateSemaphore objects corresponding to a unique credit/resource pool. For each method that makes a resource-consuming request, we can decorate the function using wrap_in_thread with the costs and refund timers as parameters. The decorated function calls then directly return threading.Thread instances wrapping transactions that contain the function request, which can be activated using the threading.Thread.start method.
def sync_example():
    print("-----------------------sync with thread-----------------------")
    #... previous example

    print("-----------------------sync with decorator-----------------------")
    throttler = _Throttler()
    threads = [
        throttler._getTick(3, 1),
        throttler._getOHLCV(1, 2),
        throttler._getTick(3, 3),
        throttler._getOHLCV(1, 4),
    ]
    [thread.start() for thread in threads]
    [thread.join() for thread in threads]
The behavior of the semaphore is similar to the example explored in the previous example. The printed log statements are hence presented without commentary, but may be rationalized easily.
----------------------sync with decorator-----------------------
2024-03-28 19:16:56.116058:: TXN {'fn': '_getTick', 'args': (3, 1), 'kwargs': {}} acquiring CreditSemaphore
2024-03-28 19:16:56.116154:: TXN {'fn': '_getOHLCV', 'args': (1, 2), 'kwargs': {}} acquiring CreditSemaphore
2024-03-28 19:16:56.116220:: TXN {'fn': '_getTick', 'args': (3, 1), 'kwargs': {}} entered CreditSemaphore...
2024-03-28 19:16:56.116302:: TXN {'fn': '_getTick', 'args': (3, 3), 'kwargs': {}} acquiring CreditSemaphore
2024-03-28 19:16:56.116555:: getTick processing 1 takes 3 seconds
2024-03-28 19:16:56.116366:: TXN {'fn': '_getOHLCV', 'args': (1, 2), 'kwargs': {}} entered CreditSemaphore...
2024-03-28 19:16:56.116515:: TXN {'fn': '_getOHLCV', 'args': (1, 4), 'kwargs': {}} acquiring CreditSemaphore
2024-03-28 19:16:56.116691:: getOHLCV processing 2 takes 1 seconds
2024-03-28 19:16:57.120664:: getOHLCV processed 2
2024-03-28 19:16:57.120879:: TXN {'fn': '_getOHLCV', 'args': (1, 2), 'kwargs': {}} exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:16:59.117150:: getTick processed 1
2024-03-28 19:16:59.117323:: TXN {'fn': '_getTick', 'args': (3, 1), 'kwargs': {}} exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:17:07.123469:: TXN {'fn': '_getOHLCV', 'args': (1, 4), 'kwargs': {}} entered CreditSemaphore...
2024-03-28 19:17:07.123532:: getOHLCV processing 4 takes 1 seconds
2024-03-28 19:17:08.125678:: getOHLCV processed 4
2024-03-28 19:17:08.126219:: TXN {'fn': '_getOHLCV', 'args': (1, 4), 'kwargs': {}} exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:17:09.118384:: TXN {'fn': '_getTick', 'args': (3, 3), 'kwargs': {}} entered CreditSemaphore...
2024-03-28 19:17:09.118482:: getTick processing 3 takes 3 seconds
2024-03-28 19:17:12.119387:: getTick processed 3
2024-03-28 19:17:12.119741:: TXN {'fn': '_getTick', 'args': (3, 3), 'kwargs': {}} exits CreditSemaphore, schedule refund in 10...

Asynchronous Example

We now move onto the asynchronous semaphore, given by the AsyncRateSemaphore class. Our objective is to maximise the throughput to a bunch of non-blocking requests that consume credits on some external server. To simulate non-blocking requests, such as an async with session.get method of an aiohttp.ClientSession object or asynchronous database requests, we use the asyncio.sleep method. Note that this generalizes to any non-blocking coroutine.

async def agetTick(work, id):
    print(f"{datetime.now()}:: getTick processing {id} takes {work} seconds")
    await asyncio.sleep(work)
    print(f"{datetime.now()}:: getTick processed {id}")
    return True

async def agetOHLCV(work, id):
    print(f"{datetime.now()}:: getOHLCV processing {id} takes {work} seconds")
    await asyncio.sleep(work)
    print(f"{datetime.now()}:: getOHLCV processed {id}")
    return True

Since we want to maximise throughput, and the function calls are coroutines that do not block the main thread, we want to create the requests concurrently and place them on the event loop. But since sending all of these requests simultaneously in different coroutines would overload the market data server (giving us errors/downgraded/blacklisted), we would wrap them in rate-limited transactions and submit them to the semaphore. Let's create some script to run some request transactions through the semaphore:

async def async_example():
    print("-----------------------async with transactions-----------------------")
    sem = AsyncRateSemaphore(40, greedy_entry=True, greedy_exit=True)

    tick_req = lambda x: agetTick(random.randint(1, 5), x)
    ohlcv_req = lambda x: agetOHLCV(random.randint(1, 5), x)

    transactions = [
        sem.transact(coroutine=tick_req(1), credits=20, refund_time=10, transaction_id=1, verbose=True),
        sem.transact(coroutine=ohlcv_req(2), credits=30, refund_time=10, transaction_id=2, verbose=True),
        sem.transact(coroutine=ohlcv_req(3), credits=5, refund_time=10, transaction_id=3, verbose=True),
        sem.transact(coroutine=tick_req(4), credits=20, refund_time=10, transaction_id=4, verbose=True),
    ]
    await asyncio.gather(*transactions)

if __name__ == "__main__":
    sync_example() #previous example
    asyncio.run(async_example())
We set the verbosity flag to give us information on when the semaphore executed the transactions, so let us observe the printed information log:
-----------------------async with transactions-----------------------
2024-03-28 19:17:12.120864:: TXN 1 acquiring CreditSemaphore
2024-03-28 19:17:12.120915:: TXN 1 entered CreditSemaphore...
2024-03-28 19:17:12.120927:: getTick processing 1 takes 4 seconds
2024-03-28 19:17:12.120964:: TXN 2 acquiring CreditSemaphore
2024-03-28 19:17:12.120988:: TXN 3 acquiring CreditSemaphore
2024-03-28 19:17:12.121006:: TXN 3 entered CreditSemaphore...
2024-03-28 19:17:12.121016:: getOHLCV processing 3 takes 5 seconds
2024-03-28 19:17:12.121044:: TXN 4 acquiring CreditSemaphore
2024-03-28 19:17:16.121665:: getTick processed 1
2024-03-28 19:17:16.121740:: TXN 1 exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:17:17.122353:: getOHLCV processed 3
2024-03-28 19:17:17.122462:: TXN 3 exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:17:26.123119:: TXN 2 entered CreditSemaphore...
2024-03-28 19:17:26.123192:: getOHLCV processing 2 takes 1 seconds
2024-03-28 19:17:27.123519:: getOHLCV processed 2
2024-03-28 19:17:27.123633:: TXN 2 exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:17:37.124999:: TXN 4 entered CreditSemaphore...
2024-03-28 19:17:37.125077:: getTick processing 4 takes 3 seconds
2024-03-28 19:17:40.126377:: getTick processed 4
2024-03-28 19:17:40.126493:: TXN 4 exits CreditSemaphore, schedule refund in 10...
It is not difficult to reason with the output. For instance, transactions 1 and 3 entered the semaphore first, while 2, 4 was placed on hold, leaving 40 - 20 - 5 = 15 credits. No other transaction can enter the semaphore. Transaction 1 processes and completes at 16 seconds, but the credit is only refunded 10 seconds later. We can see that at 26 seconds, the (20) credits from transaction 1 is refunded, giving us 15 + 20 = 35 credits, enough for transaction 2 to enter the semaphore but not both 2 and 4. 2 is admitted first, and the rest of printed log statements are easy to rationalize.

Easy with Decorators

The example given is nice but somewhat unwiedly due to having to wrap the method/function of interest in semaphore transactions. We would like to hide the throttling intricacies at the caller level, such that a user of said data-access layer or SDKs do not need to be aware of the presence of a semaphore. For instance:

class _Throttler():
    def __init__(self):
        #... previous example
        self.arate_semaphore=AsyncRateSemaphore(31)

    #... previous example

    @aconsume_credits(costs=20,refund_in=10,attrname="arate_semaphore") #we want the asynchronous semaphore, and since the default name is not rate_semaphore, we pass in attrname
    async def _agetTick(self, work, id):
        print(f"{datetime.now()}:: getTick processing {id} takes {work} seconds")
        await asyncio.sleep(work)
        print(f"{datetime.now()}:: getTick processed {id}")
        return True

    @aconsume_credits(costs=10,refund_in=10,attrname="arate_semaphore",verbose=True)
    async def _agetOHLCV(self, work, id):
        print(f"{datetime.now()}:: getOHLCV processing {id} takes {work} seconds")
        await asyncio.sleep(work)
        print(f"{datetime.now()}:: getOHLCV processed {id}")
        return True
The data-access object is given one or more AsyncRateSemaphore objects corresponding to a unique credit/resource pool. For each method that makes a resource-consuming request, we can decorate the function using aconsume_credits with the costs and refund timers as parameters. The decorated function calls then are submitted through the object attribute's asynchronous semaphore instance.
async def async_example():
    print("-----------------------async with transactions-----------------------")
    #... previous example

    print("-----------------------async with decorator-----------------------")
    throttler = _Throttler()
    transactions = [
        throttler._agetTick(3, 1),
        throttler._agetOHLCV(1, 2),
        throttler._agetTick(3, 3),
        throttler._agetOHLCV(1, 4),
    ]
    await asyncio.gather(*transactions)
The behavior of the semaphore is similar to the example explored in the previous example. The printed log statements are hence presented without commentary, but may be rationalized easily.
-----------------------async with decorator-----------------------
2024-03-28 19:17:40.126936:: TXN {'fn': '_agetTick', 'args': (3, 1), 'kwargs': {}} acquiring CreditSemaphore
2024-03-28 19:17:40.127015:: TXN {'fn': '_agetTick', 'args': (3, 1), 'kwargs': {}} entered CreditSemaphore...
2024-03-28 19:17:40.127044:: getTick processing 1 takes 3 seconds
2024-03-28 19:17:40.127110:: TXN {'fn': '_agetOHLCV', 'args': (1, 2), 'kwargs': {}} acquiring CreditSemaphore
2024-03-28 19:17:40.127149:: TXN {'fn': '_agetOHLCV', 'args': (1, 2), 'kwargs': {}} entered CreditSemaphore...
2024-03-28 19:17:40.127170:: getOHLCV processing 2 takes 1 seconds
2024-03-28 19:17:40.127217:: TXN {'fn': '_agetTick', 'args': (3, 3), 'kwargs': {}} acquiring CreditSemaphore
2024-03-28 19:17:40.127266:: TXN {'fn': '_agetOHLCV', 'args': (1, 4), 'kwargs': {}} acquiring CreditSemaphore
2024-03-28 19:17:41.127912:: getOHLCV processed 2
2024-03-28 19:17:41.128021:: TXN {'fn': '_agetOHLCV', 'args': (1, 2), 'kwargs': {}} exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:17:43.127485:: getTick processed 1
2024-03-28 19:17:43.127539:: TXN {'fn': '_agetTick', 'args': (3, 1), 'kwargs': {}} exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:17:51.129270:: TXN {'fn': '_agetOHLCV', 'args': (1, 4), 'kwargs': {}} entered CreditSemaphore...
2024-03-28 19:17:51.129314:: getOHLCV processing 4 takes 1 seconds
2024-03-28 19:17:52.130482:: getOHLCV processed 4
2024-03-28 19:17:52.130534:: TXN {'fn': '_agetOHLCV', 'args': (1, 4), 'kwargs': {}} exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:17:53.128193:: TXN {'fn': '_agetTick', 'args': (3, 3), 'kwargs': {}} entered CreditSemaphore...
2024-03-28 19:17:53.128281:: getTick processing 3 takes 3 seconds
2024-03-28 19:17:56.129573:: getTick processed 3
2024-03-28 19:17:56.129683:: TXN {'fn': '_agetTick', 'args': (3, 3), 'kwargs': {}} exits CreditSemaphore, schedule refund in 10...

Notes on Behavior

  • Entry Behavior

    1. AsyncRateSemaphore acquires the semaphore if there are enough resources, but if greedy_entry=False, then the submitted transaction will wait behind the earlier pending transactions regardless of the resource pool availability. Otherwise, any submitted transaction that can run immediately will run without consideration for existing waiters.
    2. RateSemaphore acquires the semaphore immediately if there are enough resources, and otherwise waits. The order in which blocked threads are awakened should not be relied on and is OS-scheduler dependent.
  • Exit Behavior

    Requests submitted to both semaphore types exit from their respective transact method without waiting for the credits to be refunded. The credits are scheduled to be refunded separately on a thread (for synchronous implementations) or the event loop (for asynchronous implementations). When the credit is refunded

    1. AsyncRateSemaphore wakes up pending transactions that are able to execute on the state of the resource pool. If greedy_exit=False, then the number of pending transactions woken up will respect the FIFO order until the resource pool is insufficient for the earliest transaction. Otherwise, when credits are refunded with >=2 waiting transactions with arrival time txn A < txn B. If the semaphore has not enough credits to execute txn A, it can first run txn B. This helps to maximise throughput.

    2. RateSemaphore wakes up pending transactions that are able to execute on the state of the resource pool. The order in which blocked threads are awakened should not be relied on and is OS-scheduler dependent.

  • Exception Behavior

    Failed transactions (raised Exceptions) consume and refund credit in the same way as successful transactions.

Note that non-greedy entry and greedy exit can cause resource-expensive transactions to sit behind cheaper transactions which are constantly being submitted to the semaphore at a fast rate, preventing the expensive transaction from acquiring the semaphore.

Best Practices

  • Wrap unstable networks and expensive requests in a timeout transaction. This is to prevent the coroutine from 'await'-ing forever and hogging the semaphore.

  • Since the transactions wrap native Python functions and coroutines, it does not know when the code actually performs the credit-costing request. The transact functions should be closest to the costful logic as possible. It should not perform heavy compute or multiple requests so that the credits can refunded as quickly as possible for other transactions.

throttler.rate_semaphore

throttler.decorators