Skip to content

Commit

Permalink
chore: allow for level 2 of Ray tracing which records memory
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Dec 18, 2024
1 parent 4bb0413 commit 890fd6a
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 19 deletions.
5 changes: 3 additions & 2 deletions daft/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ def set_execution_config(
default_morsel_size: int | None = None,
shuffle_algorithm: str | None = None,
pre_shuffle_merge_threshold: int | None = None,
enable_ray_tracing: bool | None = None,
enable_ray_tracing: int | None = None,
) -> DaftContext:
"""Globally sets various configuration parameters which control various aspects of Daft execution.
Expand Down Expand Up @@ -395,7 +395,8 @@ def set_execution_config(
default_morsel_size: Default size of morsels used for the new local executor. Defaults to 131072 rows.
shuffle_algorithm: The shuffle algorithm to use. Defaults to "map_reduce". Other options are "pre_shuffle_merge".
pre_shuffle_merge_threshold: Memory threshold in bytes for pre-shuffle merge. Defaults to 1GB
enable_ray_tracing: Enable tracing for Ray. Accessible in `/tmp/ray/session_latest/logs/daft` after the run completes. Defaults to False.
enable_ray_tracing: Enable tracing for Ray. Accessible in `/tmp/ray/session_latest/logs/daft` after the run completes. Defaults to 0, but
can be set to 1 or 2 depending on the level of tracing desired. Levels 2 and above require `memray` to be installed.
"""
# Replace values in the DaftExecutionConfig with user-specified overrides
ctx = get_context()
Expand Down
4 changes: 2 additions & 2 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1736,7 +1736,7 @@ class PyDaftExecutionConfig:
enable_aqe: bool | None = None,
enable_native_executor: bool | None = None,
default_morsel_size: int | None = None,
enable_ray_tracing: bool | None = None,
enable_ray_tracing: int | None = None,
shuffle_algorithm: str | None = None,
pre_shuffle_merge_threshold: int | None = None,
) -> PyDaftExecutionConfig: ...
Expand Down Expand Up @@ -1783,7 +1783,7 @@ class PyDaftExecutionConfig:
@property
def pre_shuffle_merge_threshold(self) -> int: ...
@property
def enable_ray_tracing(self) -> bool: ...
def enable_ray_tracing(self) -> int: ...

class PyDaftPlanningConfig:
@staticmethod
Expand Down
20 changes: 18 additions & 2 deletions daft/runners/ray_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ class EndTaskEvent(TaskEvent):

# End Unix timestamp
end: float
memory_stats: TaskMemoryStats | None


@dataclasses.dataclass(frozen=True)
class TaskMemoryStats:
peak_memory_allocated: int
total_memory_allocated: int
total_num_allocations: int


class _NodeInfo:
Expand Down Expand Up @@ -123,9 +131,15 @@ def mark_task_start(
)
)

def mark_task_end(self, execution_id: str, task_id: str, end: float):
def mark_task_end(
self,
execution_id: str,
task_id: str,
end: float,
memory_stats: TaskMemoryStats | None,
):
# Add an EndTaskEvent
self._task_events[execution_id].append(EndTaskEvent(task_id=task_id, end=end))
self._task_events[execution_id].append(EndTaskEvent(task_id=task_id, end=end, memory_stats=memory_stats))

Check warning on line 142 in daft/runners/ray_metrics.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_metrics.py#L142

Added line #L142 was not covered by tests

def get_task_events(self, execution_id: str, idx: int) -> tuple[list[TaskEvent], int]:
events = self._task_events[execution_id]
Expand Down Expand Up @@ -177,11 +191,13 @@ def mark_task_end(
self,
task_id: str,
end: float,
memory_stats: TaskMemoryStats | None,
) -> None:
self.actor.mark_task_end.remote(
self.execution_id,
task_id,
end,
memory_stats,
)

