Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Create new proxy service to store (and relay) broadcasted daft data #3774

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 39 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ daft-compression = {path = "src/daft-compression", default-features = false}
daft-connect = {path = "src/daft-connect", optional = true}
daft-core = {path = "src/daft-core", default-features = false}
daft-csv = {path = "src/daft-csv", default-features = false}
daft-dashboard-server = {path = "src/daft-dashboard-server"}
daft-dsl = {path = "src/daft-dsl", default-features = false}
daft-functions = {path = "src/daft-functions"}
daft-functions-json = {path = "src/daft-functions-json", default-features = false}
Expand Down Expand Up @@ -170,10 +171,10 @@ members = [
"src/hyperloglog",
"src/daft-connect",
"src/parquet2",
# "src/spark-connect-script",
"src/generated/spark-connect",
"src/common/partitioning",
"src/daft-ray-execution"
"src/daft-ray-execution",
"src/daft-dashboard-server"
]

[workspace.dependencies]
Expand Down Expand Up @@ -247,7 +248,8 @@ tokio = {version = "1.37.0", features = [
"signal",
"macros",
"rt",
"rt-multi-thread"
"rt-multi-thread",
"sync"
]}
tokio-stream = {version = "0.1.14", features = ["fs", "io-util", "time"]}
tokio-util = "0.7.11"
Expand Down
5 changes: 5 additions & 0 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1213,6 +1213,11 @@ def binary_length(expr: PyExpr) -> PyExpr: ...
def binary_concat(left: PyExpr, right: PyExpr) -> PyExpr: ...
def binary_slice(expr: PyExpr, start: PyExpr, length: PyExpr | None = None) -> PyExpr: ...

# ---
# dashboard namespace
# ---
def launch(): ...

class PyCatalog:
@staticmethod
def new() -> PyCatalog: ...
Expand Down
15 changes: 15 additions & 0 deletions daft/dashboard.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import os

import daft.daft as native

DAFT_DASHBOARD_ENV_NAME = "DAFT_DASHBOARD"
DAFT_DASHBOARD_ADDR = "http://localhost:3238"


def launch():
"""Launches the Daft dashboard server on port 3000.

The server serves HTML/CSS/JS bundles, so you are able to point your browser towards `http://localhost:3000` and view information regarding your queries.
"""
os.environ[DAFT_DASHBOARD_ENV_NAME] = "1"
native.launch()
44 changes: 44 additions & 0 deletions daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import typing
import warnings
from dataclasses import dataclass
from datetime import datetime, timezone
from functools import partial, reduce
from typing import (
TYPE_CHECKING,
Expand All @@ -28,6 +29,7 @@
TypeVar,
Union,
)
from uuid import uuid4

from daft.api_annotations import DataframePublicAPI
from daft.context import get_context
Expand Down Expand Up @@ -158,6 +160,45 @@ def _result(self) -> Optional[PartitionSet]:
else:
return self._result_cache.value

def _explain_broadcast(self):
import json
from urllib import request
from urllib.error import URLError

from daft.dashboard import DAFT_DASHBOARD_ADDR
from daft.dataframe.display import MermaidFormatter

dashboard_addr = os.environ.get("DAFT_DASHBOARD")
if not dashboard_addr:
return
elif not int(dashboard_addr):
return

is_cached = self._result_cache is not None
plan_time_start = datetime.now(timezone.utc)
mermaid_plan = MermaidFormatter(
builder=self.__builder, show_all=True, simple=False, is_cached=is_cached
)._repr_markdown_()
plan_time_end = datetime.now(timezone.utc)

headers = {
"Content-Type": "application/json",
}
data = json.dumps(
{
"id": str(uuid4()),
"mermaid-plan": mermaid_plan,
"plan-time-start": str(plan_time_start),
"plan-time-end": str(plan_time_end),
}
).encode("utf-8")
req = request.Request(DAFT_DASHBOARD_ADDR, headers=headers, data=data)

try:
request.urlopen(req, timeout=1)
except URLError as e:
warnings.warn(f"Failed to broadcast metrics over {DAFT_DASHBOARD_ADDR}: {e}")

@DataframePublicAPI
def explain(
self, show_all: bool = False, format: str = "ascii", simple: bool = False, file: Optional[io.IOBase] = None
Expand Down Expand Up @@ -2818,6 +2859,7 @@ def collect(self, num_preview_rows: Optional[int] = 8) -> "DataFrame":
DataFrame: DataFrame with materialized results.
"""
self._materialize_results()
self._explain_broadcast()

assert self._result is not None
dataframe_len = len(self._result)
Expand Down Expand Up @@ -2889,6 +2931,8 @@ def show(self, n: int = 8) -> None:
n: number of rows to show. Defaults to 8.
"""
dataframe_display = self._construct_show_display(n)
self._explain_broadcast()

try:
from IPython.display import display

Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
22 changes: 22 additions & 0 deletions src/daft-dashboard-server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[dependencies]
anyhow = "1.0"
chrono = {workspace = true, features = ["serde"]}
fork = "0.2"
http-body-util = "0.1"
hyper = {features = ["full"], version = "1.6"}
hyper-util = {features = ["full"], version = "0.1"}
parking_lot = "0.12"
serde_json = "1.0"
simdutf8 = "0.1"
futures.workspace = true
pyo3.workspace = true
serde.workspace = true
tokio.workspace = true

[lints]
workspace = true

[package]
name = "daft-dashboard-server"
edition.workspace = true
version.workspace = true
Loading