
more misc fixes
sauyon committed Apr 25, 2023
1 parent 5e6c844 commit ce337a6
Showing 3 changed files with 48 additions and 34 deletions.
@@ -83,7 +83,9 @@ runners:
  timeout: 300
  optimizer:
    name: linear
-    options: {}
+    options:
+      initial_slope_ms: 2.
+      initial_intercept_ms: 1.
  batching:
    enabled: true
    max_batch_size: 100
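The new defaults seed the linear optimizer's cost model before any real measurements arrive. Below is a minimal sketch of the arithmetic these two keys imply, assuming the `predict(n) = slope * n + intercept` form suggested by `LinearOptimizer`'s slope (`o_a`) and intercept (`o_b`) fields in the diff that follows; `predict_ms` is an illustrative name, not part of the commit.

```python
initial_slope_ms = 2.0       # per-item cost seeded by the config above
initial_intercept_ms = 1.0   # fixed per-batch overhead

def predict_ms(batch_size: int) -> float:
    """Predicted batch execution time, in milliseconds."""
    return initial_slope_ms * batch_size + initial_intercept_ms

print(predict_ms(1))    # 3.0 ms for a single request
print(predict_ms(100))  # 201.0 ms for a full batch at max_batch_size
```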
77 changes: 45 additions & 32 deletions src/bentoml/_internal/marshal/dispatcher.py
@@ -50,15 +50,13 @@ class Job:
dispatch_time: float = 0



OPTIMIZER_REGISTRY = {}


class Optimizer(ABC):
optimizer_id: str
n_skipped_sample: int = 0

-    @abstractmethod
def __init__(self, options: dict[str, t.Any]):
pass
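`OPTIMIZER_REGISTRY` is filled in through a subclass hook: `LinearOptimizer` below registers itself with `optimizer_id="linear"`, mirroring the `__init_subclass__(cls, strategy_id)` hook shown for batching strategies further down. A minimal sketch of that pattern, under the assumption that the optimizer hook (not shown in this hunk) works the same way:

```python
from abc import ABC
import typing as t

OPTIMIZER_REGISTRY: dict[str, type["Optimizer"]] = {}

class Optimizer(ABC):
    optimizer_id: str

    def __init__(self, options: dict[str, t.Any]):
        pass

    def __init_subclass__(cls, optimizer_id: str):
        # Each concrete optimizer registers itself under its id, so the
        # configuration's `optimizer.name` key can be resolved to a class.
        OPTIMIZER_REGISTRY[optimizer_id] = cls
        cls.optimizer_id = optimizer_id
```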

@@ -100,8 +98,8 @@ class LinearOptimizer(Optimizer, optimizer_id="linear"):
"""
Analyze historical data to predict execution time using a simple linear regression on batch size.
"""
-    o_a: float = 2
-    o_b: float = 1
+    o_a: float = 2.
+    o_b: float = 1.

n_kept_sample = 50 # amount of outbound info kept for inferring params
n_skipped_sample = 2 # amount of outbound info skipped after init
@@ -113,18 +111,18 @@ def __init__(self, options: dict[str, t.Any]):
(slope/intercept options are given in milliseconds and stored internally as seconds)
"""
for key in options:
-            if key == "initial_slope":
-                self.o_a = options[key]
-            elif key == "initial_intercept":
-                self.o_b = options[key]
+            if key == "initial_slope_ms":
+                self.o_a = options[key] / 1000.
+            elif key == "initial_intercept_ms":
+                self.o_b = options[key] / 1000.
elif key == "n_kept_sample":
self.n_kept_sample = options[key]
elif key == "n_skipped_sample":
self.n_skipped_sample = options[key]
elif key == "param_refresh_interval":
self.param_refresh_interval = options[key]
else:
-                logger.warning("Strategy 'target_latency' ignoring unknown configuration key '{key}'.")
+                logger.warning(f"Optimizer 'linear' ignoring unknown configuration key '{key}'.")

self.o_stat: collections.deque[tuple[int, float]] = collections.deque(
maxlen=self.n_kept_sample
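Since the renamed `*_ms` options are divided by 1000 on the way in, the model's internal state stays in seconds regardless of how the configuration is written. A usage sketch under that assumption, continuing the hypothetical registry lookup from above:

```python
opt_cls = OPTIMIZER_REGISTRY["linear"]
opt = opt_cls({"initial_slope_ms": 2.0, "initial_intercept_ms": 1.0})
assert opt.o_a == 0.002  # slope: seconds per queued item
assert opt.o_b == 0.001  # intercept: seconds per dispatched batch
```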
@@ -181,7 +179,7 @@ def __init__(self, optimizer: Optimizer, options: dict[t.Any, t.Any]):
pass

@abstractmethod
-    async def batch(self, optimizer: Optimizer, queue: t.Deque[Job], max_latency: float, max_batch_size: int, tick_interval: float, dispatch: t.Callable[[t.Sequence[Job]], None]):
+    async def batch(self, optimizer: Optimizer, queue: t.Deque[Job], max_latency: float, max_batch_size: int, tick_interval: float, dispatch: t.Callable[[t.Sequence[Job], int], None]):
pass

def __init_subclass__(cls, strategy_id: str):
@@ -197,23 +195,33 @@ def __init__(self, options: dict[t.Any, t.Any]):
if key == "latency":
self.latency = options[key] / 1000.0
else:
-                logger.warning("Strategy 'target_latency' ignoring unknown configuration key '{key}'.")
+                logger.warning(f"Strategy 'target_latency' ignoring unknown configuration key '{key}'.")


-    async def batch(self, optimizer: Optimizer, queue: t.Deque[Job], max_latency: float, max_batch_size: int, tick_interval: float, dispatch: t.Callable[[t.Sequence[Job]], None]):
+    async def batch(self, optimizer: Optimizer, queue: t.Deque[Job], max_latency: float, max_batch_size: int, tick_interval: float, dispatch: t.Callable[[t.Sequence[Job], int], None]):
n = len(queue)
now = time.time()
w0 = now - queue[0].enqueue_time
latency_0 = w0 + optimizer.predict(n)

-        while latency_0 < self.latency:
+        while latency_0 < self.latency and n < max_batch_size:
n = len(queue)
now = time.time()
w0 = now - queue[0].enqueue_time
latency_0 = w0 + optimizer.predict(n)

await asyncio.sleep(tick_interval)

+        # call
+        n_call_out = 0
+        batch_size = 0
+        for job in queue:
+            if batch_size + job.data.sample.batch_size <= max_batch_size:
+                n_call_out += 1
+                batch_size += job.data.sample.batch_size
+            else:
+                break
+        inputs_info = tuple(queue.popleft() for _ in range(n_call_out))
+        dispatch(inputs_info, batch_size)
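With the signature change, `dispatch` receives the accumulated batch size alongside the jobs, and every strategy assembles a batch the same way: walk the queue from the oldest job, stop once the next job would overflow `max_batch_size`, and hand over that prefix. A hypothetical strategy illustrating the protocol; `GreedyStrategy` is not part of the commit and simply dispatches whatever fits as soon as it runs:

```python
class GreedyStrategy(BatchingStrategy, strategy_id="greedy"):
    def __init__(self, options: dict[t.Any, t.Any]):
        for key in options:
            logger.warning(f"Strategy 'greedy' ignoring unknown configuration key '{key}'.")

    async def batch(self, optimizer, queue, max_latency, max_batch_size, tick_interval, dispatch):
        # Take the longest prefix of the queue that fits in a single batch.
        n_call_out = 0
        batch_size = 0
        for job in queue:
            if batch_size + job.data.sample.batch_size > max_batch_size:
                break
            n_call_out += 1
            batch_size += job.data.sample.batch_size
        dispatch(tuple(queue.popleft() for _ in range(n_call_out)), batch_size)
```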


class AdaptiveStrategy(BatchingStrategy, strategy_id="adaptive"):
decay: float = 0.95
@@ -226,12 +234,14 @@ def __init__(self, options: dict[t.Any, t.Any]):
for key in options:
if key == "decay":
self.decay = options[key]
+            elif key == "n_kept_samples":
+                self.n_kept_samples = options[key]
else:
-                logger.warning("Strategy 'adaptive' ignoring unknown configuration value")
+                logger.warning(f"Strategy 'adaptive' ignoring unknown configuration key '{key}'.")

self.avg_wait_times = collections.deque(maxlen=self.n_kept_samples)

-    async def batch(self, optimizer: Optimizer, queue: t.Deque[Job], max_latency: float, max_batch_size: int, tick_interval: float, dispatch: t.Callable[[t.Sequence[Job]], None]):
+    async def batch(self, optimizer: Optimizer, queue: t.Deque[Job], max_latency: float, max_batch_size: int, tick_interval: float, dispatch: t.Callable[[t.Sequence[Job], int], None]):
n = len(queue)
now = time.time()
w0 = now - queue[0].enqueue_time
@@ -254,22 +264,26 @@ async def batch(self, optimizer: Optimizer, queue: t.Deque[Job], max_latency: fl
await asyncio.sleep(tick_interval)

-        # dispatch the batch
-        n = len(queue)
-        n_call_out = min(max_batch_size, n)
+        # call
+        inputs_info: list[Job] = []
+        n_call_out = 0
+        batch_size = 0
+        for job in queue:
+            if batch_size + job.data.sample.batch_size <= max_batch_size:
+                n_call_out += 1
+                batch_size += job.data.sample.batch_size
+            else:
+                break

        for _ in range(n_call_out):
-            job = queue.pop()
+            job = queue.popleft()
new_wait = (now - job.enqueue_time) / self.n_kept_samples
if len(self.avg_wait_times) == self.n_kept_samples:
oldest_wait = self.avg_wait_times.popleft()
self.avg_req_wait = self.avg_req_wait - oldest_wait + new_wait
else:
# avg deliberately undercounts until we hit n_kept_sample for simplicity
self.avg_req_wait += new_wait
self.avg_wait_times.append(new_wait)
inputs_info.append(job)

-        dispatch(inputs_info)
+        dispatch(inputs_info, batch_size)
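The wait-time bookkeeping above maintains an O(1) sliding-window mean: each sample is stored already divided by `n_kept_samples`, so `avg_req_wait` is updated by subtracting the oldest contribution and adding the newest, and (as the comment notes) it deliberately undercounts until the window fills. A standalone sketch of the same arithmetic with a window of 3:

```python
import collections

n_kept_samples = 3
avg_wait_times: collections.deque[float] = collections.deque(maxlen=n_kept_samples)
avg_req_wait = 0.0

def record_wait(wait_s: float) -> float:
    global avg_req_wait
    new_wait = wait_s / n_kept_samples
    if len(avg_wait_times) == n_kept_samples:
        oldest_wait = avg_wait_times.popleft()
        avg_req_wait -= oldest_wait
    avg_req_wait += new_wait
    avg_wait_times.append(new_wait)
    return avg_req_wait

for w in (0.3, 0.6, 0.9, 1.2):
    print(round(record_wait(w), 3))  # 0.1, 0.3, 0.6, 0.9: running mean of the last 3 waits
```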


class Dispatcher:
@@ -385,15 +399,15 @@ async def train_optimizer(
await asyncio.sleep(self.tick_interval)
continue

-            n_call_out = min(n, batch_size)
            req_count += 1
            # call
            self._sema.acquire()
+            n_call_out = 0
+            batch_size = 0
+            for job in self._queue:
+                if batch_size + job.data.sample.batch_size <= max_batch_size:
+                    n_call_out += 1
+                    batch_size += job.data.sample.batch_size
+                else:
+                    break
            inputs_info = tuple(self._queue.popleft() for _ in range(n_call_out))
            for info in inputs_info:
                # fake wait as 0 for training requests
                info.enqueue_time = now
-            self._loop.create_task(self.outbound_call(inputs_info))
+            self._loop.create_task(self.outbound_call(inputs_info, batch_size))
except asyncio.CancelledError:
return
except Exception as e: # pylint: disable=broad-except
@@ -461,9 +475,9 @@ async def controller(self):
except Exception as e: # pylint: disable=broad-except
logger.error(traceback.format_exc(), exc_info=e)

-    def _dispatch(self, inputs_info: t.Sequence[Job]):
+    def _dispatch(self, inputs_info: t.Sequence[Job], batch_size: int):
        self._sema.acquire()
-        task = self._loop.create_task(self.outbound_call(inputs_info))
+        task = self._loop.create_task(self.outbound_call(inputs_info, batch_size))
self.background_tasks.add(task)
task.add_done_callback(self.background_tasks.discard)
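`_dispatch` keeps a strong reference to every fire-and-forget task: the event loop holds only weak references to tasks, so an otherwise unreferenced task can be garbage-collected before it completes. A standalone sketch of the pattern, with `outbound_stub` standing in for `outbound_call`:

```python
import asyncio

background_tasks: set[asyncio.Task[None]] = set()

async def outbound_stub() -> None:
    await asyncio.sleep(0.01)  # stand-in for the real outbound call

def fire_and_forget() -> None:
    task = asyncio.get_running_loop().create_task(outbound_stub())
    background_tasks.add(task)                        # strong reference until done
    task.add_done_callback(background_tasks.discard)  # then drop it

async def main() -> None:
    fire_and_forget()
    await asyncio.sleep(0.1)  # give the background task time to finish

asyncio.run(main())
```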

@@ -476,11 +490,10 @@ async def inbound_call(self, data: t.Any):
self._wake_event.notify_all()
return await future

-    async def outbound_call(self, inputs_info: t.Sequence[Job]):
+    async def outbound_call(self, inputs_info: t.Sequence[Job], batch_size: int):
        _time_start = time.time()
        _done = False
-        batch_size = len(inputs_info)
-        logger.debug("Dynamic batching cork released, batch size: %d", batch_size)
+        logger.debug("Dynamic batching cork released, batch size: %d (%d requests)", batch_size, len(inputs_info))
try:
outputs = await self.callback(
tuple(t.cast(t.Any, input_info.data) for input_info in inputs_info)
1 change: 0 additions & 1 deletion src/bentoml/_internal/server/runner_app.py
@@ -61,7 +61,6 @@ def __init__(

        self.dispatchers[method.name] = Dispatcher(
            max_latency_in_ms=method.max_latency_ms,
-            batching_strategy=method.batching_strategy,
            max_batch_size=max_batch_size,
            optimizer=method.optimizer,
            strategy=method.batching_strategy,
