update pytorch script runner to use pipe

ngc92 committed Jan 13, 2025
1 parent 0c04a1b commit f58a407

Showing 5 changed files with 133 additions and 106 deletions.
19 changes: 12 additions & 7 deletions src/discord-cluster-manager/cogs/verify_run_cog.py
@@ -1,5 +1,6 @@
 import asyncio
 import re
+from pathlib import Path
 from unittest.mock import AsyncMock
 
 import discord
@@ -12,19 +13,16 @@
 logger = setup_logging()
 
 
-def create_mock_attachment():
+def create_mock_attachment(file_name: str, content: str):
     "Create an AsyncMock to simulate discord.Attachment"
 
     mock_attachment = AsyncMock(spec=discord.Attachment)
-    mock_attachment.filename = "test_script.py"
+    mock_attachment.filename = file_name
     mock_attachment.content_type = "text/plain"
-    mock_attachment.read = AsyncMock(return_value="print('Hello, world!')".encode("utf-8"))
+    mock_attachment.read = AsyncMock(return_value=content.encode("utf-8"))
     return mock_attachment
 
 
-script_file = create_mock_attachment()
-
-
 class VerifyRunCog(commands.Cog):
     """
     A Discord cog for verifying the success of training runs.
@@ -45,6 +43,7 @@ async def verify_github_run(
         interaction: discord.Interaction,
     ) -> bool:
         github_command = github_cog.run_github
+        script_file = create_mock_attachment("test_script.py", "print('Hello, world!')")
        github_thread = await github_command.callback(github_cog, interaction, script_file, choice)
 
        message_contents = [msg.content async for msg in github_thread.history(limit=None)]
@@ -86,7 +85,13 @@ async def verify_modal_run(self, modal_cog: ModalCog, interaction: discord.Interaction
         t4 = app_commands.Choice(name="T4", value="t4")
         modal_command = modal_cog.run_modal
 
-        modal_thread = await modal_command.callback(modal_cog, interaction, script_file, t4)
+        sub_code = create_mock_attachment(
+            "submission.py", Path("examples/identity_py/submission.py").read_text()
+        )
+        ref_code = Path("examples/identity_py/reference.py").read_text()
+        modal_thread = await modal_command.callback(
+            modal_cog, interaction, sub_code, t4, reference_code=ref_code
+        )
 
        message_contents = [msg.content async for msg in modal_thread.history(limit=None)]
86 changes: 86 additions & 0 deletions src/discord-cluster-manager/eval.py
@@ -0,0 +1,86 @@
+import math
+import os
+import time
+
+import torch
+from reference import check_implementation, generate_input, ref_kernel
+from train import custom_kernel
+
+
+class PopcornLogger:
+    def __init__(self, fd):
+        self.channel = open(fd, "w")
+
+    def log(self, key: str, value):
+        print(f"{key}: {value}\n", file=self.channel)
+
+
+def correctness() -> bool:
+    for _ in range(10):  # check multiple times
+        inputs = generate_input()
+
+        custom_output = custom_kernel(inputs)
+        ref_output = ref_kernel(inputs)
+
+        if not check_implementation(custom_output, ref_output):
+            return False
+
+    print("custom implementation matches the reference implementation.")
+    return True
+
+
+def metric(logger: PopcornLogger):
+    warmup_runs = 10
+    timed_runs = 100
+
+    # Warmup Code
+    print("warming up...")
+    for _ in range(warmup_runs):
+        inputs = generate_input()
+        _ = custom_kernel(inputs)
+        torch.cuda.synchronize()
+
+    # Timing Code
+    times = []
+
+    for _ in range(timed_runs):
+        inputs = generate_input()
+
+        start_time = time.time()
+        custom_output = custom_kernel(inputs)
+        torch.cuda.synchronize()
+        end_time = time.time()
+        times.append(end_time - start_time)
+
+        ref_output = ref_kernel(inputs)
+        torch.cuda.synchronize()
+        if not check_implementation(custom_output, ref_output):
+            logger.log("check", "fail")
+            exit(1)
+
+    total_time = sum(times)
+    average_duration = total_time / timed_runs
+    variance = sum(map(lambda x: (x - average_duration) ** 2, times))  # noqa
+    standard_deviation = math.sqrt(variance / (timed_runs - 1))
+    standard_error = standard_deviation / math.sqrt(timed_runs)
+
+    logger.log("check", "pass")
+    logger.log("duration.mean", average_duration * 1e9)
+    logger.log("duration.std", standard_deviation * 1e9)
+    logger.log("duration.err", standard_error * 1e9)
+    logger.log("duration.best", min(times) * 1e9)
+    logger.log("duration.worst", max(times) * 1e9)
+
+    print(f"Submitted kernel runtime: {average_duration:.4f} ± {standard_error:.4} seconds")
+
+
+def main():
+    logger = PopcornLogger(int(os.environ["POPCORN_FD"]))
+    if not correctness():
+        logger.log("check", "fail")
+        exit(1)
+    metric(logger)
+
+
+if __name__ == "__main__":
+    main()
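
The script above never prints a score for the runner to scrape; it reports structured results over the file descriptor passed in the POPCORN_FD environment variable. Below is a minimal sketch of the parent side of that protocol, assuming a POSIX platform; the function name run_eval_with_pipe and the parsing details are illustrative only (the commit's actual consumer is run_program in run_eval.py below):

import os
import subprocess

def run_eval_with_pipe() -> dict[str, str]:
    # Create a pipe; the child writes "key: value" lines to the write end.
    pipe_read, pipe_write = os.pipe()

    env = os.environ.copy()
    env["POPCORN_FD"] = str(pipe_write)

    # pass_fds keeps the write end open in the child despite close_fds=True.
    # Reading only after the child exits is fine for the handful of
    # key/value lines eval.py emits (well under the pipe buffer size).
    subprocess.run(
        ["python", "eval.py"],
        env=env,
        pass_fds=(pipe_write,),
        capture_output=True,
        text=True,
    )

    # Close our copy of the write end so reading hits EOF once the child exits.
    os.close(pipe_write)

    # PopcornLogger emits an extra blank line per entry; lines without a
    # colon are skipped here.
    result: dict[str, str] = {}
    with os.fdopen(pipe_read, "r") as f:
        for line in f:
            if ":" in line:
                key, _, value = line.partition(":")
                result[key.strip()] = value.strip()
    return result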
68 changes: 1 addition & 67 deletions src/discord-cluster-manager/leaderboard_eval.py
@@ -4,71 +4,5 @@
 
 from pathlib import Path
 
-py_eval = """
-import torch
-import time
-from reference import ref_kernel, generate_input, check_implementation
-from train import custom_kernel
-
-def correctness() -> bool:
-    for _ in range(10):  # check multiple times
-        inputs = generate_input()
-        custom_output = custom_kernel(inputs)
-        ref_output = ref_kernel(inputs)
-        if not check_implementation(custom_output, ref_output):
-            return False
-    print('custom implementation matches the reference implementation.')
-    return True
-
-def metric():
-    warmup_runs = 10
-    timed_runs = 100
-
-    # Warmup Code
-    print('warming up...')
-    for _ in range(warmup_runs):
-        inputs = generate_input()
-        _ = custom_kernel(inputs)
-        torch.cuda.synchronize()
-
-    # Timing Code
-    total_time = 0.0
-
-    for _ in range(timed_runs):
-        inputs = generate_input()
-        start_time = time.time()
-        custom_output = custom_kernel(inputs)
-        torch.cuda.synchronize()
-        end_time = time.time()
-        total_time += (end_time - start_time)
-
-        ref_output = ref_kernel(inputs)
-        torch.cuda.synchronize()
-        if not check_implementation(custom_output, ref_output):
-            return -1
-
-    custom_duration = total_time / timed_runs
-    print(f'Submitted kernel runtime: {custom_duration:.4f} seconds')
-    return custom_duration
-
-def main():
-    assert (correctness())
-    s = metric()
-    print(f'score:{s}')
-
-if __name__ == '__main__':
-    main()
-"""
 
+py_eval = Path.read_text(Path(__file__).parent / "eval.py")
 cu_eval = Path.read_text(Path(__file__).parent / "eval.cu")
27 changes: 26 additions & 1 deletion src/discord-cluster-manager/modal_runner.py
@@ -79,15 +79,40 @@ def modal_run_pytorch_script( # noqa: C901
     """Modal version of run_pytorch_script, handling timeouts"""
     try:
         with timeout(timeout_seconds):
-            return run_pytorch_script(
+            run_result = run_pytorch_script(
                 script_content=script_content,
                 reference_content=reference_content,
                 submission_content=submission_content,
                 arch=arch,
             )
+
+            if not run_result.success:
+                # exit code 1 encodes failed tests
+                if run_result.exit_code == 1:
+                    return f"check_implementation failed:\n{run_result.stderr}", 0.0
+                else:
+                    return (
+                        f"Script failed with exit code "
+                        f"({run_result.exit_code}):\n{run_result.stderr}",
+                        0.0,
+                    )
+
+            print("run process stdout:", run_result.stdout)
+            print("run process stderr:", run_result.stderr)
+
+            score = float(run_result.result.get("duration.mean", "0.0")) / 1e9
+            passed = run_result.result.get("check", "") == "pass"
+            if not passed:
+                return "check_implementation failed", 0.0
+
+            if score is None:
+                return run_result.stdout, run_result.duration
+
+            return run_result.stdout, score
+
     except TimeoutException as e:
         return f"Timeout Error: {str(e)}", 0.0
     except Exception as e:
         return f"Error executing script: {str(e)}", 0.0
 
 
 def modal_run_cuda_script( # # noqa: C901
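Note the unit round-trip in the score computation above: eval.py logs duration.mean in nanoseconds (seconds multiplied by 1e9 before logging), so the Modal runner divides by 1e9 to recover seconds. A toy example with invented values:

# Toy illustration (values invented): a run_result.result dict as parsed
# from the pipe, with durations in nanoseconds as logged by eval.py.
result = {"check": "pass", "duration.mean": "1250000.0"}

score = float(result.get("duration.mean", "0.0")) / 1e9  # 0.00125 seconds
passed = result.get("check", "") == "pass"
assert passed and abs(score - 0.00125) < 1e-12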
39 changes: 8 additions & 31 deletions src/discord-cluster-manager/run_eval.py
@@ -114,7 +114,7 @@ def compile_cuda_script( # # noqa: C901
     )
 
 
-def run_cuda_program(args: list[str]) -> RunResult:
+def run_program(args: list[str]) -> RunResult:
     # set up a pipe so the tester can communicate its verdict with us
     env = os.environ.copy()
     pipe_read, pipe_write = os.pipe()
@@ -142,7 +142,9 @@ def run_cuda_program(args: list[str]) -> RunResult:
        result_dict[key.strip()] = value.strip()
 
    return RunResult(
-        success=run_process.returncode == 0,
+        # TODO should we return 0 also on test failure?
+        # TODO check what return codes python uses, e.g. on uncaught exception
+        success=(run_process.returncode == 0 or run_process.returncode == 1),
        command=_make_cmd(run_process.args),
        stdout=run_process.stdout,
        stderr=run_process.stderr,
@@ -206,7 +208,7 @@ def run_cuda_script( # # noqa: C901
                result={},
            )
 
-        run_result = run_cuda_program(["./eval.out"])
+        run_result = run_program(["./eval.out"])
        return compile_result, run_result
 
    finally:
@@ -221,9 +223,9 @@ def run_pytorch_script( # noqa: C901
    reference_content: Optional[str] = None,
    submission_content: Optional[str] = None,
    arch: int = None,
-) -> tuple[str, float]:
+) -> RunResult:
    """
-    Executes the provided PyTorch GPU kernel in an isolated environment with a timeout
+    Executes the provided PyTorch GPU kernel in an isolated environment
 
    Args:
        script_content: The PyTorch script containing the GPU kernel to benchmark
@@ -247,33 +249,8 @@
        with open("eval.py", "w") as f:
            f.write(script_content)
 
-        execution_start_time = time.perf_counter()
-        result = subprocess.run(
-            ["python", "eval.py"],
-            stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
-            text=True,
-        )
-
-        if result.returncode != 0:
-            raise RuntimeError(
-                "Script execution failed with return code "
-                + f"{result.returncode}:\n{result.stderr}"
-            )
-
-        score = None
-        for line in result.stdout.splitlines():
-            if line.startswith("score:"):
-                score = float(line.split(":")[1].strip())
-                return "score", score
-
-        if score is None:
-            execution_end_time = time.perf_counter()
-            score = execution_end_time - execution_start_time
+        return run_program(["python", "eval.py"])
 
-        return result.stdout, score
-    except Exception as e:
-        return f"Error executing script: {str(e)}", 0.0
    finally:
        tmp_files = ["eval.py", "reference.py", "train.py"]
        for f in tmp_files:
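
Taken together, these changes define a small exit-code contract between eval.py and its runners: exit code 0 means the harness ran and the checks passed, exit code 1 means the harness ran but check_implementation failed (run_program therefore still reports success=True), and any other code means the script itself crashed. A sketch of that contract; the helper name interpret is illustrative, not part of the commit:

# Hypothetical helper summarizing the convention implied by this commit.
def interpret(returncode: int, result: dict[str, str]) -> str:
    if returncode not in (0, 1):
        return "script error"  # runner surfaces stderr to the user
    if result.get("check", "") != "pass":
        return "check_implementation failed"
    return "passed"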