From 7660c2d020e30b6eee07a7fc2017a636b85c4672 Mon Sep 17 00:00:00 2001 From: Dave Kleinschmidt Date: Tue, 1 Aug 2023 17:36:35 -0400 Subject: [PATCH] Add Julia language support * add julia function descriptor to proto/cpp * make it compile * Support JuliaFunctionDescriptor in == fallback * Enable debug-level logging on the Ray backend * Add Raylet support for Julia * Return Julia language string * EDIT: modify Java worker_pool tests to run Julia * specify julia executable * fix indent, add log_dir arg missing * also includt log_dir when calling * WIP * Functional worker * Use JULIA_PROJECT * Refactor RuntimeEnvContext changes * Funtional passing in JULIA_PROJECT * Cleanup * Rollback non-required changes * Use exported start_worker * Apply suggestions from code review Signed-off-by: Dave Kleinschmidt * use Ray.jl as entrypoint to worker runtime * pass `--startup_token` to julia worker * Drop `DEFAULT_WORKER_EXECUTABLE` from `build_julia_worker_command` (#9) * Runtime environment can specify Julia executable and arguments (#8) * Support specifying runtime env executable * Support specifying runtime env args * Support specifying executable/args via runtime env * Avoid quoting default * Switch to using command in RuntimeEnvContext * Use separate command for Julia * Add TODO about switching to plugin Co-authored-by: Dave Kleinschmidt Signed-off-by: Curtis Vogt * Avoid escaping `command_prefix` (#10) * formatting Signed-off-by: Glenn Moynihan * revert logging level changes and delete dev comments Signed-off-by: Glenn Moynihan * Remove dev logging statements + todo Signed-off-by: Glenn Moynihan * Missed some JULIA language support --- .../runtime_env/agent/runtime_env_agent.py | 6 +- python/ray/_private/runtime_env/context.py | 5 ++ python/ray/_private/services.py | 65 ++++++++++++++++++ python/ray/_raylet.pyx | 4 ++ python/ray/includes/common.pxd | 1 + python/ray/runtime_env/runtime_env.py | 8 +++ src/ray/common/function_descriptor.cc | 20 ++++++ src/ray/common/function_descriptor.h | 68 +++++++++++++++++++ src/ray/core_worker/common.cc | 2 + src/ray/protobuf/common.proto | 8 +++ src/ray/raylet/main.cc | 10 ++- src/ray/raylet/worker_pool.cc | 4 +- 12 files changed, 196 insertions(+), 5 deletions(-) diff --git a/python/ray/_private/runtime_env/agent/runtime_env_agent.py b/python/ray/_private/runtime_env/agent/runtime_env_agent.py index 87b5c9d2d807f..6571141651bcd 100644 --- a/python/ray/_private/runtime_env/agent/runtime_env_agent.py +++ b/python/ray/_private/runtime_env/agent/runtime_env_agent.py @@ -314,7 +314,11 @@ async def _setup_runtime_env( # TODO(chenk008): Add log about allocated_resource to # avoid lint error. That will be moved to cgroup plugin. per_job_logger.debug(f"Worker has resource :" f"{allocated_resource}") - context = RuntimeEnvContext(env_vars=runtime_env.env_vars()) + context = RuntimeEnvContext( + env_vars=runtime_env.env_vars(), + # TODO: use plugin instead of special casing + julia_command=runtime_env.julia_command(), + ) # Warn about unrecognized fields in the runtime env. for name, _ in runtime_env.plugins(): diff --git a/python/ray/_private/runtime_env/context.py b/python/ray/_private/runtime_env/context.py index 0d3fd38572fbb..7b70faa285aab 100644 --- a/python/ray/_private/runtime_env/context.py +++ b/python/ray/_private/runtime_env/context.py @@ -26,6 +26,7 @@ def __init__( resources_dir: Optional[str] = None, override_worker_entrypoint: Optional[str] = None, java_jars: List[str] = None, + julia_command: List[str] = None, ): self.command_prefix = command_prefix or [] self.env_vars = env_vars or {} @@ -37,6 +38,7 @@ def __init__( self.resources_dir: str = resources_dir self.override_worker_entrypoint: Optional[str] = override_worker_entrypoint self.java_jars = java_jars or [] + self.julia_command = julia_command or [] def serialize(self) -> str: return json.dumps(self.__dict__) @@ -63,6 +65,9 @@ def exec_worker(self, passthrough_args: List[str], language: Language): class_path_args = ["-cp", ray_jars + ":" + str(":".join(local_java_jars))] passthrough_args = class_path_args + passthrough_args + elif language == Language.JULIA: + executable = self.julia_command or ["julia", "-e", "using Ray; start_worker()"] + executable += ["--"] elif sys.platform == "win32": executable = [] else: diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index 802b9a06199b1..9374a2290698e 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -1617,6 +1617,7 @@ def start_raylet( ["{},{}".format(*kv) for kv in static_resources.items()] ) + # JAVA has_java_command = False if shutil.which("java") is not None: has_java_command = True @@ -1643,6 +1644,7 @@ def start_raylet( else: java_worker_command = [] + # CPP if os.path.exists(DEFAULT_WORKER_EXECUTABLE): cpp_worker_command = build_cpp_worker_command( gcs_address, @@ -1657,6 +1659,25 @@ def start_raylet( else: cpp_worker_command = [] + # JULIA + has_julia_command = False + if shutil.which("julia") is not None: + has_julia_command = True + + if has_julia_command is True: + julia_worker_command = build_julia_worker_command( + gcs_address, + plasma_store_name, + raylet_name, + redis_password, + session_dir, + log_dir, + node_ip_address, + setup_worker_path, + ) + else: + julia_worker_command = [] + # Create the command that the Raylet will use to start workers. # TODO(architkulkarni): Pipe in setup worker args separately instead of # inserting them into start_worker_command and later erasing them if @@ -1776,6 +1797,7 @@ def start_raylet( f"--python_worker_command={subprocess.list2cmdline(start_worker_command)}", # noqa f"--java_worker_command={subprocess.list2cmdline(java_worker_command)}", # noqa f"--cpp_worker_command={subprocess.list2cmdline(cpp_worker_command)}", # noqa + f"--julia_worker_command={subprocess.list2cmdline(julia_worker_command)}", # noqa f"--native_library_path={DEFAULT_NATIVE_LIBRARY_PATH}", f"--temp_dir={temp_dir}", f"--session_dir={session_dir}", @@ -1950,6 +1972,49 @@ def build_cpp_worker_command( return command +def build_julia_worker_command( + bootstrap_address: str, + plasma_store_name: str, + raylet_name: str, + redis_password: str, + session_dir: str, + log_dir: str, + node_ip_address: str, + setup_worker_path: str, +): + """This method assembles the command used to start a Julia worker. + + Args: + bootstrap_address: Bootstrap address of ray cluster. + plasma_store_name: The name of the plasma store socket to connect + to. + raylet_name: The name of the raylet socket to create. + redis_password: The password of connect to redis. + session_dir: The path of this session. + node_ip_address: The ip address for this node. + setup_worker_path: The path of the Python file that will set up + the environment for the worker process. + Returns: + The command string for starting Julia worker. + """ + + command = [ + sys.executable, + setup_worker_path, + f"--ray_plasma_store_socket_name={plasma_store_name}", + f"--ray_raylet_socket_name={raylet_name}", + "--ray_node_manager_port=RAY_NODE_MANAGER_PORT_PLACEHOLDER", + f"--ray_address={bootstrap_address}", + f"--ray_redis_password={redis_password}", + f"--ray_session_dir={session_dir}", + f"--ray_logs_dir={log_dir}", + f"--ray_node_ip_address={node_ip_address}", + "RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER", + ] + + return command + + def determine_plasma_store_config( object_store_memory: int, plasma_directory: Optional[str] = None, diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 76ebbd160d816..060612ce73608 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -110,6 +110,7 @@ from ray.includes.common cimport ( LANGUAGE_CPP, LANGUAGE_JAVA, LANGUAGE_PYTHON, + LANGUAGE_JULIA, LocalMemoryBuffer, TASK_TYPE_NORMAL_TASK, TASK_TYPE_ACTOR_CREATION_TASK, @@ -756,6 +757,8 @@ cdef class Language: return "CPP" elif self.lang == LANGUAGE_JAVA: return "JAVA" + elif self.lang == LANGUAGE_JULIA: + return "JULIA" else: raise Exception("Unexpected error") @@ -765,6 +768,7 @@ cdef class Language: PYTHON = Language.from_native(LANGUAGE_PYTHON) CPP = Language.from_native(LANGUAGE_CPP) JAVA = Language.from_native(LANGUAGE_JAVA) + JULIA = Language.from_native(LANGUAGE_JULIA) cdef int prepare_resources( diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 71fdd92375b9e..e2acef90c4d02 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -243,6 +243,7 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil: cdef CLanguage LANGUAGE_PYTHON "Language::PYTHON" cdef CLanguage LANGUAGE_CPP "Language::CPP" cdef CLanguage LANGUAGE_JAVA "Language::JAVA" + cdef CLanguage LANGUAGE_JULIA "Language::JULIA" cdef extern from "src/ray/protobuf/common.pb.h" nogil: cdef CWorkerType WORKER_TYPE_WORKER "ray::core::WorkerType::WORKER" diff --git a/python/ray/runtime_env/runtime_env.py b/python/ray/runtime_env/runtime_env.py index 06b230741ea5d..a8e42383a0fcf 100644 --- a/python/ray/runtime_env/runtime_env.py +++ b/python/ray/runtime_env/runtime_env.py @@ -274,6 +274,7 @@ class MyClass: "container", "excludes", "env_vars", + "julia_command", "_ray_release", "_ray_commit", "_inject_current_ray", @@ -339,6 +340,8 @@ def __init__( runtime_env["image_uri"] = image_uri if runtime_env.get("java_jars"): runtime_env["java_jars"] = runtime_env.get("java_jars") + if runtime_env.get("julia_command"): + runtime_env["julia_command"] = runtime_env.get("julia_command") self.update(runtime_env) @@ -497,6 +500,11 @@ def nsight(self) -> Optional[Union[str, Dict[str, str]]]: def env_vars(self) -> Dict: return self.get("env_vars", {}) + def julia_command(self) -> List[str]: + if "julia_command" in self: + return list(self["julia_command"]) + return [] + def has_conda(self) -> str: if self.get("conda"): return True diff --git a/src/ray/common/function_descriptor.cc b/src/ray/common/function_descriptor.cc index 22a9979322664..9a3ee3a178538 100644 --- a/src/ray/common/function_descriptor.cc +++ b/src/ray/common/function_descriptor.cc @@ -46,6 +46,18 @@ FunctionDescriptor FunctionDescriptorBuilder::BuildPython( return ray::FunctionDescriptor(new PythonFunctionDescriptor(std::move(descriptor))); } +FunctionDescriptor FunctionDescriptorBuilder::BuildJulia( + const std::string &module_name, + const std::string &function_name, + const std::string &function_hash) { + rpc::FunctionDescriptor descriptor; + auto typed_descriptor = descriptor.mutable_julia_function_descriptor(); + typed_descriptor->set_module_name(module_name); + typed_descriptor->set_function_name(function_name); + typed_descriptor->set_function_hash(function_hash); + return ray::FunctionDescriptor(new JuliaFunctionDescriptor(std::move(descriptor))); +} + FunctionDescriptor FunctionDescriptorBuilder::BuildCpp(const std::string &function_name, const std::string &caller, const std::string &class_name) { @@ -65,6 +77,8 @@ FunctionDescriptor FunctionDescriptorBuilder::FromProto(rpc::FunctionDescriptor return ray::FunctionDescriptor(new ray::PythonFunctionDescriptor(std::move(message))); case ray::FunctionDescriptorType::kCppFunctionDescriptor: return ray::FunctionDescriptor(new ray::CppFunctionDescriptor(std::move(message))); + case ray::FunctionDescriptorType::kJuliaFunctionDescriptor: + return ray::FunctionDescriptor(new ray::JuliaFunctionDescriptor(std::move(message))); default: break; } @@ -96,6 +110,12 @@ FunctionDescriptor FunctionDescriptorBuilder::FromVector( function_descriptor_list[0], // function name function_descriptor_list[1], // caller function_descriptor_list[2]); // class name + } else if (language == rpc::Language::JULIA) { + RAY_CHECK(function_descriptor_list.size() == 3); + return FunctionDescriptorBuilder::BuildJulia( + function_descriptor_list[0], // module name + function_descriptor_list[1], // function name + function_descriptor_list[2]); // function hash } else { RAY_LOG(FATAL) << "Unsupported language " << language; return FunctionDescriptorBuilder::Empty(); diff --git a/src/ray/common/function_descriptor.h b/src/ray/common/function_descriptor.h index a9544b2e6c4fe..42ae56cde18cb 100644 --- a/src/ray/common/function_descriptor.h +++ b/src/ray/common/function_descriptor.h @@ -220,6 +220,64 @@ class PythonFunctionDescriptor : public FunctionDescriptorInterface { const rpc::PythonFunctionDescriptor *typed_message_; }; +class JuliaFunctionDescriptor : public FunctionDescriptorInterface { + public: + /// Construct from a protobuf message object. + /// The input message will be **copied** into this object. + /// + /// \param message The protobuf message. + explicit JuliaFunctionDescriptor(rpc::FunctionDescriptor message) + : FunctionDescriptorInterface(std::move(message)) { + RAY_CHECK(message_->function_descriptor_case() == + ray::FunctionDescriptorType::kJuliaFunctionDescriptor); + typed_message_ = &(message_->julia_function_descriptor()); + } + + virtual size_t Hash() const { + return std::hash()(ray::FunctionDescriptorType::kJuliaFunctionDescriptor) ^ + std::hash()(typed_message_->module_name()) ^ + std::hash()(typed_message_->function_name()) ^ + std::hash()(typed_message_->function_hash()); + } + + inline bool operator==(const JuliaFunctionDescriptor &other) const { + if (this == &other) { + return true; + } + return this->ModuleName() == other.ModuleName() && + this->FunctionName() == other.FunctionName() && + this->FunctionHash() == other.FunctionHash(); + } + + inline bool operator!=(const JuliaFunctionDescriptor &other) const { + return !(*this == other); + } + + virtual std::string ToString() const { + return "{type=JuliaFunctionDescriptor, module_name=" + + typed_message_->module_name() + + ", function_name=" + typed_message_->function_name() + + ", function_hash=" + typed_message_->function_hash() + "}"; + } + + virtual std::string CallString() const { + const std::string &module_name = typed_message_->module_name(); + const std::string &function_name = typed_message_->function_name(); + return module_name.empty() ? function_name : module_name + "." + function_name; + } + + virtual std::string ClassName() const { return ""; } + + const std::string &ModuleName() const { return typed_message_->module_name(); } + + const std::string &FunctionName() const { return typed_message_->function_name(); } + + const std::string &FunctionHash() const { return typed_message_->function_hash(); } + + private: + const rpc::JuliaFunctionDescriptor *typed_message_; +}; + class CppFunctionDescriptor : public FunctionDescriptorInterface { public: /// Construct from a protobuf message object. @@ -296,6 +354,9 @@ inline bool operator==(const FunctionDescriptor &left, const FunctionDescriptor case ray::FunctionDescriptorType::kCppFunctionDescriptor: return static_cast(*left) == static_cast(*right); + case ray::FunctionDescriptorType::kJuliaFunctionDescriptor: + return static_cast(*left) == + static_cast(*right); default: RAY_LOG(FATAL) << "Unknown function descriptor type: " << left->Type(); return false; @@ -329,6 +390,13 @@ class FunctionDescriptorBuilder { const std::string &function_name, const std::string &function_hash); + /// Build a JuliaFunctionDescriptor. + /// + /// \return a ray::JuliaFunctionDescriptor + static FunctionDescriptor BuildJulia(const std::string &module_name, + const std::string &function_name, + const std::string &function_hash); + /// Build a CppFunctionDescriptor. /// /// \return a ray::CppFunctionDescriptor diff --git a/src/ray/core_worker/common.cc b/src/ray/core_worker/common.cc index 0f640e154bc37..4a380f461f219 100644 --- a/src/ray/core_worker/common.cc +++ b/src/ray/core_worker/common.cc @@ -39,6 +39,8 @@ std::string LanguageString(Language language) { return "java"; } else if (language == Language::CPP) { return "cpp"; + } else if (language == Language::JULIA) { + return "julia"; } RAY_CHECK(false); return ""; diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index ae321a8a7cadd..e125808d6368e 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -26,6 +26,7 @@ enum Language { PYTHON = 0; JAVA = 1; CPP = 2; + JULIA = 3; } // Type of a worker. @@ -153,12 +154,19 @@ message CppFunctionDescriptor { string class_name = 3; } +message JuliaFunctionDescriptor { + string module_name = 1; + string function_name = 2; + string function_hash = 3; +} + // A union wrapper for various function descriptor types. message FunctionDescriptor { oneof function_descriptor { JavaFunctionDescriptor java_function_descriptor = 1; PythonFunctionDescriptor python_function_descriptor = 2; CppFunctionDescriptor cpp_function_descriptor = 3; + JuliaFunctionDescriptor julia_function_descriptor = 4; } } diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 4c77133c29d1e..76b871d0138d9 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -64,6 +64,7 @@ DEFINE_int32(maximum_startup_concurrency, 1, "Maximum startup concurrency."); DEFINE_string(static_resource_list, "", "The static resource list of this node."); DEFINE_string(python_worker_command, "", "Python worker command."); DEFINE_string(java_worker_command, "", "Java worker command."); +DEFINE_string(julia_worker_command, "", "Julia worker command."); DEFINE_string(dashboard_agent_command, "", "Dashboard agent command."); DEFINE_string(runtime_env_agent_command, "", "Runtime env agent command."); DEFINE_string(cpp_worker_command, "", "CPP worker command."); @@ -158,6 +159,7 @@ int main(int argc, char *argv[]) { const std::string static_resource_list = FLAGS_static_resource_list; const std::string python_worker_command = FLAGS_python_worker_command; const std::string java_worker_command = FLAGS_java_worker_command; + const std::string julia_worker_command = FLAGS_julia_worker_command; const std::string dashboard_agent_command = FLAGS_dashboard_agent_command; const std::string runtime_env_agent_command = FLAGS_runtime_env_agent_command; const std::string cpp_worker_command = FLAGS_cpp_worker_command; @@ -337,10 +339,14 @@ int main(int argc, char *argv[]) { node_manager_config.worker_commands.emplace( make_pair(ray::Language::CPP, ParseCommandLine(cpp_worker_command))); } + if (!julia_worker_command.empty()) { + node_manager_config.worker_commands.emplace( + make_pair(ray::Language::JULIA, ParseCommandLine(julia_worker_command))); + } node_manager_config.native_library_path = native_library_path; if (python_worker_command.empty() && java_worker_command.empty() && - cpp_worker_command.empty()) { - RAY_LOG(FATAL) << "At least one of Python/Java/CPP worker command " + cpp_worker_command.empty() && julia_worker_command.empty()) { + RAY_LOG(FATAL) << "At least one of Python/Java/CPP/Julia worker command " << "should be provided"; } if (dashboard_agent_command.empty()) { diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 74e682b7a52e2..c4a705ec72409 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -359,7 +359,7 @@ WorkerPool::BuildProcessCommandArgs(const Language &language, // TODO(jjyao) This should be renamed to worker cache key hash worker_command_args.push_back("--runtime-env-hash=" + std::to_string(runtime_env_hash)); - } else if (language == Language::CPP) { + } else if (language == Language::CPP || language == Language::JULIA) { worker_command_args.push_back("--startup_token=" + std::to_string(worker_startup_token_counter_)); worker_command_args.push_back("--ray_runtime_env_hash=" + @@ -846,7 +846,7 @@ Status WorkerPool::RegisterDriver(const std::shared_ptr &driver const auto job_id = driver->GetAssignedJobId(); HandleJobStarted(job_id, job_config); - if (driver->GetLanguage() == Language::JAVA) { + if (driver->GetLanguage() == Language::JAVA || driver->GetLanguage() == Language::JULIA) { send_reply_callback(Status::OK(), port); } else { if (!first_job_registered_ && RayConfig::instance().prestart_worker_first_driver() &&