Skip to content

Commit

Permalink
Fix error handling and log spams in Executor
Browse files Browse the repository at this point in the history
* Generate "customer error" if failed to deserialize graph or
  function in Initialize RPC.
  ** These errors are not logged as they are in customer owned code.
  ** These errors are added to function's stderr so customer can
     see them.
* Fix the test for missing module. It check anything because g.run never
  raises.
* Use short formatting for exceptions in prod (non-dev) mode so only
  short backtraces are present in logs without fully rendered local vars
  in the backtraces and etc. This reduces log spam.
* Revert previous quickfixes that suppress logging of exception details
  in Executor.

Testing:

make check
make test
  • Loading branch information
eabatalov committed Jan 2, 2025
1 parent 498dc94 commit 7df0d05
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
)


class CustomerError(RuntimeError):
pass


class FunctionExecutor:
"""Executor side class supporting a running FunctionExecutorServer.
Expand Down Expand Up @@ -50,7 +54,10 @@ async def initialize(
base_url: str,
config_path: Optional[str],
):
"""Creates and initializes a FunctionExecutorServer and all resources associated with it."""
"""Creates and initializes a FunctionExecutorServer and all resources associated with it.
Raises CustomerError if the server failed to initialize due to an error in customer owned code or data.
Raises an Exception if an internal error occured."""
try:
self._server = await self._server_factory.create(
config=config, logger=self._logger
Expand Down Expand Up @@ -129,5 +136,9 @@ async def _initialize_server(
stub: FunctionExecutorStub, initialize_request: InitializeRequest
):
initialize_response: InitializeResponse = await stub.initialize(initialize_request)
if not initialize_response.success:
if initialize_response.success:
return
if initialize_response.HasField("customer_error"):
raise CustomerError(initialize_response.customer_error)
else:
raise Exception("initialize RPC failed at function executor server")
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
)

from ..api_objects import Task
from .function_executor import FunctionExecutor
from .function_executor import CustomerError, FunctionExecutor
from .function_executor_state import FunctionExecutorState
from .server.function_executor_server_factory import (
FunctionExecutorServerConfiguration,
Expand Down Expand Up @@ -50,39 +50,43 @@ async def run(self) -> TaskOutput:
self._state.check_locked()

if self._state.function_executor is None:
self._state.function_executor = await self._create_function_executor()
try:
await self._create_function_executor()
except CustomerError as e:
return TaskOutput(
task=self._task_input.task,
stderr=str(e),
success=False,
)

return await self._run()

async def _create_function_executor(self) -> FunctionExecutor:
function_executor: FunctionExecutor = FunctionExecutor(
server_factory=self._factory, logger=self._logger
)
try:
config: FunctionExecutorServerConfiguration = (
FunctionExecutorServerConfiguration(
image_uri=self._task_input.task.image_uri,
)
)
initialize_request: InitializeRequest = InitializeRequest(
namespace=self._task_input.task.namespace,
graph_name=self._task_input.task.compute_graph,
graph_version=self._task_input.task.graph_version,
function_name=self._task_input.task.compute_fn,
graph=self._task_input.graph,
config: FunctionExecutorServerConfiguration = (
FunctionExecutorServerConfiguration(
image_uri=self._task_input.task.image_uri,
)
)
initialize_request: InitializeRequest = InitializeRequest(
namespace=self._task_input.task.namespace,
graph_name=self._task_input.task.compute_graph,
graph_version=self._task_input.task.graph_version,
function_name=self._task_input.task.compute_fn,
graph=self._task_input.graph,
)

try:
await function_executor.initialize(
config=config,
initialize_request=initialize_request,
base_url=self._base_url,
config_path=self._config_path,
)
return function_executor
except Exception as e:
self._logger.error(
f"failed to initialize function executor: {e.details()}",
# exc_info=e.details(),
)
self._state.function_executor = function_executor
except Exception:
await function_executor.destroy()
raise

Expand Down
4 changes: 2 additions & 2 deletions python-sdk/indexify/executor/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ async def run(self, task_input: TaskInput, logger: Any) -> TaskOutput:
return await self._run(task_input, logger)
except Exception as e:
logger.error(
f"failed running the task: {e.details()}",
# exc_info=e.debug_error_string(),
"failed running the task:",
exc_info=e,
)
return TaskOutput.internal_error(task_input.task)

Expand Down
20 changes: 9 additions & 11 deletions python-sdk/indexify/function_executor/function_executor_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,22 @@ def initialize(
# implementing smart caching in customer code. E.g. load a model into GPU only once and
# share the model's file descriptor between all tasks or download function configuration
# only once.
graph_serializer = get_serializer(request.graph.content_type)

try:
graph = graph_serializer.deserialize(request.graph.bytes)
except Exception as e:
self._logger.error(f"Caught exception {e}")
return InitializeResponse(success=False)

self._function = graph_serializer.deserialize(graph[request.function_name])

self._logger = self._logger.bind(
namespace=request.namespace,
graph_name=request.graph_name,
graph_version=str(request.graph_version),
function_name=request.function_name,
)
self._logger.info("initialized function executor service")
graph_serializer = get_serializer(request.graph.content_type)
try:
# Process user controlled input in a try-except block to not treat errors here as our
# internal platform errors.
graph = graph_serializer.deserialize(request.graph.bytes)
self._function = graph_serializer.deserialize(graph[request.function_name])
except Exception as e:
return InitializeResponse(success=False, customer_error=str(e))

self._logger.info("initialized function executor service")
return InitializeResponse(success=True)

def initialize_invocation_state_server(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ message InitializeRequest {

message InitializeResponse {
optional bool success = 1;
optional string customer_error = 2;
}

message SetInvocationStateRequest {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,14 @@ class InitializeRequest(_message.Message):
) -> None: ...

class InitializeResponse(_message.Message):
__slots__ = ("success",)
__slots__ = ("success", "customer_error")
SUCCESS_FIELD_NUMBER: _ClassVar[int]
CUSTOMER_ERROR_FIELD_NUMBER: _ClassVar[int]
success: bool
def __init__(self, success: bool = ...) -> None: ...
customer_error: str
def __init__(
self, success: bool = ..., customer_error: _Optional[str] = ...
) -> None: ...

class SetInvocationStateRequest(_message.Message):
__slots__ = ("key", "value")
Expand Down
2 changes: 1 addition & 1 deletion python-sdk/indexify/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def configure_logging_early():

def configure_production_logging():
processors = [
structlog.processors.dict_tracebacks,
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer(),
]
structlog.configure(processors=processors)
17 changes: 10 additions & 7 deletions python-sdk/tests/broken_graph_module_error_test/test_graph.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import sys
import io
import unittest
from contextlib import redirect_stdout

from extractors import extractor_a, extractor_c

from indexify import RemoteGraph
from indexify.functions_sdk.data_objects import File
from indexify.functions_sdk.graph import Graph
from indexify.functions_sdk.indexify_functions import indexify_function


def create_broken_graph():
Expand All @@ -25,13 +24,17 @@ def test_broken_graph(self):
g = create_broken_graph()
g = RemoteGraph.deploy(g=g)

self.assertRaises(
Exception,
# We don't have a public SDK API to read a function's stderr
# so we rely on internal SDK behavior where it prints a failed function's
# stderr to the current stdout.
func_stdout: io.StringIO = io.StringIO()
with redirect_stdout(func_stdout):
g.run(
block_until_done=True,
a=10,
),
)
)
# Use regex because rich formatting characters are present in the output.
self.assertRegex(func_stdout.getvalue(), r"No module named.*'first_p_dep'")


if __name__ == "__main__":
Expand Down

0 comments on commit 7df0d05

Please sign in to comment.