feat: implement batching strategies #3630
base: main
@@ -52,28 +52,82 @@ In addition to declaring model as batchable, batch dimensions can also be config

Configuring Batching
--------------------

If a model supports batching, adaptive batching is enabled by default. To explicitly disable or
control adaptive batching behaviors at runtime, configuration can be specified under the
``batching`` key. Additionally, there are three configuration keys for customizing batching
behaviors: ``max_batch_size``, ``max_latency_ms``, and ``strategy``.

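
For example, adaptive batching can be disabled for all runners with a minimal snippet (this uses
the same ``enabled`` key shown in the full example further below):

.. code-block:: yaml

    runners:
      batching:
        enabled: false
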
Max Batch Size
^^^^^^^^^^^^^^

Configured through the ``max_batch_size`` key, max batch size represents the maximum size a batch
can reach before being released for inferencing. Max batch size should be set based on the capacity
of the available system resources, e.g. memory or GPU memory.

Max Latency
^^^^^^^^^^^

Configured through the ``max_latency_ms`` key, max latency represents the maximum latency in
milliseconds that the scheduler will attempt to uphold by cancelling requests that it predicts the
runner server cannot serve in time. Max latency should be set based on the service level objective
(SLO) of the inference requests.

Batching Strategy
^^^^^^^^^^^^^^^^^

Configured through the ``strategy`` and ``strategy_options`` keys, the batching strategy determines
how the scheduler chooses a batching window, i.e. how long it waits for requests to combine into a
batch before dispatching it to begin execution. There are three options:

- target_latency: this strategy waits until it expects the first request received will take around
  ``latency`` time to complete before beginning execution. Choose this method if you think that
  your service workload will be very bursty and so the intelligent wait algorithm will do a poor
  job of identifying average wait times.

  It takes one option, ``latency_ms`` (default 1000), which is the latency target to use for
  dispatch.

- fixed_wait: this strategy will wait a fixed amount of time after the first request has been
  received. It differs from the target_latency strategy in that it does not consider the amount of
  time that it expects a batch will take to execute.

  It takes one option, ``wait_ms`` (default 1000), the amount of time to wait after receiving the
  first request.

- intelligent_wait: this strategy waits intelligently in an effort to optimize average latency
  across all requests. It takes the average time spent in queue, then calculates the time it
  expects to wait for and then execute the batch including the next request. If that time,
  multiplied by the number of requests in the queue, is less than the average wait time, it will
  continue waiting for the next request to arrive (a sketch of this rule follows the list). This
  is the default, and the other options should only be chosen if undesirable latency behavior is
  observed.

  It has one option, ``decay`` (default 0.95), which is the rate at which the dispatcher decays
  the wait time, per dispatched job. Note that this does not decay the actual expected wait time,
  but instead reduces the batching window, which indirectly reduces the average waiting time.

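
The decision rule behind ``intelligent_wait`` can be sketched as follows. This is an illustrative
simplification rather than the exact implementation; ``o_a`` and ``o_b`` stand for the linear
coefficients the dispatcher's optimizer fits to predict batch execution time, and ``avg_wait`` for
the average time a request has been observed to spend in the queue:

.. code-block:: python

    def should_keep_waiting(
        n: int,              # requests currently in the queue
        w0: float,           # seconds the oldest request has waited
        wn: float,           # seconds the newest request has waited
        tick: float,         # scheduler tick interval, in seconds
        o_a: float,          # fitted per-item execution time
        o_b: float,          # fitted fixed execution overhead
        avg_wait: float,     # average observed queue time per request
        max_batch_size: int,
        max_latency: float,
        decay: float = 0.95,
    ) -> bool:
        # projected latency of the oldest request if the batch were dispatched now
        latency_0 = w0 + o_a * n + o_b
        # rough cost of waiting one more tick, summed over everything in the queue
        added_latency = n * (wn + tick + o_a)
        return (
            n < max_batch_size                          # the batch is not full yet
            and latency_0 + tick <= max_latency * 0.95  # the first request is not about to time out
            and added_latency <= avg_wait * decay       # waiting should still lower average latency
        )

A complete configuration example:
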
.. code-block:: yaml
    :caption: ⚙️ `configuration.yml`

    runners:
      # batching options for all runners
      batching:
        enabled: true
        max_batch_size: 100
        max_latency_ms: 500
        strategy: intelligent_wait
      iris_clf:
        # batching options for specifically the iris_clf runner
        # these options override the above
        batching:
          enabled: true
          max_batch_size: 100
          max_latency_ms: 500
          strategy: target_latency
          strategy_options:
            latency_ms: 200

Monitoring
----------

@@ -7,6 +7,8 @@
import functools
import traceback
import collections
from abc import ABC
from abc import abstractmethod

import numpy as np


@@ -32,10 +34,16 @@ def is_locked(self):
    def release(self):
        self.sema += 1

@attr.define
class Job:
    enqueue_time: float
    data: t.Any
    future: asyncio.Future[t.Any]
    dispatch_time: float = 0

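
As a usage sketch only (the exact call site is not part of this diff; loop, data, and queue are
stand-ins): inbound_call below records an arrival time and a future for each request, so
constructing and enqueuing a job would look roughly like:

# hypothetical sketch, not part of the diff
now = time.time()
future = loop.create_future()  # the future the caller will await
queue.append(Job(enqueue_time=now, data=data, future=future))  # dispatch_time defaults to 0
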
class Optimizer:
    """
    Analyze historical data to predict execution time using a simple linear regression on batch size.
    """

    N_KEPT_SAMPLE = 50  # amount of outbound info kept for inferring params

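
The docstring's regression is the linear model used throughout the strategies below; as an
illustrative sketch (using the o_a/o_b coefficient names that appear in this diff):

# illustrative sketch, not part of the diff: batch execution time as a line in batch size
def predict_execution_time(n: int, o_a: float, o_b: float) -> float:
    # o_a: fitted time per item in the batch, o_b: fitted fixed overhead
    return o_a * n + o_b

# the strategies combine this with queue wait time, e.g. for the oldest request:
#     latency_0 = w0 + predict_execution_time(n, o_a, o_b)
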
@@ -92,14 +100,97 @@ def trigger_refresh(self):
T_OUT = t.TypeVar("T_OUT")


BATCHING_STRATEGY_REGISTRY = {}  # maps strategy_id -> strategy class


class BatchingStrategy(ABC):
    strategy_id: str

    @abstractmethod
    async def wait(
        self,
        queue: t.Sequence[Job],
        optimizer: Optimizer,
        max_latency: float,
        max_batch_size: int,
        tick_interval: float,
    ):
        # wait until the strategy decides the queued requests should be dispatched as a batch
        ...

    def __init_subclass__(cls, strategy_id: str):
        BATCHING_STRATEGY_REGISTRY[strategy_id] = cls
        cls.strategy_id = strategy_id

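
As a hypothetical wiring example (not shown in this diff), the registry populated by
__init_subclass__ would let a configured strategy name be resolved and instantiated with its
options:

# hypothetical sketch, not part of the diff
strategy_name = "target_latency"     # value of the `strategy` configuration key
strategy_options = {"latency": 200}  # keys matching what the chosen strategy's __init__ reads
strategy = BATCHING_STRATEGY_REGISTRY[strategy_name](strategy_options)
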
class TargetLatencyStrategy(BatchingStrategy, strategy_id="target_latency"):
    latency: float = 1.0

Review comment (on __init__): TODO: typed dict for init.

    def __init__(self, options: dict[t.Any, t.Any]):
        for key in options:
            if key == "latency":
                self.latency = options[key] / 1000.0
            else:
                logger.warning("Strategy 'target_latency' ignoring unknown configuration key '%s'.", key)

    async def wait(self, queue: t.Sequence[Job], optimizer: Optimizer, max_latency: float, max_batch_size: int, tick_interval: float):
        # wait until the projected completion time of the first request reaches the latency target
        n = len(queue)
        now = time.time()
        w0 = now - queue[0].enqueue_time
        latency_0 = w0 + optimizer.o_a * n + optimizer.o_b

        while latency_0 < self.latency:
            n = len(queue)
            now = time.time()
            w0 = now - queue[0].enqueue_time
            latency_0 = w0 + optimizer.o_a * n + optimizer.o_b

            await asyncio.sleep(tick_interval)

class FixedWaitStrategy(BatchingStrategy, strategy_id="fixed_wait"):
    # stored as `wait_time` so the value does not shadow the `wait` method below
    wait_time: float = 1.0

    def __init__(self, options: dict[t.Any, t.Any]):
        for key in options:
            if key == "wait":
                self.wait_time = options[key] / 1000.0
            else:
                logger.warning("Strategy 'fixed_wait' ignoring unknown configuration key '%s'", key)

    async def wait(self, queue: t.Sequence[Job], optimizer: Optimizer, max_latency: float, max_batch_size: int, tick_interval: float):
        # wait a fixed amount of time after the first request arrived
        now = time.time()
        w0 = now - queue[0].enqueue_time

        if w0 < self.wait_time:
            await asyncio.sleep(self.wait_time - w0)

Review comment (on the check above): Add loop checking for

class IntelligentWaitStrategy(BatchingStrategy, strategy_id="intelligent_wait"):
    decay: float = 0.95

    def __init__(self, options: dict[t.Any, t.Any]):
        for key in options:
            if key == "decay":
                self.decay = options[key]
            else:
                logger.warning("Strategy 'intelligent_wait' ignoring unknown configuration key '%s'", key)

    async def wait(self, queue: t.Sequence[Job], optimizer: Optimizer, max_latency: float, max_batch_size: int, tick_interval: float):
        n = len(queue)
        now = time.time()
        w0 = now - queue[0].enqueue_time
        wn = now - queue[-1].enqueue_time
        latency_0 = w0 + optimizer.o_a * n + optimizer.o_b
        while (
            # if we don't already have enough requests,
            n < max_batch_size
            # we are not about to cancel the first request,
            and latency_0 + tick_interval <= max_latency * 0.95
            # and waiting will cause average latency to decrease
            and n * (wn + tick_interval + optimizer.o_a) <= optimizer.wait * self.decay
        ):
            n = len(queue)
            now = time.time()
            w0 = now - queue[0].enqueue_time
            wn = now - queue[-1].enqueue_time
            latency_0 = w0 + optimizer.o_a * n + optimizer.o_b

            # wait for additional requests to arrive
            await asyncio.sleep(tick_interval)

Review comment (on the last wait condition): n is the number of requests in the queue, and
n * (wn + dt + optimizer.o_a) is a measure of how much latency will be added to every request if we
wait for a new request and add it to the batch; it must be less than optimizer.wait, the average
amount of time a request sits in the queue.

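
To make the wait condition above concrete, a small worked example with invented numbers:

# purely illustrative numbers, not part of the diff
n = 4                  # requests in the queue
wn = 0.010             # newest request has waited 10 ms
tick_interval = 0.001  # 1 ms scheduler tick
o_a = 0.005            # fitted per-item execution time, 5 ms
avg_wait = 0.100       # optimizer.wait: requests average 100 ms in the queue
decay = 0.95

added = n * (wn + tick_interval + o_a)  # 4 * 0.016 s = 0.064 s added across the queue
threshold = avg_wait * decay            # 0.095 s
print(added <= threshold)               # True -> keep waiting for another request
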
class Dispatcher:
    def __init__(
        self,
        max_latency_in_ms: int,

@@ -123,9 +214,7 @@ def __init__(
        self.tick_interval = 0.001

        self._controller = None
        self._queue: collections.deque[Job] = collections.deque()  # TODO(bojiang): maxlen
        self._sema = shared_sema if shared_sema else NonBlockSema(1)

    def shutdown(self):

@@ -214,6 +303,9 @@ async def train_optimizer(
            # call
            self._sema.acquire()
            inputs_info = tuple(self._queue.pop() for _ in range(n_call_out))
            for info in inputs_info:
                # fake wait as 0 for training requests
                info.enqueue_time = now
            self._loop.create_task(self.outbound_call(inputs_info))
        except asyncio.CancelledError:
            return

@@ -259,27 +351,28 @@ async def controller(self):
            dt = self.tick_interval
            decay = 0.95  # the decay rate of wait time
            now = time.time()
            w0 = now - self._queue[0].enqueue_time
            wn = now - self._queue[-1].enqueue_time
            a = self.optimizer.o_a
            b = self.optimizer.o_b

            # the estimated latency of the first request if we began processing now
            latency_0 = w0 + a * n + b

            if n > 1 and latency_0 >= self.max_latency_in_ms:
                self._queue.popleft().future.cancel()
                continue
            if self._sema.is_locked():
                if n == 1 and w0 >= self.max_latency_in_ms:
                    self._queue.popleft().future.cancel()
                    continue
                await asyncio.sleep(self.tick_interval)
                continue

            # we are now free to dispatch whenever we like
            await self.strategy.wait(
                self._queue,
                self.optimizer,
                self.max_latency_in_ms,
                self.max_batch_size,
                self.tick_interval,
            )

            n = len(self._queue)
            n_call_out = min(self.max_batch_size, n)

Review comment: Move this (and above) logic into strategy.

            # call
            self._sema.acquire()

@@ -290,6 +383,7 @@ async def controller(self):
        except Exception as e:  # pylint: disable=broad-except
            logger.error(traceback.format_exc(), exc_info=e)


    async def inbound_call(self, data: t.Any):
        now = time.time()
        future = self._loop.create_future()

@@ -300,7 +394,7 @@ async def inbound_call(self, data: t.Any):
        return await future

    async def outbound_call(
        self, inputs_info: tuple[Job, ...]
    ):
        _time_start = time.time()
        _done = False

@@ -315,7 +409,7 @@ async def outbound_call(
            _done = True
            self.optimizer.log_outbound(
                n=len(inputs_info),
                wait=_time_start - inputs_info[-1].enqueue_time,
                duration=time.time() - _time_start,
            )
        except asyncio.CancelledError:

Review comment: Docs changes are outdated, correct?