Add Executor CLI arg that defines port range used for Function Executors
Also disable port reuse in Function Executor gRPC Servers and
fail if a wrong task is routed to a Function Executor.
Previously, all of these scenarios produced undefined behavior.

This allows running multiple Executors on the same machine, which is
convenient for Indexify development, for users experimenting with
their workflows, and for implementing integration tests that require
multiple Executors.

This also provides clear, early error messages when the ports are
misconfigured or when Executor -> Function Executor routing
misbehaves.
eabatalov committed Jan 8, 2025
1 parent 0aa922c commit eb91d71
Showing 16 changed files with 143 additions and 56 deletions.
24 changes: 15 additions & 9 deletions executor/README.md
@@ -7,19 +7,25 @@ to run customer functions. Executor should never link with Indexify Python-SDK.
about programming languages and runtime environments used by Indexify Functions. Function Executor is
responsible for this.

This package doesn't provide an executable entry point that runs an Executor. This is intentional,
as Executor has many configurable subcomponents. The `indexify` package provides a CLI with an `executor`
command that runs an Executor with the functionality available in the Open Source offering.

## Deployment

An Executor can run in a Virtual Machine, a container, or on a bare metal host. Each VM/container/bare metal
host in an Indexify cluster deployment runs a single Executor daemon process.
Open Source users manage the Executor fleet themselves, e.g. using Kubernetes, any other cluster
orchestrator, or even manually.
### Production setup

An Executor can be configured to run only particular functions. This makes it possible to route certain Indexify
functions to particular subsets of Executors, e.g. to implement a load balancing strategy.
A single Executor runs in a Virtual Machine, a container, or on a bare metal host. An Indexify cluster
is scaled by adding more Executor hosts. Open Source users manage and scale the hosts themselves, e.g.
using Kubernetes, any other orchestrator, or even manually. For example, the users provision secrets and
persistent volumes to each host using the orchestrator or by hand. Each Executor runs a single function.
The function name and other qualifiers are defined in Executor arguments.

This package doesn't provide an executable entry point that runs an Executor. This is intentional,
as Executor has many configurable subcomponents. The `indexify` package provides a CLI with an `executor`
command that runs an Executor with the functionality available in the Open Source offering.
### Development setup

To make Indexify development and testing easier, an Executor in development mode can run any function.
Running multiple Executors on the same host is also supported. In this case each Executor requires a
unique port range passed to it in its arguments.
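A port-range argument might be wired up along these lines. This is a minimal sketch: the actual flag name and format used by the `indexify` CLI are not shown in this diff, so `--ports` and the `start:end` spec are assumptions for illustration.

```python
import argparse

def parse_port_range(spec: str) -> range:
    """Parse a 'start:end' spec (end exclusive) into a range of ports."""
    start, end = (int(part) for part in spec.split(":"))
    if not (1024 <= start < end <= 65535):
        raise argparse.ArgumentTypeError(f"invalid port range: {spec}")
    return range(start, end)

parser = argparse.ArgumentParser()
parser.add_argument("--ports", type=parse_port_range, default=range(50000, 51000))

# Two Executors on one host must be given non-overlapping ranges, e.g.:
args_a = parser.parse_args(["--ports", "50000:51000"])
args_b = parser.parse_args(["--ports", "51000:52000"])
```

Validating the range at argument-parsing time gives the misconfigured user an error before any Function Executor is started.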

## Threat model

1 change: 1 addition & 0 deletions executor/src/executor/downloader.py
@@ -67,6 +67,7 @@ def _write_cached_graph(
# Atomically rename the fully written file at tmp path.
# This allows us to not use any locking because file link/unlink
# are atomic operations at filesystem level.
# This also allows sharing the same cache between multiple Executors.
os.replace(tmp_path, path)
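The write-to-a-temp-path-then-rename pattern used above can be sketched in isolation (assumed helper and file names; `os.replace` is atomic when source and destination are on the same filesystem):

```python
import os
import tempfile

def write_atomically(path: str, data: bytes) -> None:
    """Write data so that readers never observe a partially written file."""
    dir_name = os.path.dirname(path) or "."
    # The temp file must live in the destination directory (same filesystem),
    # otherwise os.replace raises OSError (cross-device rename).
    fd, tmp_path = tempfile.mkstemp(dir=dir_name)
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
        # Atomic swap: concurrent Executors sharing this cache see either
        # the old complete file or the new complete file, never a mix.
        os.replace(tmp_path, path)
    except BaseException:
        os.unlink(tmp_path)
        raise
```

Because the rename is atomic at the filesystem level, no file locking is needed even when several Executor processes populate the same cache directory.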

async def download_input(self, task: Task) -> SerializedObject:
@@ -1,7 +1,7 @@
import asyncio
import os
import signal
from typing import Any, Optional
from typing import Any, List, Optional

from .function_executor_server_factory import (
FunctionExecutorServerConfiguration,
@@ -14,10 +14,10 @@ class SubprocessFunctionExecutorServerFactory(FunctionExecutorServerFactory):
def __init__(
self,
development_mode: bool,
server_ports: range,
):
self._development_mode: bool = development_mode
# Registered ports range ends at 49151. We start from 50000 to hopefully avoid conflicts.
self._free_ports = set(range(50000, 51000))
self._free_ports: List[int] = list(reversed(server_ports))

async def create(
self, config: FunctionExecutorServerConfiguration, logger: Any
@@ -100,7 +100,9 @@ def _allocate_port(self) -> int:
def _release_port(self, port: int) -> None:
# No asyncio.Lock is required here because this operation never awaits
# and it is always called from the same thread where the event loop is running.
self._free_ports.add(port)
#
# Prefer reusing recently released ports so that port-related issues reproduce as deterministically as possible.
self._free_ports.append(port)


def _server_address(port: int) -> str:
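The free-port bookkeeping above (a list used as a LIFO stack, seeded in reverse so that allocation hands out the lowest port first) can be illustrated standalone. The class and method names here are illustrative, not the factory's actual API:

```python
from typing import List

class PortPool:
    """Hands out ports from a fixed range; the most recently released port is reused first."""

    def __init__(self, server_ports: range):
        # Reversed so that list.pop(), which takes from the end,
        # yields the lowest port first.
        self._free_ports: List[int] = list(reversed(server_ports))

    def allocate(self) -> int:
        if not self._free_ports:
            raise RuntimeError("no free ports left in the configured range")
        return self._free_ports.pop()

    def release(self, port: int) -> None:
        # LIFO reuse keeps port assignments stable across restarts of the
        # same Function Executor, making port-related bugs easier to reproduce.
        self._free_ports.append(port)
```

No lock is needed around the list, mirroring the comment in the diff: the operations never await, so they cannot interleave on the event-loop thread.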
4 changes: 4 additions & 0 deletions executor/src/executor/function_executor/single_task_runner.py
@@ -89,6 +89,10 @@ async def _create_function_executor(self) -> FunctionExecutor:

async def _run(self) -> TaskOutput:
request: RunTaskRequest = RunTaskRequest(
namespace=self._task_input.task.namespace,
graph_name=self._task_input.task.compute_graph,
graph_version=self._task_input.task.graph_version,
function_name=self._task_input.task.compute_fn,
graph_invocation_id=self._task_input.task.invocation_id,
task_id=self._task_input.task.id,
function_input=self._task_input.input,
@@ -13,7 +13,6 @@
RouterCallResult,
)
from indexify.functions_sdk.invocation_state.invocation_state import InvocationState
from indexify.http_client import IndexifyClient

from function_executor.proto.function_executor_pb2 import (
RunTaskRequest,
@@ -115,24 +114,6 @@ def _flush_logs(self) -> None:
sys.stderr.flush()


def _indexify_client(
logger: Any,
namespace: str,
indexify_server_addr: str,
config_path: Optional[str],
) -> IndexifyClient:
# This client is required to implement key/value store functionality for customer functions.
protocol: str = "http"
if config_path:
logger.info("TLS is enabled")
protocol = "https"
return IndexifyClient(
service_url=f"{protocol}://{indexify_server_addr}",
namespace=namespace,
config_path=config_path,
)


def _is_router(func_wrapper: IndexifyFunctionWrapper) -> bool:
"""Determines if the function is a router.
@@ -15,7 +15,11 @@ def check(self):
Raises: ValueError: If the request is invalid.
"""
(
self._message_validator.required_field("graph_invocation_id")
self._message_validator.required_field("namespace")
.required_field("graph_name")
.required_field("graph_version")
.required_field("function_name")
.required_field("graph_invocation_id")
.required_field("task_id")
.required_serialized_object("function_input")
.optional_serialized_object("function_init_value")
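The chained `required_field` calls above work because each validation method returns the validator itself. A minimal sketch of that pattern, using a plain dict in place of a protobuf message and hypothetical names:

```python
from typing import Any, Dict

class MessageValidator:
    """Validates message fields; raises ValueError on the first missing field."""

    def __init__(self, message: Dict[str, Any]):
        self._message = message

    def required_field(self, name: str) -> "MessageValidator":
        if self._message.get(name) is None:
            raise ValueError(f"Field '{name}' is required in the request")
        return self  # returning self is what enables call chaining

request = {"namespace": "default", "graph_name": "g", "task_id": "t1"}
(
    MessageValidator(request)
    .required_field("namespace")
    .required_field("graph_name")
    .required_field("task_id")
)
```

Each new required field in `RunTaskRequest` then costs exactly one extra line in the chain, which is why this commit's four new fields show up as four `.required_field(...)` calls.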
@@ -4,10 +4,18 @@
# This is due to internal hard gRPC limits. When we want to increase the message sizes
# we'll have to implement chunking for large messages.
_MAX_GRPC_MESSAGE_LENGTH = -1
# Disable port reuse: fail if multiple Function Executor Servers attempt to bind to the
# same port. This happens when Indexify users misconfigure the Servers. Disabling port
# reuse results in a clear error message on Server startup instead of obscure errors later
# while the Indexify cluster is serving tasks.
# If port reuse is not disabled, a random Server receives each request, so tasks get
# routed to the wrong Servers.
_REUSE_SERVER_PORT = 0

GRPC_SERVER_OPTIONS = [
("grpc.max_receive_message_length", _MAX_GRPC_MESSAGE_LENGTH),
("grpc.max_send_message_length", _MAX_GRPC_MESSAGE_LENGTH),
("grpc.so_reuseport", _REUSE_SERVER_PORT),
]

GRPC_CHANNEL_OPTIONS = GRPC_SERVER_OPTIONS
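The effect of `grpc.so_reuseport` mirrors the underlying `SO_REUSEPORT` socket option. With plain TCP sockets (a sketch independent of gRPC; the helper name is made up), the difference is visible directly:

```python
import socket

def second_bind_succeeds(reuse: bool) -> bool:
    """Bind two TCP sockets to the same port; report whether the second bind works."""
    s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        if reuse and hasattr(socket, "SO_REUSEPORT"):
            s1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            s2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        s1.bind(("127.0.0.1", 0))  # let the OS pick a free port
        port = s1.getsockname()[1]
        try:
            s2.bind(("127.0.0.1", port))
            return True  # with SO_REUSEPORT, the kernel spreads connections across both
        except OSError:
            return False  # without it, the misconfiguration fails fast at startup
    finally:
        s1.close()
        s2.close()
```

With reuse disabled, the second server errors out at bind time, which is exactly the early, clear failure the comment above aims for; with reuse enabled, incoming requests would be silently split between the two servers.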
@@ -86,10 +86,14 @@ message RouterOutput {
}

message RunTaskRequest {
optional string graph_invocation_id = 4;
optional string namespace = 1;
optional string graph_name = 2;
optional int32 graph_version = 3;
optional string function_name = 4;
optional string graph_invocation_id = 5;
optional string task_id = 6;
optional SerializedObject function_input = 9;
optional SerializedObject function_init_value = 10;
optional SerializedObject function_input = 7;
optional SerializedObject function_init_value = 8;
}

message RunTaskResponse {


@@ -142,21 +142,37 @@ class RouterOutput(_message.Message):

class RunTaskRequest(_message.Message):
__slots__ = (
"namespace",
"graph_name",
"graph_version",
"function_name",
"graph_invocation_id",
"task_id",
"function_input",
"function_init_value",
)
NAMESPACE_FIELD_NUMBER: _ClassVar[int]
GRAPH_NAME_FIELD_NUMBER: _ClassVar[int]
GRAPH_VERSION_FIELD_NUMBER: _ClassVar[int]
FUNCTION_NAME_FIELD_NUMBER: _ClassVar[int]
GRAPH_INVOCATION_ID_FIELD_NUMBER: _ClassVar[int]
TASK_ID_FIELD_NUMBER: _ClassVar[int]
FUNCTION_INPUT_FIELD_NUMBER: _ClassVar[int]
FUNCTION_INIT_VALUE_FIELD_NUMBER: _ClassVar[int]
namespace: str
graph_name: str
graph_version: int
function_name: str
graph_invocation_id: str
task_id: str
function_input: SerializedObject
function_init_value: SerializedObject
def __init__(
self,
namespace: _Optional[str] = ...,
graph_name: _Optional[str] = ...,
graph_version: _Optional[int] = ...,
function_name: _Optional[str] = ...,
graph_invocation_id: _Optional[str] = ...,
task_id: _Optional[str] = ...,
function_input: _Optional[_Union[SerializedObject, _Mapping]] = ...,
19 changes: 19 additions & 0 deletions function_executor/src/function_executor/service.py
@@ -86,6 +86,25 @@ def run_task(
# If our code raises an exception the grpc framework converts it into GRPC_STATUS_UNKNOWN
# error with the exception message. Differentiating errors is not needed for now.
RunTaskRequestValidator(request=request).check()

# Fail with an internal error because reaching this point means a task was wrongly routed to this Server.
if request.namespace != self._namespace:
raise ValueError(
f"This Function Executor is not initialized for this namespace {request.namespace}"
)
if request.graph_name != self._graph_name:
raise ValueError(
f"This Function Executor is not initialized for this graph {request.graph_name}"
)
if request.graph_version != self._graph_version:
raise ValueError(
f"This Function Executor is not initialized for this graph version {request.graph_version}"
)
if request.function_name != self._function_name:
raise ValueError(
f"This Function Executor is not initialized for this function {request.function_name}"
)

return RunTaskHandler(
request=request,
graph_name=self._graph_name,