def get_task_events(self, idx: int) -> tuple[list[TaskEvent], int]:
Expand Down
62 changes: 57 additions & 5 deletions daft/runners/ray_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import dataclasses
import json
import logging
import os
import pathlib
import time
from datetime import datetime
Expand Down Expand Up @@ -50,7 +51,7 @@ def ray_tracer(execution_id: str, daft_execution_config: PyDaftExecutionConfig)
# Dump the RayRunner trace if we detect an active Ray session, otherwise we give up and do not write the trace
ray_logs_location = get_log_location()
filepath: pathlib.Path | None
if ray_logs_location.exists() and daft_execution_config.enable_ray_tracing:
if ray_logs_location.exists() and daft_execution_config.enable_ray_tracing > 0:
trace_filename = (
f"trace_RayRunner.{execution_id}.{datetime.replace(datetime.now(), microsecond=0).isoformat()[:-3]}.json"
)
Expand Down Expand Up @@ -255,6 +256,11 @@ def _flush_task_metrics(self):
"ph": RunnerTracer.PHASE_ASYNC_END,
"pid": 1,
"tid": 2,
"args": {
"memray_peak_memory_allocated": task_event.memory_stats.peak_memory_allocated,
"memray_total_memory_allocated": task_event.memory_stats.total_memory_allocated,
"memray_total_num_allocations": task_event.memory_stats.total_num_allocations,
},
},
ts=end_ts,
)
Expand All @@ -272,6 +278,11 @@ def _flush_task_metrics(self):
"ph": RunnerTracer.PHASE_DURATION_END,
"pid": node_idx + RunnerTracer.NODE_PIDS_START,
"tid": worker_idx,
"args": {
"memray_peak_memory_allocated": task_event.memory_stats.peak_memory_allocated,
"memray_total_memory_allocated": task_event.memory_stats.total_memory_allocated,
"memray_total_num_allocations": task_event.memory_stats.total_num_allocations,
},
},
ts=end_ts,
)
Expand Down Expand Up @@ -655,7 +666,9 @@ def __next__(self):
@contextlib.contextmanager
def collect_ray_task_metrics(execution_id: str, task_id: str, stage_id: int, execution_config: PyDaftExecutionConfig):
"""Context manager that will ping the metrics actor to record various execution metrics about a given task."""
if execution_config.enable_ray_tracing:
if execution_config.enable_ray_tracing == 0:
yield
elif execution_config.enable_ray_tracing == 1:

Check warning on line 671 in daft/runners/ray_tracing.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_tracing.py#L671

Added line #L671 was not covered by tests
import time

runtime_context = ray.get_runtime_context()
Expand All @@ -670,7 +683,46 @@ def collect_ray_task_metrics(execution_id: str, task_id: str, stage_id: int, exe
runtime_context.get_assigned_resources(),
runtime_context.get_task_id(),
)
yield
metrics_actor.mark_task_end(task_id, time.time())
try:
yield

Check warning on line 687 in daft/runners/ray_tracing.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_tracing.py#L686-L687

Added lines #L686 - L687 were not covered by tests
finally:
metrics_actor.mark_task_end(task_id, time.time(), memory_stats=None)
elif execution_config.enable_ray_tracing == 2:
import time

Check warning on line 691 in daft/runners/ray_tracing.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_tracing.py#L689-L691

Added lines #L689 - L691 were not covered by tests

import memray
from memray._memray import compute_statistics

Check warning on line 694 in daft/runners/ray_tracing.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_tracing.py#L693-L694

Added lines #L693 - L694 were not covered by tests

tmpdir = "/tmp/ray/session_latest/logs/daft/task_memray_dumps"
os.makedirs(tmpdir, exist_ok=True)
memray_tmpfile = os.path.join(tmpdir, f"task-{task_id}.memray.bin")

Check warning on line 698 in daft/runners/ray_tracing.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_tracing.py#L696-L698

Added lines #L696 - L698 were not covered by tests

