throttler
This page describes how to use our quantpylib.throttler module and the functionality exposed by its APIs. The module provides both synchronous and asynchronous support for rate-limiting access to function calls via a credit-based semaphore synchronization tool. A simple but common use is to maximise the throughput of API requests to an external server that rate-limits the number of requests per minute/hour, with request credits charged against a resource pool.
This is made available by the quantpylib.throttler.rate_semaphore module through the `RateSemaphore` and `AsyncRateSemaphore` classes, which can be seen as extensions of the synchronization primitives `threading.Semaphore` and `asyncio.locks.Semaphore` in the Python standard library, respectively.
The quantpylib.throttler.decorators module provides decorators for instance methods that wrap threads and coroutines, providing seamless integration with the synchronization features.
A high-level walkthrough of the individual quant packages is presented on this page. Comprehensive documentation may be found in the respective pages. To follow along, make sure you have installed the necessary dependencies. Code example scripts are also provided in the repo.
The module is flexible enough to handle both fixed-cost endpoints (specs such as '20 API requests/min') and variable-cost endpoints (specs such as '5 API credits/getTick' and '15 API credits/getOHLCV', with '100 credits/min per client').
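To make the credit-semaphore idea concrete before diving into the API, here is a minimal, illustrative sketch of such a primitive built on the standard library. This is not quantpylib's implementation; the class and method names are invented for illustration only:

```python
import threading

class MiniCreditSemaphore:
    """Illustrative credit-pool semaphore. NOT quantpylib's implementation;
    it only demonstrates the charge-then-refund accounting model."""

    def __init__(self, credits):
        self._credits = credits
        self._cond = threading.Condition()

    def transact(self, lambda_func, credits, refund_time):
        with self._cond:
            # block until the pool can cover this transaction's cost
            self._cond.wait_for(lambda: self._credits >= credits)
            self._credits -= credits
        try:
            return lambda_func()  # run the rate-limited work
        finally:
            # refund the credits refund_time seconds after the work completes
            threading.Timer(refund_time, self._refund, args=(credits,)).start()

    def _refund(self, credits):
        with self._cond:
            self._credits += credits
            self._cond.notify_all()  # wake waiters to re-check the pool
```

A transaction blocks until the pool can cover its cost, charges the pool, runs, and only later refunds the credits; this is the same accounting model the quantpylib classes expose, with considerably more care taken around wake-up ordering.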
Examples
Suppose we have a financial API endpoint that has rate limits as such:
| Period | Credits Awarded / app |
|---|---|
| 10 seconds | 40 |
The API gives us 40 credits to use every 10 seconds (capped at 40 credits max). Different endpoints can have variable credit costs, depending on the server load.
Suppose we have the following endpoints and their respective costs:
| Endpoint | Cost (credits / req) |
|---|---|
| getTick | 20 |
| getOHLCV | 5..30 |
| getPrice | 12.5 |
| ... | ... |
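As a quick sanity check on the budget arithmetic, the numbers below mirror the hypothetical tables above, taking getOHLCV at its cheapest tier since its cost varies with server load:

```python
# Hypothetical numbers mirroring the tables above; getOHLCV is taken at
# its cheapest tier (5 credits) since its cost varies with server load.
POOL = 40  # credits awarded per 10-second window
COSTS = {"getTick": 20, "getOHLCV": 5, "getPrice": 12.5}

def max_in_window(endpoint):
    # how many calls to a single endpoint a full pool can cover
    return int(POOL // COSTS[endpoint])

# max_in_window("getTick") == 2 and max_in_window("getOHLCV") == 8;
# a mixed batch of one getTick (20) plus four cheap getOHLCV (4 * 5)
# exactly drains the pool until refunds arrive.
```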
Both synchronous and asynchronous semaphores expose their synchronization features through the `semaphore.transact(...)` method. The two variants share the same function signature, except that the first parameter to a synchronous transaction is a parameter-less function, while the first parameter to the asynchronous transaction is a coroutine. Their detailed documentation is defined here.
For the demonstration, we require the following imports:
```python
import time
import random
import asyncio
import threading
from datetime import datetime

from quantpylib.throttler.decorators import wrap_in_thread, aconsume_credits
from quantpylib.throttler.rate_semaphore import RateSemaphore, AsyncRateSemaphore
```
Synchronous Example
Let's begin with the example for the synchronous semaphore, given by the `RateSemaphore` class. Our objective is to maximise the throughput of a bunch of blocking requests that consume credits on some external server. To simulate blocking requests, such as a `requests.get` call, we use the `time.sleep` method. Note that this generalizes to any blocking method.
```python
def getTick(work, id):
    print(f"{datetime.now()}:: getTick processing {id} takes {work} seconds")
    time.sleep(work)
    print(f"{datetime.now()}:: getTick processed {id}")
    return True

def getOHLCV(work, id):
    print(f"{datetime.now()}:: getOHLCV processing {id} takes {work} seconds")
    time.sleep(work)
    print(f"{datetime.now()}:: getOHLCV processed {id}")
    return True
```
Since we want to maximise throughput, and the function calls block the main thread, we create the requests in multiple threads. However, sending all of these requests simultaneously would overload the market data server (getting us errors, downgraded service, or blacklisted), so we wrap them in rate-limited transactions and submit them to the semaphore. Let's create a script to run some requests through the semaphore with the `transact` method:
```python
def sync_example():
    print("-----------------------sync with thread-----------------------")
    sem = RateSemaphore(40)
    tick_req = lambda x: getTick(random.randint(1, 5), x)
    ohlcv_req = lambda x: getOHLCV(random.randint(1, 5), x)
    threads = [
        threading.Thread(target=sem.transact, kwargs={
            "lambda_func": lambda: tick_req(1),
            "credits": 20,
            "refund_time": 10,
            "transaction_id": 1,  # optional
            "verbose": True,      # optional
        }),
        threading.Thread(target=sem.transact, kwargs={
            "lambda_func": lambda: ohlcv_req(2),
            "credits": 30,
            "refund_time": 10,
            "transaction_id": 2,  # optional
            "verbose": True,      # optional
        }),
        threading.Thread(target=sem.transact, kwargs={
            "lambda_func": lambda: ohlcv_req(3),
            "credits": 5,
            "refund_time": 10,
            "transaction_id": 3,
            "verbose": True,
        }),
        threading.Thread(target=sem.transact, kwargs={
            "lambda_func": lambda: tick_req(4),
            "credits": 20,
            "refund_time": 10,
            "transaction_id": 4,
            "verbose": True,
        }),
    ]
    [thread.start() for thread in threads]
    [thread.join() for thread in threads]

if __name__ == "__main__":
    sync_example()
```
```
-----------------------sync with thread-----------------------
2024-03-28 19:16:27.105953:: TXN 1 acquiring CreditSemaphore
2024-03-28 19:16:27.106051:: TXN 2 acquiring CreditSemaphore
2024-03-28 19:16:27.106083:: TXN 1 entered CreditSemaphore...
2024-03-28 19:16:27.106161:: TXN 3 acquiring CreditSemaphore
2024-03-28 19:16:27.106303:: TXN 4 acquiring CreditSemaphore
2024-03-28 19:16:27.106367:: TXN 3 entered CreditSemaphore...
2024-03-28 19:16:27.106426:: getOHLCV processing 3 takes 4 seconds
2024-03-28 19:16:27.106449:: getTick processing 1 takes 5 seconds
2024-03-28 19:16:31.106689:: getOHLCV processed 3
2024-03-28 19:16:31.106877:: TXN 3 exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:16:32.107982:: getTick processed 1
2024-03-28 19:16:32.108390:: TXN 1 exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:16:41.109162:: TXN 4 entered CreditSemaphore...
2024-03-28 19:16:41.109260:: getTick processing 4 takes 2 seconds
2024-03-28 19:16:43.110862:: getTick processed 4
2024-03-28 19:16:43.111243:: TXN 4 exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:16:53.113011:: TXN 2 entered CreditSemaphore...
2024-03-28 19:16:53.113060:: getOHLCV processing 2 takes 3 seconds
2024-03-28 19:16:56.115643:: getOHLCV processed 2
2024-03-28 19:16:56.115815:: TXN 2 exits CreditSemaphore, schedule refund in 10...
```
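To see why the transactions enter in this order, a small event-driven sketch of the credit pool (illustrative only, not the library's code) reproduces the schedule, using the work durations shown in the log above:

```python
import heapq

def simulate(pool, txns):
    """Tiny event simulation of a credit pool (illustrative only).
    txns: (txn_id, credits, work, refund_time), all submitted at t=0.
    Returns (entry_time, txn_id) pairs in admission order."""
    t, events, waiting, order = 0.0, [], list(txns), []
    while True:
        still_waiting = []
        for txn_id, credits, work, refund in waiting:
            if credits <= pool:
                pool -= credits                   # admit: charge the pool
                order.append((t, txn_id))
                # credits come back `refund` seconds after the work finishes
                heapq.heappush(events, (t + work + refund, credits))
            else:
                still_waiting.append((txn_id, credits, work, refund))
        waiting = still_waiting
        if not events:
            break
        t, refunded = heapq.heappop(events)       # jump to the next refund
        pool += refunded
    return order

# With the costs from the example and the work times from the log:
# simulate(40, [(1, 20, 5, 10), (2, 30, 3, 10), (3, 5, 4, 10), (4, 20, 2, 10)])
# -> [(0.0, 1), (0.0, 3), (14.0, 4), (26.0, 2)]
```

This matches the log: TXN 1 and TXN 3 enter immediately (20 + 5 = 25 of 40 credits), TXN 4 enters 14 seconds in when TXN 3's refund lands, and TXN 2 (30 credits) must wait until 26 seconds in for both 20-credit refunds.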
Easy with Decorators
The example given is nice but somewhat unwieldy, due to having to wrap the method/function of interest first in a semaphore transaction, then in a `threading.Thread` object. In many software applications, we have a data-service layer with a poller object/SDK for API calls made to external servers. A simulated example looks something like this:
```python
class _Throttler():
    def __init__(self):
        self.rate_semaphore = RateSemaphore(31)

    @wrap_in_thread(costs=20, refund_in=10)  # attrname="rate_semaphore", verbose=True are optional defaults
    def _getTick(self, work, id):
        print(f"{datetime.now()}:: getTick processing {id} takes {work} seconds")
        time.sleep(work)
        print(f"{datetime.now()}:: getTick processed {id}")
        return True

    @wrap_in_thread(costs=10, refund_in=10, attrname="rate_semaphore", verbose=True)
    def _getOHLCV(self, work, id):
        print(f"{datetime.now()}:: getOHLCV processing {id} takes {work} seconds")
        time.sleep(work)
        print(f"{datetime.now()}:: getOHLCV processed {id}")
        return True
```
The `_Throttler` class holds `RateSemaphore` objects, each corresponding to a unique credit/resource pool. For each method that makes a resource-consuming request, we can decorate the function using `wrap_in_thread` with the costs and refund timers as parameters. The decorated function calls then directly return `threading.Thread` instances wrapping transactions that contain the function request, which can be activated using the `threading.Thread.start` method.
```python
def sync_example():
    print("-----------------------sync with thread-----------------------")
    #... previous example

    print("-----------------------sync with decorator-----------------------")
    throttler = _Throttler()
    threads = [
        throttler._getTick(3, 1),
        throttler._getOHLCV(1, 2),
        throttler._getTick(3, 3),
        throttler._getOHLCV(1, 4),
    ]
    [thread.start() for thread in threads]
    [thread.join() for thread in threads]
```
```
-----------------------sync with decorator-----------------------
2024-03-28 19:16:56.116058:: TXN {'fn': '_getTick', 'args': (3, 1), 'kwargs': {}} acquiring CreditSemaphore
2024-03-28 19:16:56.116154:: TXN {'fn': '_getOHLCV', 'args': (1, 2), 'kwargs': {}} acquiring CreditSemaphore
2024-03-28 19:16:56.116220:: TXN {'fn': '_getTick', 'args': (3, 1), 'kwargs': {}} entered CreditSemaphore...
2024-03-28 19:16:56.116302:: TXN {'fn': '_getTick', 'args': (3, 3), 'kwargs': {}} acquiring CreditSemaphore
2024-03-28 19:16:56.116555:: getTick processing 1 takes 3 seconds
2024-03-28 19:16:56.116366:: TXN {'fn': '_getOHLCV', 'args': (1, 2), 'kwargs': {}} entered CreditSemaphore...
2024-03-28 19:16:56.116515:: TXN {'fn': '_getOHLCV', 'args': (1, 4), 'kwargs': {}} acquiring CreditSemaphore
2024-03-28 19:16:56.116691:: getOHLCV processing 2 takes 1 seconds
2024-03-28 19:16:57.120664:: getOHLCV processed 2
2024-03-28 19:16:57.120879:: TXN {'fn': '_getOHLCV', 'args': (1, 2), 'kwargs': {}} exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:16:59.117150:: getTick processed 1
2024-03-28 19:16:59.117323:: TXN {'fn': '_getTick', 'args': (3, 1), 'kwargs': {}} exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:17:07.123469:: TXN {'fn': '_getOHLCV', 'args': (1, 4), 'kwargs': {}} entered CreditSemaphore...
2024-03-28 19:17:07.123532:: getOHLCV processing 4 takes 1 seconds
2024-03-28 19:17:08.125678:: getOHLCV processed 4
2024-03-28 19:17:08.126219:: TXN {'fn': '_getOHLCV', 'args': (1, 4), 'kwargs': {}} exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:17:09.118384:: TXN {'fn': '_getTick', 'args': (3, 3), 'kwargs': {}} entered CreditSemaphore...
2024-03-28 19:17:09.118482:: getTick processing 3 takes 3 seconds
2024-03-28 19:17:12.119387:: getTick processed 3
2024-03-28 19:17:12.119741:: TXN {'fn': '_getTick', 'args': (3, 3), 'kwargs': {}} exits CreditSemaphore, schedule refund in 10...
```
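For intuition, a decorator of this shape can be sketched in a few lines. This is an illustrative approximation, not quantpylib's actual wrap_in_thread; the stub semaphore and all names below are invented:

```python
import threading

def wrap_in_thread_sketch(costs, refund_in, attrname="rate_semaphore"):
    """Sketch of a wrap_in_thread-style decorator (illustrative only;
    the real quantpylib decorator may differ in signature and behavior)."""
    def decorator(method):
        def wrapper(self, *args, **kwargs):
            sem = getattr(self, attrname)  # look up the instance's semaphore
            # return an *unstarted* thread wrapping the semaphore transaction
            return threading.Thread(target=sem.transact, kwargs={
                "lambda_func": lambda: method(self, *args, **kwargs),
                "credits": costs,
                "refund_time": refund_in,
            })
        return wrapper
    return decorator

class _DemoSemaphore:
    """Stand-in whose transact() only runs the wrapped call (hypothetical)."""
    def transact(self, lambda_func, credits, refund_time):
        return lambda_func()

class DemoService:
    def __init__(self):
        self.rate_semaphore = _DemoSemaphore()
        self.results = []

    @wrap_in_thread_sketch(costs=20, refund_in=10)
    def get_tick(self, tick_id):
        self.results.append(tick_id)

svc = DemoService()
thread = svc.get_tick(1)  # returns a Thread; nothing has run yet
thread.start()
thread.join()
# svc.results == [1]
```

The key design point is that calling the decorated method does not perform the request; it hands back a `threading.Thread` whose target is the semaphore transaction, so the caller decides when to start it.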
Asynchronous Example
We now move on to the asynchronous semaphore, given by the `AsyncRateSemaphore` class. Our objective is to maximise the throughput of a bunch of non-blocking requests that consume credits on some external server. To simulate non-blocking requests, such as the `async with session.get(...)` pattern of an `aiohttp.ClientSession` object or asynchronous database requests, we use the `asyncio.sleep` method. Note that this generalizes to any non-blocking coroutine.
```python
async def agetTick(work, id):
    print(f"{datetime.now()}:: getTick processing {id} takes {work} seconds")
    await asyncio.sleep(work)
    print(f"{datetime.now()}:: getTick processed {id}")
    return True

async def agetOHLCV(work, id):
    print(f"{datetime.now()}:: getOHLCV processing {id} takes {work} seconds")
    await asyncio.sleep(work)
    print(f"{datetime.now()}:: getOHLCV processed {id}")
    return True
```
Since we want to maximise throughput, and the function calls are coroutines that do not block the main thread, we create the requests concurrently and place them on the event loop. However, sending all of these requests simultaneously would overload the market data server (getting us errors, downgraded service, or blacklisted), so we wrap them in rate-limited transactions and submit them to the semaphore. Let's create a script to run some request transactions through the semaphore:
```python
async def async_example():
    print("-----------------------async with transactions-----------------------")
    sem = AsyncRateSemaphore(40, greedy_entry=True, greedy_exit=True)
    tick_req = lambda x: agetTick(random.randint(1, 5), x)
    ohlcv_req = lambda x: agetOHLCV(random.randint(1, 5), x)
    transactions = [
        sem.transact(coroutine=tick_req(1), credits=20, refund_time=10, transaction_id=1, verbose=True),
        sem.transact(coroutine=ohlcv_req(2), credits=30, refund_time=10, transaction_id=2, verbose=True),
        sem.transact(coroutine=ohlcv_req(3), credits=5, refund_time=10, transaction_id=3, verbose=True),
        sem.transact(coroutine=tick_req(4), credits=20, refund_time=10, transaction_id=4, verbose=True),
    ]
    await asyncio.gather(*transactions)

if __name__ == "__main__":
    sync_example()  # previous example
    asyncio.run(async_example())
```
```
-----------------------async with transactions-----------------------
2024-03-28 19:17:12.120864:: TXN 1 acquiring CreditSemaphore
2024-03-28 19:17:12.120915:: TXN 1 entered CreditSemaphore...
2024-03-28 19:17:12.120927:: getTick processing 1 takes 4 seconds
2024-03-28 19:17:12.120964:: TXN 2 acquiring CreditSemaphore
2024-03-28 19:17:12.120988:: TXN 3 acquiring CreditSemaphore
2024-03-28 19:17:12.121006:: TXN 3 entered CreditSemaphore...
2024-03-28 19:17:12.121016:: getOHLCV processing 3 takes 5 seconds
2024-03-28 19:17:12.121044:: TXN 4 acquiring CreditSemaphore
2024-03-28 19:17:16.121665:: getTick processed 1
2024-03-28 19:17:16.121740:: TXN 1 exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:17:17.122353:: getOHLCV processed 3
2024-03-28 19:17:17.122462:: TXN 3 exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:17:26.123119:: TXN 2 entered CreditSemaphore...
2024-03-28 19:17:26.123192:: getOHLCV processing 2 takes 1 seconds
2024-03-28 19:17:27.123519:: getOHLCV processed 2
2024-03-28 19:17:27.123633:: TXN 2 exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:17:37.124999:: TXN 4 entered CreditSemaphore...
2024-03-28 19:17:37.125077:: getTick processing 4 takes 3 seconds
2024-03-28 19:17:40.126377:: getTick processed 4
2024-03-28 19:17:40.126493:: TXN 4 exits CreditSemaphore, schedule refund in 10...
```
Easy with Decorators
The example given is nice but somewhat unwieldy, due to having to wrap the method/function of interest in semaphore transactions. We would like to hide the throttling intricacies at the caller level, such that a user of said data-access layer or SDK does not need to be aware of the presence of a semaphore. For instance:
```python
class _Throttler():
    def __init__(self):
        #... previous example
        self.arate_semaphore = AsyncRateSemaphore(31)

    #... previous example

    # we want the asynchronous semaphore, and since the attribute name is not
    # the default "rate_semaphore", we pass in attrname
    @aconsume_credits(costs=20, refund_in=10, attrname="arate_semaphore")
    async def _agetTick(self, work, id):
        print(f"{datetime.now()}:: getTick processing {id} takes {work} seconds")
        await asyncio.sleep(work)
        print(f"{datetime.now()}:: getTick processed {id}")
        return True

    @aconsume_credits(costs=10, refund_in=10, attrname="arate_semaphore", verbose=True)
    async def _agetOHLCV(self, work, id):
        print(f"{datetime.now()}:: getOHLCV processing {id} takes {work} seconds")
        await asyncio.sleep(work)
        print(f"{datetime.now()}:: getOHLCV processed {id}")
        return True
```
The `_Throttler` class likewise holds `AsyncRateSemaphore` objects, each corresponding to a unique credit/resource pool. For each method that makes a resource-consuming request, we can decorate the function using `aconsume_credits` with the costs and refund timers as parameters. The decorated function calls are then submitted through the object attribute's asynchronous semaphore instance.
```python
async def async_example():
    print("-----------------------async with transactions-----------------------")
    #... previous example

    print("-----------------------async with decorator-----------------------")
    throttler = _Throttler()
    transactions = [
        throttler._agetTick(3, 1),
        throttler._agetOHLCV(1, 2),
        throttler._agetTick(3, 3),
        throttler._agetOHLCV(1, 4),
    ]
    await asyncio.gather(*transactions)
```
```
-----------------------async with decorator-----------------------
2024-03-28 19:17:40.126936:: TXN {'fn': '_agetTick', 'args': (3, 1), 'kwargs': {}} acquiring CreditSemaphore
2024-03-28 19:17:40.127015:: TXN {'fn': '_agetTick', 'args': (3, 1), 'kwargs': {}} entered CreditSemaphore...
2024-03-28 19:17:40.127044:: getTick processing 1 takes 3 seconds
2024-03-28 19:17:40.127110:: TXN {'fn': '_agetOHLCV', 'args': (1, 2), 'kwargs': {}} acquiring CreditSemaphore
2024-03-28 19:17:40.127149:: TXN {'fn': '_agetOHLCV', 'args': (1, 2), 'kwargs': {}} entered CreditSemaphore...
2024-03-28 19:17:40.127170:: getOHLCV processing 2 takes 1 seconds
2024-03-28 19:17:40.127217:: TXN {'fn': '_agetTick', 'args': (3, 3), 'kwargs': {}} acquiring CreditSemaphore
2024-03-28 19:17:40.127266:: TXN {'fn': '_agetOHLCV', 'args': (1, 4), 'kwargs': {}} acquiring CreditSemaphore
2024-03-28 19:17:41.127912:: getOHLCV processed 2
2024-03-28 19:17:41.128021:: TXN {'fn': '_agetOHLCV', 'args': (1, 2), 'kwargs': {}} exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:17:43.127485:: getTick processed 1
2024-03-28 19:17:43.127539:: TXN {'fn': '_agetTick', 'args': (3, 1), 'kwargs': {}} exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:17:51.129270:: TXN {'fn': '_agetOHLCV', 'args': (1, 4), 'kwargs': {}} entered CreditSemaphore...
2024-03-28 19:17:51.129314:: getOHLCV processing 4 takes 1 seconds
2024-03-28 19:17:52.130482:: getOHLCV processed 4
2024-03-28 19:17:52.130534:: TXN {'fn': '_agetOHLCV', 'args': (1, 4), 'kwargs': {}} exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:17:53.128193:: TXN {'fn': '_agetTick', 'args': (3, 3), 'kwargs': {}} entered CreditSemaphore...
2024-03-28 19:17:53.128281:: getTick processing 3 takes 3 seconds
2024-03-28 19:17:56.129573:: getTick processed 3
2024-03-28 19:17:56.129683:: TXN {'fn': '_agetTick', 'args': (3, 3), 'kwargs': {}} exits CreditSemaphore, schedule refund in 10...
```
Notes on Behavior
- Entry Behavior

  `AsyncRateSemaphore` acquires the semaphore if there are enough resources. If `greedy_entry=False`, the submitted transaction waits behind earlier pending transactions regardless of resource-pool availability; otherwise, any submitted transaction that can run immediately will run without consideration for existing waiters.

  `RateSemaphore` acquires the semaphore immediately if there are enough resources, and otherwise waits. The order in which blocked threads are awakened should not be relied on and is OS-scheduler dependent.

- Exit Behavior

  Requests submitted to both semaphore types return from their respective `transact` method without waiting for the credits to be refunded. The credits are scheduled to be refunded separately on a thread (for the synchronous implementation) or on the event loop (for the asynchronous implementation). When the credits are refunded:

  - `AsyncRateSemaphore` wakes up pending transactions that are able to execute given the state of the resource pool. If `greedy_exit=False`, the transactions woken up respect FIFO order, stopping once the resource pool is insufficient for the earliest pending transaction. Otherwise, suppose credits are refunded while two or more transactions are waiting, with `txn A` arriving before `txn B`: if the semaphore does not have enough credits to execute `txn A`, it can first run `txn B`. This helps to maximise throughput.
  - `RateSemaphore` wakes up pending transactions that are able to execute given the state of the resource pool. The order in which blocked threads are awakened should not be relied on and is OS-scheduler dependent.

- Exception Behavior

  Failed transactions (raised exceptions) consume and refund credits in the same way as successful transactions.
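The FIFO versus greedy wake-up rule can be illustrated with a toy scan over the wait queue (this is not the library's internals; the function and names are invented):

```python
def wake_order(pool, waiters, greedy):
    """Which queued waiters (id, cost) can start after a refund (illustrative)."""
    started = []
    for wid, cost in waiters:
        if cost <= pool:
            pool -= cost
            started.append(wid)
        elif not greedy:
            break  # FIFO: stop at the first waiter the pool cannot cover
    return started

# Pool of 10 after a refund; txn A arrived before txn B:
# wake_order(10, [("A", 30), ("B", 5)], greedy=False) == []     # B blocked behind A
# wake_order(10, [("A", 30), ("B", 5)], greedy=True)  == ["B"]  # B jumps the queue
```

Under the FIFO rule nothing runs until A's 30 credits are available; the greedy rule trades strict ordering for throughput by letting B proceed.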
Best Practices
- Wrap unstable-network and expensive requests in a timeout transaction. This prevents the coroutine from awaiting forever and hogging the semaphore.

- Since the transactions wrap native Python functions and coroutines, the semaphore does not know when the code actually performs the credit-costing request. The `transact` calls should be as close to the credit-costing logic as possible, and should not perform heavy compute or multiple requests, so that credits can be refunded as quickly as possible for other transactions.
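For the timeout recommendation, the standard library's asyncio.wait_for can bound any awaitable before it is handed to the semaphore; slow_request below is a hypothetical stand-in for a real network call:

```python
import asyncio

async def fetch_with_timeout():
    """Bound a potentially-hanging request so it cannot hold credits forever."""
    async def slow_request():
        # stand-in for a network call that never returns in time
        await asyncio.sleep(60)
        return "data"
    try:
        # give up after 0.1 seconds instead of awaiting indefinitely
        return await asyncio.wait_for(slow_request(), timeout=0.1)
    except asyncio.TimeoutError:
        return None  # caller can retry; the semaphore is released promptly

# asyncio.run(fetch_with_timeout()) returns None: the request timed out
```

Submitting the timeout-bounded coroutine (rather than the raw request) as the transaction guarantees an upper bound on how long each transaction occupies the semaphore.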