diff --git a/src/bentoml/_internal/configuration/v1/__init__.py b/src/bentoml/_internal/configuration/v1/__init__.py
index 2d9776edf7f..d516f40000f 100644
--- a/src/bentoml/_internal/configuration/v1/__init__.py
+++ b/src/bentoml/_internal/configuration/v1/__init__.py
@@ -140,6 +140,7 @@
         s.Optional("batching"): {
             s.Optional("enabled"): bool,
             s.Optional("max_batch_size"): s.And(int, ensure_larger_than_zero),
+            s.Optional("target_latency_ms"): s.Or(int, None),
             s.Optional("max_latency_ms"): s.And(int, ensure_larger_than_zero),
         },
         # NOTE: there is a distinction between being unset and None here; if set to 'None'
diff --git a/src/bentoml/_internal/configuration/v1/default_configuration.yaml b/src/bentoml/_internal/configuration/v1/default_configuration.yaml
index 9ddd2741d50..81c472ea761 100644
--- a/src/bentoml/_internal/configuration/v1/default_configuration.yaml
+++ b/src/bentoml/_internal/configuration/v1/default_configuration.yaml
@@ -82,6 +82,7 @@ runners:
   batching:
     enabled: true
     max_batch_size: 100
+    target_latency_ms: ~
     max_latency_ms: 10000
   logging:
     access:
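The two hunks above introduce the new `target_latency_ms` key: the schema entry accepts an integer or `None`, and the shipped default is `~` (YAML for null), so existing deployments keep their current batching behavior until a value is set. A minimal sketch of what the `s.Or(int, None)` rule accepts, assuming `s` is the `schema` package imported under that alias in the v1 config module; `batching_schema` is a hypothetical stand-in for the relevant fragment:

```python
import schema as s

# Hypothetical fragment of the batching config schema shown in the hunk above.
batching_schema = s.Schema({s.Optional("target_latency_ms"): s.Or(int, None)})

batching_schema.validate({"target_latency_ms": 50})    # ok: integer target
batching_schema.validate({"target_latency_ms": None})  # ok: YAML '~' loads as None
batching_schema.validate({})                           # ok: the key is optional
# batching_schema.validate({"target_latency_ms": "50"})  # raises schema.SchemaError
```

Unlike `max_batch_size` and `max_latency_ms`, the new key is not guarded by `ensure_larger_than_zero`; negative values are instead normalized to `None` in `CorkDispatcher.__init__` below.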
diff --git a/src/bentoml/_internal/marshal/dispatcher.py b/src/bentoml/_internal/marshal/dispatcher.py
index c086e61007c..ae3dc8c4cf2 100644
--- a/src/bentoml/_internal/marshal/dispatcher.py
+++ b/src/bentoml/_internal/marshal/dispatcher.py
@@ -103,6 +103,7 @@ class CorkDispatcher:
     def __init__(
         self,
         max_latency_in_ms: int,
+        target_latency_in_ms: int | None,
         max_batch_size: int,
         shared_sema: t.Optional[NonBlockSema] = None,
         fallback: t.Callable[[], t.Any] | type[t.Any] | None = None,
@@ -116,9 +117,14 @@ def __init__(
         raises:
             * all possible exceptions the decorated function has
         """
-        self.max_latency_in_ms = max_latency_in_ms / 1000.0
+        self.max_latency = max_latency_in_ms / 1000.0
+        if target_latency_in_ms is not None and target_latency_in_ms < 0:
+            target_latency_in_ms = None
+        self.target_latency = (
+            None if target_latency_in_ms is None else target_latency_in_ms / 1000.0
+        )
         self.fallback = fallback
-        self.optimizer = Optimizer(self.max_latency_in_ms)
+        self.optimizer = Optimizer(self.max_latency)
         self.max_batch_size = int(max_batch_size)
         self.tick_interval = 0.001

@@ -189,7 +195,7 @@ async def controller(self):
                     > self.optimizer.N_SKIPPED_SAMPLE
                     - self.optimizer.outbound_counter
                     + 6
-                    and w0 >= self.max_latency_in_ms
+                    and w0 >= self.max_latency
                 ):
                     # we're being very conservative and only canceling requests if they have already timed out
                     self._queue.popleft()[2].cancel()
@@ -222,7 +228,7 @@ async def controller(self):
                 w0 = now - self._queue[0][0]

                 # only cancel requests if there are more than enough for training
-                if n > 6 and w0 >= self.max_latency_in_ms:
+                if n > 6 and w0 >= self.max_latency:
                     # we're being very conservative and only canceling requests if they have already timed out
                     self._queue.popleft()[2].cancel()
                     continue
@@ -246,7 +252,7 @@ async def controller(self):
         if self.max_batch_size >= 2:
             # we will attempt to keep the second request served within this time
             step_2_wait = min(
-                self.max_latency_in_ms * 0.95,
+                self.max_latency * 0.95,
                 5 * (self.optimizer.o_a + self.optimizer.o_b),
             )

@@ -266,7 +272,7 @@ async def controller(self):
                 b = self.optimizer.o_b

                 # only cancel requests if there are more than enough for training
-                if n > 5 and w0 >= self.max_latency_in_ms:
+                if n > 5 and w0 >= self.max_latency:
                     # we're being very conservative and only canceling requests if they have already timed out
                     self._queue.popleft()[2].cancel()
                     continue
@@ -295,7 +301,7 @@ async def controller(self):
             # we will attempt to keep the second request served within this time
             step_3_wait = min(
-                self.max_latency_in_ms * 0.95,
+                self.max_latency * 0.95,
                 7 * (self.optimizer.o_a + self.optimizer.o_b),
             )
             while (
@@ -313,7 +319,7 @@ async def controller(self):
                 b = self.optimizer.o_b

                 # only cancel requests if there are more than enough for training
-                if n > 3 and w0 >= self.max_latency_in_ms:
+                if n > 3 and w0 >= self.max_latency:
                     # we're being very conservative and only canceling requests if they have already timed out
                     self._queue.popleft()[2].cancel()
                     continue
@@ -334,7 +340,7 @@ async def controller(self):
            logger.debug("Dispatcher finished optimizer training request 3.")
            self.optimizer.trigger_refresh()

-        if self.optimizer.o_a + self.optimizer.o_b >= self.max_latency_in_ms:
+        if self.optimizer.o_a + self.optimizer.o_b >= self.max_latency:
             logger.warning(
                 "BentoML has detected that a service has a max latency that is likely too low for serving. If many 429 errors are encountered, try raising the 'runner.max_latency' in your BentoML configuration YAML file."
             )
@@ -354,21 +360,25 @@ async def controller(self):
                 a = self.optimizer.o_a
                 b = self.optimizer.o_b

-                if n > 1 and (w0 + a * n + b) >= self.max_latency_in_ms:
+                if n > 1 and (w0 + a * n + b) >= self.max_latency:
                     self._queue.popleft()[2].cancel()
                     continue
                 if self._sema.is_locked():
-                    if n == 1 and w0 >= self.max_latency_in_ms:
+                    if n == 1 and w0 >= self.max_latency:
                         self._queue.popleft()[2].cancel()
                         continue
                     await asyncio.sleep(self.tick_interval)
                     continue
-                if (
-                    n < self.max_batch_size
-                    and n * (wn + dt + (a or 0)) <= self.optimizer.wait * decay
-                ):
-                    await asyncio.sleep(self.tick_interval)
-                    continue
+
+                if n < self.max_batch_size:
+                    if self.target_latency is None:
+                        if n * (wn + dt + (a or 0)) <= self.optimizer.wait * decay:
+                            await asyncio.sleep(self.tick_interval)
+                            continue
+                    else:  # self.target_latency is not None
+                        if w0 + dt + (a * (n + 1)) + b <= self.target_latency:
+                            await asyncio.sleep(self.tick_interval)
+                            continue

                 n_call_out = min(self.max_batch_size, n)
                 # call
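This dispatcher change is the core of the feature. The renamed `max_latency` (now stored in seconds) keeps its role as the hard timeout past which queued requests are canceled, while the new `target_latency` changes when a partially filled batch is flushed. Without a target, the controller keeps corking while the learned heuristic `n * (wn + dt + a) <= optimizer.wait * decay` holds; with a target, it corks only while the oldest request is still expected to finish within the target if one more request joins, estimated as `w0 + dt + a * (n + 1) + b` from the optimizer's linear model (`a`: cost per batched item, `b`: fixed batch overhead). A standalone sketch of that decision; the function and its parameter names are illustrative stand-ins for `CorkDispatcher.controller`'s locals, not part of the real interface:

```python
def keep_waiting(
    n: int,                        # number of queued requests
    w0: float,                     # age of the oldest queued request (seconds)
    wn: float,                     # age of the newest queued request (seconds)
    dt: float,                     # duration of one control-loop tick (seconds)
    a: float,                      # optimizer.o_a: estimated cost per batched item
    b: float,                      # optimizer.o_b: estimated fixed batch overhead
    learned_wait: float,           # optimizer.wait * decay
    target_latency: float | None,  # None when target_latency_ms is unset
    max_batch_size: int,
) -> bool:
    """Return True to keep corking the batch for one more tick."""
    if n >= max_batch_size:
        return False  # batch is full: dispatch immediately
    if target_latency is None:
        # original behavior: wait while the learned wait budget allows it
        return n * (wn + dt + (a or 0)) <= learned_wait
    # new behavior: wait only while the oldest request would still complete
    # within the target after one more request joins the batch
    return w0 + dt + a * (n + 1) + b <= target_latency
```

A consequence worth noting: a target smaller than the estimated cost of even a small batch (roughly `a + b`) means the corking condition never holds, so the dispatcher flushes whatever is queued on every tick.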
diff --git a/src/bentoml/_internal/models/model.py b/src/bentoml/_internal/models/model.py
index 903dac461aa..5dd08a33641 100644
--- a/src/bentoml/_internal/models/model.py
+++ b/src/bentoml/_internal/models/model.py
@@ -325,6 +325,7 @@ def to_runner(
         name: str = "",
         max_batch_size: int | None = None,
         max_latency_ms: int | None = None,
+        target_latency_ms: int | None = None,
         method_configs: dict[str, dict[str, int]] | None = None,
     ) -> Runner:
         """
@@ -347,6 +348,7 @@
             models=[self],
             max_batch_size=max_batch_size,
             max_latency_ms=max_latency_ms,
+            target_latency_ms=target_latency_ms,
             method_configs=method_configs,
         )

diff --git a/src/bentoml/_internal/runner/runner.py b/src/bentoml/_internal/runner/runner.py
index 3da39cd5bd8..e0b5bce1d2a 100644
--- a/src/bentoml/_internal/runner/runner.py
+++ b/src/bentoml/_internal/runner/runner.py
@@ -47,6 +47,7 @@ class RunnerMethod(t.Generic[T, P, R]):
     config: RunnableMethodConfig
     max_batch_size: int
     max_latency_ms: int
+    target_latency_ms: int | None

     def run(self, *args: P.args, **kwargs: P.kwargs) -> R:
         return self.runner._runner_handle.run_method(self, *args, **kwargs)
@@ -156,6 +157,7 @@ def __init__(
         models: list[Model] | None = None,
         max_batch_size: int | None = None,
         max_latency_ms: int | None = None,
+        target_latency_ms: int | None = None,
         method_configs: dict[str, dict[str, int]] | None = None,
     ) -> None:
         """
@@ -174,8 +176,9 @@
             models: An optional list composed of ``bentoml.Model`` instances.
             max_batch_size: Max batch size config for dynamic batching. If not provided, use the default value from
                 configuration.
-            max_latency_ms: Max latency config for dynamic batching. If not provided, use the default value from
-                configuration.
+            max_latency_ms: Max latency config. If not provided, uses the default value from configuration.
+            target_latency_ms: Target latency config for dynamic batching. If not provided, uses the default value
+                from configuration.
             method_configs: A dictionary per method config for this given Runner signatures.

         Returns:
@@ -212,6 +215,7 @@

             method_max_batch_size = None
             method_max_latency_ms = None
+            method_target_latency_ms = None
             if method_name in method_configs:
                 method_max_batch_size = method_configs[method_name].get(
                     "max_batch_size"
@@ -219,6 +223,9 @@
                 method_max_latency_ms = method_configs[method_name].get(
                     "max_latency_ms"
                 )
+                method_target_latency_ms = method_configs[method_name].get(
+                    "target_latency_ms"
+                )

             runner_method_map[method_name] = RunnerMethod(
                 runner=self,
@@ -234,6 +241,11 @@
                     max_latency_ms,
                     default=config["batching"]["max_latency_ms"],
                 ),
+                target_latency_ms=first_not_none(
+                    method_target_latency_ms,
+                    target_latency_ms,
+                    default=config["batching"]["target_latency_ms"],
+                ),
             )

         self.__attrs_init__(
diff --git a/src/bentoml/_internal/server/runner_app.py b/src/bentoml/_internal/server/runner_app.py
index 98c7c31c6a5..e0d5e9a0599 100644
--- a/src/bentoml/_internal/server/runner_app.py
+++ b/src/bentoml/_internal/server/runner_app.py
@@ -58,6 +58,7 @@ def __init__(
             max_batch_size = method.max_batch_size if method.config.batchable else 1
             self.dispatchers[method.name] = CorkDispatcher(
                 max_latency_in_ms=method.max_latency_ms,
+                target_latency_in_ms=method.target_latency_ms,
                 max_batch_size=max_batch_size,
                 fallback=functools.partial(
                     ServiceUnavailable, message="process is overloaded"
diff --git a/src/bentoml/triton.py b/src/bentoml/triton.py
index f86156fb076..d0b880a4adc 100644
--- a/src/bentoml/triton.py
+++ b/src/bentoml/triton.py
@@ -188,6 +188,7 @@ def __getattr__(self, item: str) -> t.Any:
             config=_RunnableMethodConfig(batchable=True, batch_dim=(0, 0)),
             max_batch_size=0,
             max_latency_ms=10000,
+            target_latency_ms=None,
         )

         return super().__getattribute__(item)
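With the `Runner` and `to_runner` plumbing above, the knob can be set at three levels; `first_not_none` gives per-method values precedence over the runner-level argument, which in turn overrides `runners.batching.target_latency_ms` from the configuration YAML. A hedged usage sketch; the model tag and method name are hypothetical placeholders for any saved model with a batchable signature:

```python
import bentoml

runner = bentoml.models.get("iris_clf:latest").to_runner(
    max_batch_size=100,
    max_latency_ms=10_000,  # hard cap: overdue requests are canceled with 429
    target_latency_ms=50,   # runner-level target for flushing batches early
    method_configs={
        # per-method override takes precedence over the runner-level value
        "predict": {"target_latency_ms": 20},
    },
)
```

Triton runners opt out: `bentoml.triton` pins `target_latency_ms=None`, presumably because batching is delegated to the Triton Inference Server rather than to the `CorkDispatcher`.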