diff --git a/src/bentoml/_internal/configuration/v1/default_configuration.yaml b/src/bentoml/_internal/configuration/v1/default_configuration.yaml
index a8b38303723..671cb32f100 100644
--- a/src/bentoml/_internal/configuration/v1/default_configuration.yaml
+++ b/src/bentoml/_internal/configuration/v1/default_configuration.yaml
@@ -83,7 +83,9 @@ runners:
   timeout: 300
   optimizer:
     name: linear
-    options: {}
+    options:
+      initial_slope_ms: 2.0
+      initial_intercept_ms: 1.0
   batching:
     enabled: true
     max_batch_size: 100
diff --git a/src/bentoml/_internal/marshal/dispatcher.py b/src/bentoml/_internal/marshal/dispatcher.py
index 607323ab06a..ebde0b2d805 100644
--- a/src/bentoml/_internal/marshal/dispatcher.py
+++ b/src/bentoml/_internal/marshal/dispatcher.py
@@ -50,7 +50,6 @@ class Job:
     dispatch_time: float = 0
 
-
 OPTIMIZER_REGISTRY = {}
@@ -58,7 +57,6 @@ class Optimizer(ABC):
     optimizer_id: str
     n_skipped_sample: int = 0
 
-
     @abstractmethod
     def __init__(self, options: dict[str, t.Any]):
         pass
@@ -100,8 +98,8 @@ class LinearOptimizer(Optimizer, optimizer_id="linear"):
     """
     Analyze historical data to predict execution time using a simple linear regression on batch size.
     """
-    o_a: float = 2
-    o_b: float = 1
+    o_a: float = 2.0
+    o_b: float = 1.0
 
     n_kept_sample = 50  # amount of outbound info kept for inferring params
     n_skipped_sample = 2  # amount of outbound info skipped after init
@@ -113,10 +111,10 @@ def __init__(self, options: dict[str, t.Any]):
         (all in seconds)
         """
         for key in options:
-            if key == "initial_slope":
-                self.o_a = options[key]
-            elif key == "initial_intercept":
-                self.o_b = options[key]
+            if key == "initial_slope_ms":
+                self.o_a = options[key] / 1000.0
+            elif key == "initial_intercept_ms":
+                self.o_b = options[key] / 1000.0
             elif key == "n_kept_sample":
                 self.n_kept_sample = options[key]
             elif key == "n_skipped_sample":
@@ -124,7 +122,7 @@ def __init__(self, options: dict[str, t.Any]):
             elif key == "param_refresh_interval":
                 self.param_refresh_interval = options[key]
             else:
-                logger.warning("Strategy 'target_latency' ignoring unknown configuration key '{key}'.")
+                logger.warning(f"Optimizer 'linear' ignoring unknown configuration key '{key}'.")
 
         self.o_stat: collections.deque[tuple[int, float]] = collections.deque(
             maxlen=self.n_kept_sample
@@ -181,7 +179,7 @@ def __init__(self, optimizer: Optimizer, options: dict[t.Any, t.Any]):
         pass
 
     @abstractmethod
-    async def batch(self, optimizer: Optimizer, queue: t.Deque[Job], max_latency: float, max_batch_size: int, tick_interval: float, dispatch: t.Callable[[t.Sequence[Job]], None]):
+    async def batch(self, optimizer: Optimizer, queue: t.Deque[Job], max_latency: float, max_batch_size: int, tick_interval: float, dispatch: t.Callable[[t.Sequence[Job], int], None]):
         pass
 
     def __init_subclass__(cls, strategy_id: str):
@@ -197,16 +195,16 @@ def __init__(self, options: dict[t.Any, t.Any]):
             if key == "latency":
                 self.latency = options[key] / 1000.0
             else:
-                logger.warning("Strategy 'target_latency' ignoring unknown configuration key '{key}'.")
+                logger.warning(f"Strategy 'target_latency' ignoring unknown configuration key '{key}'.")
 
-    async def batch(self, optimizer: Optimizer, queue: t.Deque[Job], max_latency: float, max_batch_size: int, tick_interval: float, dispatch: t.Callable[[t.Sequence[Job]], None]):
+    async def batch(self, optimizer: Optimizer, queue: t.Deque[Job], max_latency: float, max_batch_size: int, tick_interval: float, dispatch: t.Callable[[t.Sequence[Job], int], None]):
         n = len(queue)
         now = time.time()
         w0 = now - queue[0].enqueue_time
         latency_0 = w0 + optimizer.predict(n)
 
-        while latency_0 < self.latency:
+        while latency_0 < self.latency and n < max_batch_size:
             n = len(queue)
             now = time.time()
             w0 = now - queue[0].enqueue_time
@@ -214,6 +212,16 @@ async def batch(self, optimizer: Optimizer, queue: t.Deque[Job], max_latency: fl
             await asyncio.sleep(tick_interval)
 
+        # pop the jobs that fit within max_batch_size and dispatch them as one batch
+        n_call_out = 0
+        batch_size = 0
+        for job in queue:
+            if batch_size + job.data.sample.batch_size <= max_batch_size:
+                n_call_out += 1
+                batch_size += job.data.sample.batch_size
+        inputs_info = tuple(queue.pop() for _ in range(n_call_out))
+        dispatch(inputs_info, batch_size)
+
 
 class AdaptiveStrategy(BatchingStrategy, strategy_id="adaptive"):
     decay: float = 0.95
@@ -226,12 +234,14 @@ def __init__(self, options: dict[t.Any, t.Any]):
         for key in options:
             if key == "decay":
                 self.decay = options[key]
+            elif key == "n_kept_samples":
+                self.n_kept_samples = options[key]
             else:
-                logger.warning("Strategy 'adaptive' ignoring unknown configuration value")
+                logger.warning(f"Strategy 'adaptive' ignoring unknown configuration key '{key}'.")
 
         self.avg_wait_times = collections.deque(maxlen=self.n_kept_samples)
 
-    async def batch(self, optimizer: Optimizer, queue: t.Deque[Job], max_latency: float, max_batch_size: int, tick_interval: float, dispatch: t.Callable[[t.Sequence[Job]], None]):
+    async def batch(self, optimizer: Optimizer, queue: t.Deque[Job], max_latency: float, max_batch_size: int, tick_interval: float, dispatch: t.Callable[[t.Sequence[Job], int], None]):
         n = len(queue)
         now = time.time()
         w0 = now - queue[0].enqueue_time
@@ -254,22 +264,26 @@ async def batch(self, optimizer: Optimizer, queue: t.Deque[Job], max_latency: fl
             await asyncio.sleep(tick_interval)
 
         # dispatch the batch
-        n = len(queue)
-        n_call_out = min(max_batch_size, n)
-        # call
         inputs_info: list[Job] = []
+        n_call_out = 0
+        batch_size = 0
+        for job in queue:
+            if batch_size + job.data.sample.batch_size <= max_batch_size:
+                n_call_out += 1
+                batch_size += job.data.sample.batch_size
+
         for _ in range(n_call_out):
             job = queue.pop()
             new_wait = (now - job.enqueue_time) / self.n_kept_samples
             if len(self.avg_wait_times) == self.n_kept_samples:
                 oldest_wait = self.avg_wait_times.popleft()
                 self.avg_req_wait = self.avg_req_wait - oldest_wait + new_wait
             else:
+                # the average deliberately undercounts until n_kept_samples samples arrive, for simplicity
                 self.avg_req_wait += new_wait
-
             self.avg_wait_times.append(new_wait)
             inputs_info.append(job)
 
-        dispatch(inputs_info)
+        dispatch(inputs_info, batch_size)
 
 
 class Dispatcher:
@@ -385,15 +399,15 @@ async def train_optimizer(
                     await asyncio.sleep(self.tick_interval)
                     continue
 
-                n_call_out = min(n, batch_size)
-
                 req_count += 1
                 # call
-                self._sema.acquire()
+                n_call_out = 0
+                batch_size = 0
+                for job in self._queue:
+                    if batch_size + job.data.sample.batch_size <= self.max_batch_size:
+                        n_call_out += 1
+                        batch_size += job.data.sample.batch_size
                 inputs_info = tuple(self._queue.pop() for _ in range(n_call_out))
-                for info in inputs_info:
-                    # fake wait as 0 for training requests
-                    info.enqueue_time = now
-                self._loop.create_task(self.outbound_call(inputs_info))
+                self._dispatch(inputs_info, batch_size)
         except asyncio.CancelledError:
             return
         except Exception as e:  # pylint: disable=broad-except
@@ -461,9 +475,9 @@ async def controller(self):
             except Exception as e:  # pylint: disable=broad-except
                 logger.error(traceback.format_exc(), exc_info=e)
 
-    def _dispatch(self, inputs_info: t.Sequence[Job]):
+    def _dispatch(self, inputs_info: t.Sequence[Job], batch_size: int):
         self._sema.acquire()
-        task = self._loop.create_task(self.outbound_call(inputs_info))
+        task = self._loop.create_task(self.outbound_call(inputs_info, batch_size))
         self.background_tasks.add(task)
         task.add_done_callback(self.background_tasks.discard)
@@ -476,11 +490,10 @@ async def inbound_call(self, data: t.Any):
             self._wake_event.notify_all()
         return await future
 
-    async def outbound_call(self, inputs_info: t.Sequence[Job]):
+    async def outbound_call(self, inputs_info: t.Sequence[Job], batch_size: int):
         _time_start = time.time()
         _done = False
-        batch_size = len(inputs_info)
-        logger.debug("Dynamic batching cork released, batch size: %d", batch_size)
+        logger.debug("Dynamic batching cork released, batch size: %d (%d requests)", batch_size, len(inputs_info))
         try:
             outputs = await self.callback(
                 tuple(t.cast(t.Any, input_info.data) for input_info in inputs_info)
diff --git a/src/bentoml/_internal/server/runner_app.py b/src/bentoml/_internal/server/runner_app.py
index 36eac3b14cf..8341b547e32 100644
--- a/src/bentoml/_internal/server/runner_app.py
+++ b/src/bentoml/_internal/server/runner_app.py
@@ -61,7 +61,6 @@ def __init__(
             self.dispatchers[method.name] = Dispatcher(
                 max_latency_in_ms=method.max_latency_ms,
-                batching_strategy=method.batching_strategy,
                 max_batch_size=max_batch_size,
                 optimizer=method.optimizer,
                 strategy=method.batching_strategy,
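A minimal standalone sketch of the size-capped dispatch pattern that the call sites in this patch now share: count how many queued jobs fit under `max_batch_size` by accumulated sample size, pop that many, and hand the batch plus its total size to the `dispatch` callback. `FakeJob` and `collect_batch` are hypothetical stand-ins (not BentoML code) for `Job` and the strategy bodies, assuming each queued job exposes its sample count as `job.data.sample.batch_size` does in the diff; the sketch also simplifies by popping from the front and stopping at the first job that does not fit.

```python
# Illustrative sketch only; FakeJob and collect_batch are hypothetical names.
from __future__ import annotations

import collections
import typing as t
from dataclasses import dataclass


@dataclass
class FakeJob:
    enqueue_time: float
    batch_size: int  # stands in for job.data.sample.batch_size


def collect_batch(
    queue: t.Deque[FakeJob],
    max_batch_size: int,
    dispatch: t.Callable[[t.Sequence[FakeJob], int], None],
) -> None:
    # Count how many jobs fit before the accumulated sample count would exceed
    # max_batch_size, then pop exactly that many and dispatch them together
    # with the total size (the second argument added to dispatch in this patch).
    n_call_out = 0
    batch_size = 0
    for job in queue:
        if batch_size + job.batch_size > max_batch_size:
            break
        n_call_out += 1
        batch_size += job.batch_size
    jobs = tuple(queue.popleft() for _ in range(n_call_out))
    dispatch(jobs, batch_size)


if __name__ == "__main__":
    q: t.Deque[FakeJob] = collections.deque(
        FakeJob(enqueue_time=0.0, batch_size=s) for s in (3, 4, 5)
    )
    # Only the first two jobs fit under a cap of 8 (3 + 4 = 7), so dispatch
    # receives 2 jobs with a reported batch size of 7; one job stays queued.
    collect_batch(q, max_batch_size=8, dispatch=lambda jobs, size: print(len(jobs), size))
```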