Skip to content

Commit

Permalink
Use interprocess pipe to signal exec error in forked process
Browse files Browse the repository at this point in the history
Implements canonical fork/ exec error signaling process that uses a pipe
configure to close on `exec` sys. call to notify the parent process if
an error occurs in the child and it is unable to `exec` the desired
executable file

- Implement Pipe abstraction
- Add `read_bytes` capability to `FileDecscriptor`
- Add file descriptor controls function to Libc. bindings
- Fix previously broken test for rasing on exec errors

Refs:
- https://cr.yp.to/docs/selfpipe.html

Signed-off-by: Hristo I. Gueorguiev <[email protected]>
  • Loading branch information
izo0x90 committed Feb 17, 2025
1 parent b109fc4 commit 294413f
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 21 deletions.
34 changes: 34 additions & 0 deletions stdlib/src/builtin/file_descriptor.mojo
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ f.close()
```
"""
from sys import os_is_macos, os_is_linux
from sys._amdgpu import printf_append_string_n, printf_begin
from sys.ffi import external_call
from sys.info import is_amd_gpu, is_nvidia_gpu

from builtin.io import _printf
from builtin.os import abort
from memory import Span, UnsafePointer


Expand Down Expand Up @@ -85,6 +87,38 @@ struct FileDescriptor(Writer):
written,
)

@always_inline
fn read_bytes(mut self, size: Int) raises -> Span[Byte, MutableAnyOrigin]:
"""
Read a Span of bytes from the file.
Args:
size: Number of bytes to read.
Returns:
A list of bytes.
"""

@parameter
if is_nvidia_gpu():
constrained[False, "Nvidia GPU read bytes not implemented"]()
return abort[Span[Byte, MutableAnyOrigin]]()
elif is_amd_gpu():
constrained[False, "AMD GPU read bytes not implemented"]()
return abort[Span[Byte, MutableAnyOrigin]]()
elif os_is_macos() or os_is_linux():
var buf = UnsafePointer[UInt8].alloc(size)
read = external_call["read", Int](self.value, buf, size)

if read < 0:
buf.free()
raise Error("Failed to read bytes.")

return Span[Byte, MutableAnyOrigin](ptr=buf, length=size)
else:
constrained[False, "Unknown platform read bytes not implemented"]()
return abort[Span[Byte, MutableAnyOrigin]]()

fn write[*Ts: Writable](mut self, *args: *Ts):
"""Write a sequence of Writable arguments to the provided Writer.
Expand Down
159 changes: 154 additions & 5 deletions stdlib/src/os/process.mojo
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,140 @@ Example:
from os import Process
```
"""
from collections import Optional
from collections.string import StringSlice

from sys import (
external_call,
os_is_linux,
os_is_macos,
os_is_windows,
)
from sys._libc import vfork, execvp, kill, SignalCodes
from sys.ffi import OpaquePointer, c_char, c_int, c_str_ptr
from sys._libc import (
vfork,
execvp,
exit,
kill,
SignalCodes,
pipe,
fcntl,
FcntlCommands,
FcntlFDFlags,
close,
)
from sys.ffi import c_char, c_int, c_str_ptr
from sys.os import sep

from memory import UnsafePointer
from memory import Span, UnsafePointer


# ===----------------------------------------------------------------------=== #
# Process comm.
# ===----------------------------------------------------------------------=== #
struct Pipe:
"""Create a pipe for interprocess communication.
Example usage:
```
pipe().write_bytes("TEST".as_bytes())
```
"""

var fd_in: Optional[FileDescriptor]
"""File descriptor for pipe input."""
var fd_out: Optional[FileDescriptor]
"""File descriptor for pipe output."""

fn __init__(
mut self,
in_close_on_exec: Bool = False,
out_close_on_exec: Bool = False,
) raises:
"""Struct to manage interprocess pipe comms.
Args:
in_close_on_exec: Close the read side of pipe if `exec` sys. call is issued in process.
out_close_on_exec: Close the write side of pipe if `exec` sys. call is issued in process.
"""
var pipe_fds = UnsafePointer[c_int].alloc(2)
if pipe(pipe_fds) < 0:
pipe_fds.free()
raise Error("Failed to create pipe")

if in_close_on_exec:
if not self._set_close_on_exec(pipe_fds[0]):
pipe_fds.free()
raise Error("Failed to configure input pipe close on exec")

if out_close_on_exec:
if not self._set_close_on_exec(pipe_fds[1]):
pipe_fds.free()
raise Error("Failed to configure output pipe close on exec")

self.fd_in = FileDescriptor(Int(pipe_fds[0]))
self.fd_out = FileDescriptor(Int(pipe_fds[1]))
pipe_fds.free()

fn __del__(owned self):
"""Ensures pipes input and output file descriptors are closed, when the object is destroyed.
"""
self.set_input_only()
self.set_output_only()

@staticmethod
fn _set_close_on_exec(fd: c_int) -> Bool:
return (
fcntl(
fd,
FcntlCommands.F_SETFD,
fcntl(fd, FcntlCommands.F_GETFD, 0) | FcntlFDFlags.FD_CLOEXEC,
)
== 0
)

@always_inline
fn set_input_only(mut self):
"""Close the output descriptor/ channel for this side of the pipe."""
if self.fd_out:
_ = close(rebind[Int](self.fd_out.value()))
self.fd_out = None

@always_inline
fn set_output_only(mut self):
"""Close the input descriptor/ channel for this side of the pipe."""
if self.fd_in:
_ = close(rebind[Int](self.fd_in.value()))
self.fd_in = None

@always_inline
fn write_bytes(mut self, bytes: Span[Byte, _]) raises:
"""
Write a span of bytes to the pipe.
Args:
bytes: The byte span to write to this pipe.
"""
if self.fd_out:
self.fd_out.value().write_bytes(bytes)
else:
raise Error("Can not write from read only side of pipe")

@always_inline
fn read_bytes(mut self, size: Int) raises -> Span[Byte, MutableAnyOrigin]:
"""
Read a span of bytes from this pipe.
Args:
size: The number of bytes to read from this pipe.
Returns:
Span of bytes with len=size read from this pipe.
"""
if self.fd_in:
return self.fd_in.value().read_bytes(size)

raise Error("Can not read from write only side of pipe")


# ===----------------------------------------------------------------------=== #
# Process execution
Expand Down Expand Up @@ -103,8 +225,15 @@ struct Process:
@parameter
if os_is_linux() or os_is_macos():
var file_name = path.split(sep)[-1]
var pipe = Pipe(out_close_on_exec=True)
var exec_err_code = String("EXEC_ERR")

var pid = vfork()

if pid == 0:
"""Child process."""
pipe.set_output_only()

var arg_count = len(argv)
var argv_array_ptr_cstr_ptr = UnsafePointer[c_str_ptr].alloc(
arg_count + 2
Expand All @@ -125,10 +254,30 @@ struct Process:

# This will only get reached if exec call fails to replace currently executing code
argv_array_ptr_cstr_ptr.free()
raise Error("Failed to execute " + path)

# Canonical fork/ exec error handling pattern of using a pipe that closes on exec is
# used to signal error to parent process `https://cr.yp.to/docs/selfpipe.html`
pipe.write_bytes(exec_err_code.as_bytes())

exit(1)

elif pid < 0:
raise Error("Unable to fork parent")

pipe.set_input_only()
var err: Optional[Span[Byte, MutableAnyOrigin]]
try:
err = pipe.read_bytes(exec_err_code.byte_length())
except e:
err = None

if (
err
and len(err.value()) > 0
and StringSlice(unsafe_from_utf8=err.value()) == exec_err_code
):
raise Error("Failed to execute " + path)

return Process(child_pid=pid)
elif os_is_windows():
constrained[
Expand Down
34 changes: 34 additions & 0 deletions stdlib/src/sys/_libc.mojo
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,40 @@ fn kill(pid: c_int, sig: c_int) -> c_int:
return external_call["kill", c_int](pid, sig)


@always_inline
fn pipe(fildes: UnsafePointer[c_int]) -> c_int:
return external_call["pipe", c_int](fildes)


@always_inline
fn close(fd: c_int) -> c_int:
return external_call["close", c_int](fd)


@always_inline
fn write(fd: c_int, buf: OpaquePointer, nbyte: c_size_t) -> c_int:
return external_call["write", c_int](fd, buf, nbyte)


# ===-----------------------------------------------------------------------===#
# fcntl.h - Control over file descriptors
# ===-----------------------------------------------------------------------===#


struct FcntlCommands:
alias F_GETFD: c_int = 1
alias F_SETFD: c_int = 2


struct FcntlFDFlags:
alias FD_CLOEXEC: c_int = 1


@always_inline
fn fcntl[*types: Intable](fd: c_int, cmd: c_int, *args: *types) -> c_int:
return external_call["fcntl", c_int](fd, cmd, args)


# ===-----------------------------------------------------------------------===#
# dlfcn.h — dynamic library operations
# ===-----------------------------------------------------------------------===#
Expand Down
30 changes: 14 additions & 16 deletions stdlib/test/os/test_process.mojo
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,24 @@ from os import Process
from testing import assert_false, assert_raises


# CHECK-LABEL: TEST
# CHECK-LABEL: TEST_ECHO
def test_process_run():
_ = Process.run("echo", List[String]("== TEST"))
_ = Process.run("echo", List[String]("== TEST_ECHO"))


# def test_process_run_missing():
# # assert_raises does not work with exception raised in child process
# # crashes with thread error
# missing_executable_file = "ThIsFiLeCoUlDNoTPoSsIbLlYExIsT.NoTAnExTeNsIoN"
#
# # verify that the test file does not exist before starting the test
# assert_false(
# exists(missing_executable_file),
# "Unexpected file '" + missing_executable_file + "' it should not exist",
# )
#
# # Forking appears to break asserts
# with assert_raises():
# _ = Process.run(missing_executable_file, List[String]())
def test_process_run_missing():
missing_executable_file = "ThIsFiLeCoUlDNoTPoSsIbLlYExIsT.NoTAnExTeNsIoN"

# verify that the test file does not exist before starting the test
assert_false(
exists(missing_executable_file),
"Unexpected file '" + missing_executable_file + "' it should not exist",
)

with assert_raises():
_ = Process.run(missing_executable_file, List[String]())


def main():
test_process_run()
test_process_run_missing()

0 comments on commit 294413f

Please sign in to comment.