runtime_context = ray.get_runtime_context()
metrics_actor = ray_metrics.get_metrics_actor(execution_id)
metrics_actor.mark_task_start(

Check warning on line 702 in daft/runners/ray_tracing.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_tracing.py#L700-L702

Added lines #L700 - L702 were not covered by tests
task_id,
time.time(),
runtime_context.get_node_id(),
runtime_context.get_worker_id(),
stage_id,
runtime_context.get_assigned_resources(),
runtime_context.get_task_id(),
)
try:
with memray.Tracker(memray_tmpfile, native_traces=True, follow_fork=True):
yield

Check warning on line 713 in daft/runners/ray_tracing.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_tracing.py#L711-L713

Added lines #L711 - L713 were not covered by tests
finally:
stats = compute_statistics(memray_tmpfile)
metrics_actor.mark_task_end(

Check warning on line 716 in daft/runners/ray_tracing.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_tracing.py#L715-L716

Added lines #L715 - L716 were not covered by tests
task_id,
time.time(),
ray_metrics.TaskMemoryStats(
peak_memory_allocated=stats.peak_memory_allocated,
total_memory_allocated=stats.total_memory_allocated,
total_num_allocations=stats.total_num_allocations,
),
)
else:
yield
raise RuntimeError(

Check warning on line 726 in daft/runners/ray_tracing.py

View check run for this annotation

Codecov / codecov/patch

daft/runners/ray_tracing.py#L726

Added line #L726 was not covered by tests
f"Unrecognized value for $DAFT_ENABLE_RAY_TRACING. Expected a number from 0 to 2, but received: {execution_config.enable_ray_tracing}"
)
14 changes: 8 additions & 6 deletions src/common/daft-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub struct DaftExecutionConfig {
pub default_morsel_size: usize,
pub shuffle_algorithm: String,
pub pre_shuffle_merge_threshold: usize,
pub enable_ray_tracing: bool,
pub enable_ray_tracing: u32,
}

impl Default for DaftExecutionConfig {
Expand Down Expand Up @@ -80,7 +80,7 @@ impl Default for DaftExecutionConfig {
default_morsel_size: 128 * 1024,
shuffle_algorithm: "map_reduce".to_string(),
pre_shuffle_merge_threshold: 1024 * 1024 * 1024, // 1GB
enable_ray_tracing: false,
enable_ray_tracing: 0,
}
}
}
Expand Down Expand Up @@ -109,10 +109,12 @@ impl DaftExecutionConfig {
cfg.enable_native_executor = true;
}
let ray_tracing_env_var_name = "DAFT_ENABLE_RAY_TRACING";
if let Ok(val) = std::env::var(ray_tracing_env_var_name)
&& matches!(val.trim().to_lowercase().as_str(), "1" | "true")
{
cfg.enable_ray_tracing = true;
if let Ok(val) = std::env::var(ray_tracing_env_var_name) {
if let Ok(val) = val.trim().parse::<u32>() {
cfg.enable_ray_tracing = val;
} else {
log::warn!("Invalid value for DAFT_ENABLE_RAY_TRACING. Expected a number from 0 to 2, but received: {}", val.trim());

Check warning on line 116 in src/common/daft-config/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

src/common/daft-config/src/lib.rs#L113-L116

Added lines #L113 - L116 were not covered by tests
}
}
let shuffle_algorithm_env_var_name = "DAFT_SHUFFLE_ALGORITHM";
if let Ok(val) = std::env::var(shuffle_algorithm_env_var_name) {
Expand Down
4 changes: 2 additions & 2 deletions src/common/daft-config/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl PyDaftExecutionConfig {
default_morsel_size: Option<usize>,
shuffle_algorithm: Option<&str>,
pre_shuffle_merge_threshold: Option<usize>,
enable_ray_tracing: Option<bool>,
enable_ray_tracing: Option<u32>,
) -> PyResult<Self> {
let mut config = self.config.as_ref().clone();

Expand Down Expand Up @@ -290,7 +290,7 @@ impl PyDaftExecutionConfig {
}

#[getter]
fn enable_ray_tracing(&self) -> PyResult<bool> {
fn enable_ray_tracing(&self) -> PyResult<u32> {
Ok(self.config.enable_ray_tracing)
}
}
Expand Down

0 comments on commit 890fd6a

Please sign in to comment.