Skip to content

Commit

Permalink
Add Julia language support
Browse files Browse the repository at this point in the history
* add julia function descriptor to proto/cpp
* make it compile
* Support JuliaFunctionDescriptor in == fallback
* Enable debug-level logging on the Ray backend
* Add Raylet support for Julia
* Return Julia language string
* EDIT: modify Java worker_pool tests to run Julia
* specify julia executable
* fix indent, add log_dir arg missing
* also includt log_dir when calling
* WIP
* Functional worker
* Use JULIA_PROJECT
* Refactor RuntimeEnvContext changes
* Funtional passing in JULIA_PROJECT
* Cleanup
* Rollback non-required changes
* Use exported start_worker
* Apply suggestions from code review
  Signed-off-by: Dave Kleinschmidt <[email protected]>
* use Ray.jl as entrypoint to worker runtime
* pass `--startup_token` to julia worker
* Drop `DEFAULT_WORKER_EXECUTABLE` from `build_julia_worker_command` (#9)
* Runtime environment can specify Julia executable and arguments (#8)
  * Support specifying runtime env executable
  * Support specifying runtime env args
  * Support specifying executable/args via runtime env
  * Avoid quoting default
  * Switch to using command in RuntimeEnvContext
  * Use separate command for Julia
  * Add TODO about switching to plugin
  Co-authored-by: Dave Kleinschmidt <[email protected]>
  Signed-off-by: Curtis Vogt <[email protected]>
* Avoid escaping `command_prefix` (#10)
* formatting
  Signed-off-by: Glenn Moynihan <[email protected]>
* revert logging level changes and delete dev comments
  Signed-off-by: Glenn Moynihan <[email protected]>
* Remove dev logging statements + todo
  Signed-off-by: Glenn Moynihan <[email protected]>
* Missed some JULIA language support
  • Loading branch information
kleinschmidt authored and omus committed Jul 4, 2024
1 parent 1240d3f commit 7660c2d
Show file tree
Hide file tree
Showing 12 changed files with 196 additions and 5 deletions.
6 changes: 5 additions & 1 deletion python/ray/_private/runtime_env/agent/runtime_env_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,11 @@ async def _setup_runtime_env(
# TODO(chenk008): Add log about allocated_resource to
# avoid lint error. That will be moved to cgroup plugin.
per_job_logger.debug(f"Worker has resource :" f"{allocated_resource}")
context = RuntimeEnvContext(env_vars=runtime_env.env_vars())
context = RuntimeEnvContext(
env_vars=runtime_env.env_vars(),
# TODO: use plugin instead of special casing
julia_command=runtime_env.julia_command(),
)

# Warn about unrecognized fields in the runtime env.
for name, _ in runtime_env.plugins():
Expand Down
5 changes: 5 additions & 0 deletions python/ray/_private/runtime_env/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def __init__(
resources_dir: Optional[str] = None,
override_worker_entrypoint: Optional[str] = None,
java_jars: List[str] = None,
julia_command: List[str] = None,
):
self.command_prefix = command_prefix or []
self.env_vars = env_vars or {}
Expand All @@ -37,6 +38,7 @@ def __init__(
self.resources_dir: str = resources_dir
self.override_worker_entrypoint: Optional[str] = override_worker_entrypoint
self.java_jars = java_jars or []
self.julia_command = julia_command or []

def serialize(self) -> str:
return json.dumps(self.__dict__)
Expand All @@ -63,6 +65,9 @@ def exec_worker(self, passthrough_args: List[str], language: Language):

class_path_args = ["-cp", ray_jars + ":" + str(":".join(local_java_jars))]
passthrough_args = class_path_args + passthrough_args
elif language == Language.JULIA:
executable = self.julia_command or ["julia", "-e", "using Ray; start_worker()"]
executable += ["--"]
elif sys.platform == "win32":
executable = []
else:
Expand Down
65 changes: 65 additions & 0 deletions python/ray/_private/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -1617,6 +1617,7 @@ def start_raylet(
["{},{}".format(*kv) for kv in static_resources.items()]
)

# JAVA
has_java_command = False
if shutil.which("java") is not None:
has_java_command = True
Expand All @@ -1643,6 +1644,7 @@ def start_raylet(
else:
java_worker_command = []

# CPP
if os.path.exists(DEFAULT_WORKER_EXECUTABLE):
cpp_worker_command = build_cpp_worker_command(
gcs_address,
Expand All @@ -1657,6 +1659,25 @@ def start_raylet(
else:
cpp_worker_command = []

# JULIA
has_julia_command = False
if shutil.which("julia") is not None:
has_julia_command = True

if has_julia_command is True:
julia_worker_command = build_julia_worker_command(
gcs_address,
plasma_store_name,
raylet_name,
redis_password,
session_dir,
log_dir,
node_ip_address,
setup_worker_path,
)
else:
julia_worker_command = []

# Create the command that the Raylet will use to start workers.
# TODO(architkulkarni): Pipe in setup worker args separately instead of
# inserting them into start_worker_command and later erasing them if
Expand Down Expand Up @@ -1776,6 +1797,7 @@ def start_raylet(
f"--python_worker_command={subprocess.list2cmdline(start_worker_command)}", # noqa
f"--java_worker_command={subprocess.list2cmdline(java_worker_command)}", # noqa
f"--cpp_worker_command={subprocess.list2cmdline(cpp_worker_command)}", # noqa
f"--julia_worker_command={subprocess.list2cmdline(julia_worker_command)}", # noqa
f"--native_library_path={DEFAULT_NATIVE_LIBRARY_PATH}",
f"--temp_dir={temp_dir}",
f"--session_dir={session_dir}",
Expand Down Expand Up @@ -1950,6 +1972,49 @@ def build_cpp_worker_command(
return command


def build_julia_worker_command(
bootstrap_address: str,
plasma_store_name: str,
raylet_name: str,
redis_password: str,
session_dir: str,
log_dir: str,
node_ip_address: str,
setup_worker_path: str,
):
"""This method assembles the command used to start a Julia worker.
Args:
bootstrap_address: Bootstrap address of ray cluster.
plasma_store_name: The name of the plasma store socket to connect
to.
raylet_name: The name of the raylet socket to create.
redis_password: The password of connect to redis.
session_dir: The path of this session.
node_ip_address: The ip address for this node.
setup_worker_path: The path of the Python file that will set up
the environment for the worker process.
Returns:
The command string for starting Julia worker.
"""

command = [
sys.executable,
setup_worker_path,
f"--ray_plasma_store_socket_name={plasma_store_name}",
f"--ray_raylet_socket_name={raylet_name}",
"--ray_node_manager_port=RAY_NODE_MANAGER_PORT_PLACEHOLDER",
f"--ray_address={bootstrap_address}",
f"--ray_redis_password={redis_password}",
f"--ray_session_dir={session_dir}",
f"--ray_logs_dir={log_dir}",
f"--ray_node_ip_address={node_ip_address}",
"RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER",
]

return command


def determine_plasma_store_config(
object_store_memory: int,
plasma_directory: Optional[str] = None,
Expand Down
4 changes: 4 additions & 0 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ from ray.includes.common cimport (
LANGUAGE_CPP,
LANGUAGE_JAVA,
LANGUAGE_PYTHON,
LANGUAGE_JULIA,
LocalMemoryBuffer,
TASK_TYPE_NORMAL_TASK,
TASK_TYPE_ACTOR_CREATION_TASK,
Expand Down Expand Up @@ -756,6 +757,8 @@ cdef class Language:
return "CPP"
elif <int32_t>self.lang == <int32_t>LANGUAGE_JAVA:
return "JAVA"
elif <int32_t>self.lang == <int32_t>LANGUAGE_JULIA:
return "JULIA"
else:
raise Exception("Unexpected error")

Expand All @@ -765,6 +768,7 @@ cdef class Language:
PYTHON = Language.from_native(LANGUAGE_PYTHON)
CPP = Language.from_native(LANGUAGE_CPP)
JAVA = Language.from_native(LANGUAGE_JAVA)
JULIA = Language.from_native(LANGUAGE_JULIA)


cdef int prepare_resources(
Expand Down
1 change: 1 addition & 0 deletions python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil:
cdef CLanguage LANGUAGE_PYTHON "Language::PYTHON"
cdef CLanguage LANGUAGE_CPP "Language::CPP"
cdef CLanguage LANGUAGE_JAVA "Language::JAVA"
cdef CLanguage LANGUAGE_JULIA "Language::JULIA"

cdef extern from "src/ray/protobuf/common.pb.h" nogil:
cdef CWorkerType WORKER_TYPE_WORKER "ray::core::WorkerType::WORKER"
Expand Down
8 changes: 8 additions & 0 deletions python/ray/runtime_env/runtime_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ class MyClass:
"container",
"excludes",
"env_vars",
"julia_command",
"_ray_release",
"_ray_commit",
"_inject_current_ray",
Expand Down Expand Up @@ -339,6 +340,8 @@ def __init__(
runtime_env["image_uri"] = image_uri
if runtime_env.get("java_jars"):
runtime_env["java_jars"] = runtime_env.get("java_jars")
if runtime_env.get("julia_command"):
runtime_env["julia_command"] = runtime_env.get("julia_command")

self.update(runtime_env)

Expand Down Expand Up @@ -497,6 +500,11 @@ def nsight(self) -> Optional[Union[str, Dict[str, str]]]:
def env_vars(self) -> Dict:
return self.get("env_vars", {})

def julia_command(self) -> List[str]:
if "julia_command" in self:
return list(self["julia_command"])
return []

def has_conda(self) -> str:
if self.get("conda"):
return True
Expand Down
20 changes: 20 additions & 0 deletions src/ray/common/function_descriptor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ FunctionDescriptor FunctionDescriptorBuilder::BuildPython(
return ray::FunctionDescriptor(new PythonFunctionDescriptor(std::move(descriptor)));
}

FunctionDescriptor FunctionDescriptorBuilder::BuildJulia(
const std::string &module_name,
const std::string &function_name,
const std::string &function_hash) {
rpc::FunctionDescriptor descriptor;
auto typed_descriptor = descriptor.mutable_julia_function_descriptor();
typed_descriptor->set_module_name(module_name);
typed_descriptor->set_function_name(function_name);
typed_descriptor->set_function_hash(function_hash);
return ray::FunctionDescriptor(new JuliaFunctionDescriptor(std::move(descriptor)));
}

FunctionDescriptor FunctionDescriptorBuilder::BuildCpp(const std::string &function_name,
const std::string &caller,
const std::string &class_name) {
Expand All @@ -65,6 +77,8 @@ FunctionDescriptor FunctionDescriptorBuilder::FromProto(rpc::FunctionDescriptor
return ray::FunctionDescriptor(new ray::PythonFunctionDescriptor(std::move(message)));
case ray::FunctionDescriptorType::kCppFunctionDescriptor:
return ray::FunctionDescriptor(new ray::CppFunctionDescriptor(std::move(message)));
case ray::FunctionDescriptorType::kJuliaFunctionDescriptor:
return ray::FunctionDescriptor(new ray::JuliaFunctionDescriptor(std::move(message)));
default:
break;
}
Expand Down Expand Up @@ -96,6 +110,12 @@ FunctionDescriptor FunctionDescriptorBuilder::FromVector(
function_descriptor_list[0], // function name
function_descriptor_list[1], // caller
function_descriptor_list[2]); // class name
} else if (language == rpc::Language::JULIA) {
RAY_CHECK(function_descriptor_list.size() == 3);
return FunctionDescriptorBuilder::BuildJulia(
function_descriptor_list[0], // module name
function_descriptor_list[1], // function name
function_descriptor_list[2]); // function hash
} else {
RAY_LOG(FATAL) << "Unsupported language " << language;
return FunctionDescriptorBuilder::Empty();
Expand Down
68 changes: 68 additions & 0 deletions src/ray/common/function_descriptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,64 @@ class PythonFunctionDescriptor : public FunctionDescriptorInterface {
const rpc::PythonFunctionDescriptor *typed_message_;
};

class JuliaFunctionDescriptor : public FunctionDescriptorInterface {
public:
/// Construct from a protobuf message object.
/// The input message will be **copied** into this object.
///
/// \param message The protobuf message.
explicit JuliaFunctionDescriptor(rpc::FunctionDescriptor message)
: FunctionDescriptorInterface(std::move(message)) {
RAY_CHECK(message_->function_descriptor_case() ==
ray::FunctionDescriptorType::kJuliaFunctionDescriptor);
typed_message_ = &(message_->julia_function_descriptor());
}

virtual size_t Hash() const {
return std::hash<int>()(ray::FunctionDescriptorType::kJuliaFunctionDescriptor) ^
std::hash<std::string>()(typed_message_->module_name()) ^
std::hash<std::string>()(typed_message_->function_name()) ^
std::hash<std::string>()(typed_message_->function_hash());
}

inline bool operator==(const JuliaFunctionDescriptor &other) const {
if (this == &other) {
return true;
}
return this->ModuleName() == other.ModuleName() &&
this->FunctionName() == other.FunctionName() &&
this->FunctionHash() == other.FunctionHash();
}

inline bool operator!=(const JuliaFunctionDescriptor &other) const {
return !(*this == other);
}

virtual std::string ToString() const {
return "{type=JuliaFunctionDescriptor, module_name=" +
typed_message_->module_name() +
", function_name=" + typed_message_->function_name() +
", function_hash=" + typed_message_->function_hash() + "}";
}

virtual std::string CallString() const {
const std::string &module_name = typed_message_->module_name();
const std::string &function_name = typed_message_->function_name();
return module_name.empty() ? function_name : module_name + "." + function_name;
}

virtual std::string ClassName() const { return ""; }

const std::string &ModuleName() const { return typed_message_->module_name(); }

const std::string &FunctionName() const { return typed_message_->function_name(); }

const std::string &FunctionHash() const { return typed_message_->function_hash(); }

private:
const rpc::JuliaFunctionDescriptor *typed_message_;
};

class CppFunctionDescriptor : public FunctionDescriptorInterface {
public:
/// Construct from a protobuf message object.
Expand Down Expand Up @@ -296,6 +354,9 @@ inline bool operator==(const FunctionDescriptor &left, const FunctionDescriptor
case ray::FunctionDescriptorType::kCppFunctionDescriptor:
return static_cast<const CppFunctionDescriptor &>(*left) ==
static_cast<const CppFunctionDescriptor &>(*right);
case ray::FunctionDescriptorType::kJuliaFunctionDescriptor:
return static_cast<const JuliaFunctionDescriptor &>(*left) ==
static_cast<const JuliaFunctionDescriptor &>(*right);
default:
RAY_LOG(FATAL) << "Unknown function descriptor type: " << left->Type();
return false;
Expand Down Expand Up @@ -329,6 +390,13 @@ class FunctionDescriptorBuilder {
const std::string &function_name,
const std::string &function_hash);

/// Build a JuliaFunctionDescriptor.
///
/// \return a ray::JuliaFunctionDescriptor
static FunctionDescriptor BuildJulia(const std::string &module_name,
const std::string &function_name,
const std::string &function_hash);

/// Build a CppFunctionDescriptor.
///
/// \return a ray::CppFunctionDescriptor
Expand Down
2 changes: 2 additions & 0 deletions src/ray/core_worker/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ std::string LanguageString(Language language) {
return "java";
} else if (language == Language::CPP) {
return "cpp";
} else if (language == Language::JULIA) {
return "julia";
}
RAY_CHECK(false);
return "";
Expand Down
8 changes: 8 additions & 0 deletions src/ray/protobuf/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ enum Language {
PYTHON = 0;
JAVA = 1;
CPP = 2;
JULIA = 3;
}

// Type of a worker.
Expand Down Expand Up @@ -153,12 +154,19 @@ message CppFunctionDescriptor {
string class_name = 3;
}

message JuliaFunctionDescriptor {
string module_name = 1;
string function_name = 2;
string function_hash = 3;
}

// A union wrapper for various function descriptor types.
message FunctionDescriptor {
oneof function_descriptor {
JavaFunctionDescriptor java_function_descriptor = 1;
PythonFunctionDescriptor python_function_descriptor = 2;
CppFunctionDescriptor cpp_function_descriptor = 3;
JuliaFunctionDescriptor julia_function_descriptor = 4;
}
}

Expand Down
Loading

0 comments on commit 7660c2d

Please sign in to comment.