Commit

implement target latency for batching

sauyon committed Mar 2, 2023
1 parent 8b72567 commit ee61bf8
Showing 7 changed files with 52 additions and 19 deletions.
1 change: 1 addition & 0 deletions src/bentoml/_internal/configuration/v1/__init__.py
@@ -140,6 +140,7 @@
         s.Optional("batching"): {
             s.Optional("enabled"): bool,
             s.Optional("max_batch_size"): s.And(int, ensure_larger_than_zero),
+            s.Optional("target_latency_ms"): s.Or(int, None),
             s.Optional("max_latency_ms"): s.And(int, ensure_larger_than_zero),
         },
         # NOTE: there is a distinction between being unset and None here; if set to 'None'
1 change: 1 addition & 0 deletions
@@ -82,6 +82,7 @@ runners:
   batching:
     enabled: true
     max_batch_size: 100
+    target_latency_ms: ~
     max_latency_ms: 10000
   logging:
     access:
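The `~` above leaves the new option unset by default. As a hedged sketch of how a deployment might opt in via its own configuration file (the `runners.batching` keys mirror the defaults shown above; the values are illustrative, not recommendations):

# bentoml_configuration.yaml -- illustrative values only
runners:
  batching:
    enabled: true
    max_batch_size: 100
    target_latency_ms: 20     # soft goal: dispatch a batch once ~20 ms of latency is predicted
    max_latency_ms: 10000     # hard cap: requests older than this are cancelled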
49 changes: 32 additions & 17 deletions src/bentoml/_internal/marshal/dispatcher.py
@@ -103,6 +103,7 @@ class CorkDispatcher:
     def __init__(
         self,
         max_latency_in_ms: int,
+        target_latency_in_ms: int | None,
         max_batch_size: int,
         shared_sema: t.Optional[NonBlockSema] = None,
         fallback: t.Callable[[], t.Any] | type[t.Any] | None = None,
@@ -116,9 +117,14 @@ def __init__(
         raises:
             * all possible exceptions the decorated function has
         """
-        self.max_latency_in_ms = max_latency_in_ms / 1000.0
+        self.max_latency = max_latency_in_ms / 1000.0
+        if target_latency_in_ms is not None and target_latency_in_ms < 0:
+            target_latency_in_ms = None
+        self.target_latency = (
+            None if target_latency_in_ms is None else target_latency_in_ms / 1000.0
+        )
         self.fallback = fallback
-        self.optimizer = Optimizer(self.max_latency_in_ms)
+        self.optimizer = Optimizer(self.max_latency)
         self.max_batch_size = int(max_batch_size)
         self.tick_interval = 0.001

@@ -189,7 +195,7 @@ async def controller(self):
                     > self.optimizer.N_SKIPPED_SAMPLE
                     - self.optimizer.outbound_counter
                     + 6
-                    and w0 >= self.max_latency_in_ms
+                    and w0 >= self.max_latency
                 ):
                     # we're being very conservative and only canceling requests if they have already timed out
                     self._queue.popleft()[2].cancel()
@@ -222,7 +228,7 @@ async def controller(self):
                 w0 = now - self._queue[0][0]

                 # only cancel requests if there are more than enough for training
-                if n > 6 and w0 >= self.max_latency_in_ms:
+                if n > 6 and w0 >= self.max_latency:
                     # we're being very conservative and only canceling requests if they have already timed out
                     self._queue.popleft()[2].cancel()
                     continue
@@ -246,7 +252,7 @@ async def controller(self):
                 if self.max_batch_size >= 2:
                     # we will attempt to keep the second request served within this time
                     step_2_wait = min(
-                        self.max_latency_in_ms * 0.95,
+                        self.max_latency * 0.95,
                         5 * (self.optimizer.o_a + self.optimizer.o_b),
                     )

@@ -266,7 +272,7 @@ async def controller(self):
                     b = self.optimizer.o_b

                     # only cancel requests if there are more than enough for training
-                    if n > 5 and w0 >= self.max_latency_in_ms:
+                    if n > 5 and w0 >= self.max_latency:
                         # we're being very conservative and only canceling requests if they have already timed out
                         self._queue.popleft()[2].cancel()
                         continue
@@ -295,7 +301,7 @@ async def controller(self):

                 # we will attempt to keep the second request served within this time
                 step_3_wait = min(
-                    self.max_latency_in_ms * 0.95,
+                    self.max_latency * 0.95,
                     7 * (self.optimizer.o_a + self.optimizer.o_b),
                 )
                 while (
@@ -313,7 +319,7 @@ async def controller(self):
                     b = self.optimizer.o_b

                     # only cancel requests if there are more than enough for training
-                    if n > 3 and w0 >= self.max_latency_in_ms:
+                    if n > 3 and w0 >= self.max_latency:
                         # we're being very conservative and only canceling requests if they have already timed out
                         self._queue.popleft()[2].cancel()
                         continue
@@ -334,7 +340,7 @@ async def controller(self):
                 logger.debug("Dispatcher finished optimizer training request 3.")
                 self.optimizer.trigger_refresh()

-                if self.optimizer.o_a + self.optimizer.o_b >= self.max_latency_in_ms:
+                if self.optimizer.o_a + self.optimizer.o_b >= self.max_latency:
                     logger.warning(
                         "BentoML has detected that a service has a max latency that is likely too low for serving. If many 429 errors are encountered, try raising the 'runner.max_latency' in your BentoML configuration YAML file."
                     )
@@ -354,21 +360,30 @@ async def controller(self):
                 a = self.optimizer.o_a
                 b = self.optimizer.o_b

-                if n > 1 and (w0 + a * n + b) >= self.max_latency_in_ms:
+                if n > 1 and (w0 + a * n + b) >= self.max_latency:
                     self._queue.popleft()[2].cancel()
                     continue
                 if self._sema.is_locked():
-                    if n == 1 and w0 >= self.max_latency_in_ms:
+                    if n == 1 and w0 >= self.max_latency:
                         self._queue.popleft()[2].cancel()
                         continue
                     await asyncio.sleep(self.tick_interval)
                     continue
-                if (
-                    n < self.max_batch_size
-                    and n * (wn + dt + (a or 0)) <= self.optimizer.wait * decay
-                ):
-                    await asyncio.sleep(self.tick_interval)
-                    continue
+
+                if n < self.max_batch_size:
+                    wait = (
+                        self.optimizer.wait * decay
+                        if self.target_latency is None
+                        else self.target_latency
+                    )
+                    if self.target_latency is None:
+                        if n * (wn + dt + (a or 0)) <= self.optimizer.wait * decay:
+                            await asyncio.sleep(self.tick_interval)
+                            continue
+                    else:  # self.target_latency is not None
+                        if w0 + dt + (a * (n + 1)) + b <= self.target_latency:
+                            await asyncio.sleep(self.tick_interval)
+                            continue

                 n_call_out = min(self.max_batch_size, n)
                 # call
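Taken together, the controller change splits the dispatch decision into two modes: the existing adaptive rule, and a new rule that holds a batch only while the oldest request's predicted completion time stays under the target. Below is a minimal, self-contained sketch of that decision, assuming optimizer.o_a approximates per-request marginal latency, optimizer.o_b the fixed batch overhead, and optimizer.wait * decay the learned adaptive threshold; the function and parameter names are illustrative, not part of the codebase, and all values are in seconds.

# A hedged sketch of the dispatch decision above -- not CorkDispatcher itself.
def should_dispatch(
    n: int,                      # requests currently queued
    max_batch_size: int,
    w0: float,                   # how long the oldest queued request has waited
    wn: float,                   # how long the newest queued request has waited
    dt: float,                   # approximate duration of one controller tick
    a: float,                    # per-request marginal latency (optimizer.o_a)
    b: float,                    # fixed batch overhead (optimizer.o_b)
    adaptive_wait: float,        # optimizer.wait * decay
    target_latency: float | None,
) -> bool:
    if n >= max_batch_size:
        return True              # batch is full; dispatch immediately
    if target_latency is None:
        # adaptive mode: keep batching while the aggregate expected wait is
        # still cheaper than the optimizer's learned wait threshold
        return n * (wn + dt + a) > adaptive_wait
    # target mode: dispatch as soon as the oldest request's predicted
    # completion time would exceed the target after one more tick
    return w0 + dt + a * (n + 1) + b > target_latency

For example, with a ≈ 2 ms, b ≈ 10 ms, a 25 ms target, a ~1 ms tick, and three queued requests whose oldest has waited 5 ms, the predicted completion time is 5 + 1 + 2·4 + 10 = 24 ms, so the controller sleeps one more tick before flushing the batch.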
2 changes: 2 additions & 0 deletions src/bentoml/_internal/models/model.py
@@ -325,6 +325,7 @@ def to_runner(
         name: str = "",
         max_batch_size: int | None = None,
         max_latency_ms: int | None = None,
+        target_latency_ms: int | None = None,
         method_configs: dict[str, dict[str, int]] | None = None,
     ) -> Runner:
         """
@@ -347,6 +348,7 @@
             models=[self],
             max_batch_size=max_batch_size,
             max_latency_ms=max_latency_ms,
+            target_latency_ms=target_latency_ms,
             method_configs=method_configs,
         )

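As a usage sketch (the model tag "iris_clf:latest" is hypothetical), the new keyword now flows from Model.to_runner through Runner into each RunnerMethod alongside the existing batching knobs:

import bentoml

# "iris_clf:latest" is a hypothetical model tag
runner = bentoml.models.get("iris_clf:latest").to_runner(
    max_batch_size=100,
    max_latency_ms=10000,    # hard cap enforced by request cancellation
    target_latency_ms=20,    # soft dispatch goal for the batcher
)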
16 changes: 14 additions & 2 deletions src/bentoml/_internal/runner/runner.py
@@ -47,6 +47,7 @@ class RunnerMethod(t.Generic[T, P, R]):
     config: RunnableMethodConfig
     max_batch_size: int
     max_latency_ms: int
+    target_latency_ms: int | None

     def run(self, *args: P.args, **kwargs: P.kwargs) -> R:
         return self.runner._runner_handle.run_method(self, *args, **kwargs)
@@ -156,6 +157,7 @@ def __init__(
         models: list[Model] | None = None,
         max_batch_size: int | None = None,
         max_latency_ms: int | None = None,
+        target_latency_ms: int | None = None,
         method_configs: dict[str, dict[str, int]] | None = None,
     ) -> None:
         """
@@ -174,8 +176,9 @@
             models: An optional list composed of ``bentoml.Model`` instances.
             max_batch_size: Max batch size config for dynamic batching. If not provided, use the default value from
                 configuration.
-            max_latency_ms: Max latency config for dynamic batching. If not provided, use the default value from
-                configuration.
+            max_latency_ms: Max latency config. If not provided, uses the default value from configuration.
+            target_latency_ms: Target latency config for dynamic batching. If not provided, uses the default value
+                from the configuration.
             method_configs: A dictionary per method config for this given Runner signatures.

         Returns:
@@ -212,13 +215,17 @@

             method_max_batch_size = None
             method_max_latency_ms = None
+            method_target_latency_ms = None
             if method_name in method_configs:
                 method_max_batch_size = method_configs[method_name].get(
                     "max_batch_size"
                 )
                 method_max_latency_ms = method_configs[method_name].get(
                     "max_latency_ms"
                 )
+                method_target_latency_ms = method_configs[method_name].get(
+                    "target_latency_ms"
+                )

             runner_method_map[method_name] = RunnerMethod(
                 runner=self,
@@ -234,6 +241,11 @@ def __init__(
                     max_latency_ms,
                     default=config["batching"]["max_latency_ms"],
                 ),
+                target_latency_ms=first_not_none(
+                    method_target_latency_ms,
+                    target_latency_ms,
+                    default=config["batching"]["target_latency_ms"],
+                ),
             )

         self.__attrs_init__(
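The first_not_none chain above resolves the setting at three levels: a per-method entry in method_configs wins over the runner-wide argument, which wins over the global runners.batching.target_latency_ms default. A hedged sketch of that precedence (MyRunnable and its predict method are hypothetical):

import bentoml

class MyRunnable(bentoml.Runnable):
    SUPPORTED_RESOURCES = ("cpu",)
    SUPPORTS_CPU_MULTI_THREADING = True

    @bentoml.Runnable.method(batchable=True, batch_dim=0)
    def predict(self, inputs):
        return inputs  # placeholder

runner = bentoml.Runner(
    MyRunnable,
    name="demo_runner",
    target_latency_ms=50,                      # runner-wide value
    method_configs={
        # per-method override: beats the 50 ms above for predict()
        "predict": {"target_latency_ms": 10},
    },
)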
1 change: 1 addition & 0 deletions src/bentoml/_internal/server/runner_app.py
@@ -58,6 +58,7 @@ def __init__(
         max_batch_size = method.max_batch_size if method.config.batchable else 1
         self.dispatchers[method.name] = CorkDispatcher(
             max_latency_in_ms=method.max_latency_ms,
+            target_latency_in_ms=method.target_latency_ms,
             max_batch_size=max_batch_size,
             fallback=functools.partial(
                 ServiceUnavailable, message="process is overloaded"
1 change: 1 addition & 0 deletions src/bentoml/triton.py
@@ -188,6 +188,7 @@ def __getattr__(self, item: str) -> t.Any:
             config=_RunnableMethodConfig(batchable=True, batch_dim=(0, 0)),
             max_batch_size=0,
             max_latency_ms=10000,
+            target_latency_ms=None,
         )

         return super().__getattribute__(item)
