Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Data] Make memory an explicit map parameter #50269

Merged
merged 28 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
cb830b0
Initial commit
bveeramani Nov 22, 2024
1d9dd2e
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Nov 25, 2024
bed2558
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Nov 26, 2024
c40b06d
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Dec 2, 2024
49a8d71
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Dec 2, 2024
cad41d5
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Dec 3, 2024
17df280
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Dec 4, 2024
bb0e778
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Dec 5, 2024
2620239
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Dec 9, 2024
4dff29a
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Dec 11, 2024
0393f97
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Dec 16, 2024
2a92d8e
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Dec 18, 2024
7d912a8
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Dec 18, 2024
e38bf85
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Dec 28, 2024
01c8911
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Dec 30, 2024
a7e6cb2
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Jan 2, 2025
53dfa51
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Jan 8, 2025
27b1315
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Jan 9, 2025
ba39108
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Jan 9, 2025
6f9d698
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Jan 22, 2025
11c498f
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Jan 27, 2025
ed5fd50
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Feb 4, 2025
1209f80
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Feb 4, 2025
f992321
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Feb 5, 2025
b893608
Initial commit
bveeramani Feb 5, 2025
fe6e5a4
Remove unecessary import
bveeramani Feb 5, 2025
333309f
Add documentation
bveeramani Feb 5, 2025
7574dea
Fix bug and address review comment
bveeramani Feb 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions doc/source/data/transforming-data.rst
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,28 @@ To transform data with a Python class, complete these steps:

ds.materialize()

Avoiding out-of-memory errors
=============================

If your user-defined function uses a lot of memory, you might encounter out-of-memory
errors. To avoid these errors, configure the ``memory`` parameter. It tells Ray how much
memory your function uses, and prevents Ray from scheduling too many tasks on a node.

.. testcode::
:hide:

    from typing import Dict

    import numpy as np
    import ray

    ds = ray.data.range(1)

.. testcode::

def uses_lots_of_memory(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
...

    # Tell Ray that the function uses 1 GiB of memory
    ds.map_batches(uses_lots_of_memory, memory=1 * 1024**3)

.. _transforming_groupby:

Groupby and transforming groups
Expand Down
17 changes: 17 additions & 0 deletions python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ def map(
fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
num_cpus: Optional[float] = None,
num_gpus: Optional[float] = None,
memory: Optional[float] = None,
concurrency: Optional[Union[int, Tuple[int, int]]] = None,
ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
**ray_remote_args,
Expand Down Expand Up @@ -335,6 +336,7 @@ def parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:
num_gpus: The number of GPUs to reserve for each parallel map worker. For
example, specify `num_gpus=1` to request 1 GPU for each parallel map
worker.
memory: The heap memory in bytes to reserve for each parallel map worker.
concurrency: The number of Ray workers to use concurrently. For a fixed-sized
worker pool of size ``n``, specify ``concurrency=n``. For an autoscaling
worker pool from ``m`` to ``n`` workers, specify ``concurrency=(m, n)``.
Expand Down Expand Up @@ -370,6 +372,9 @@ def parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:
if num_gpus is not None:
ray_remote_args["num_gpus"] = num_gpus

if memory is not None:
ray_remote_args["memory"] = memory

plan = self._plan.copy()
map_op = MapRows(
self._logical_plan.dag,
Expand Down Expand Up @@ -412,6 +417,7 @@ def map_batches(
fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
num_cpus: Optional[float] = None,
num_gpus: Optional[float] = None,
memory: Optional[float] = None,
concurrency: Optional[Union[int, Tuple[int, int]]] = None,
ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
**ray_remote_args,
Expand Down Expand Up @@ -560,6 +566,7 @@ def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
num_cpus: The number of CPUs to reserve for each parallel map worker.
num_gpus: The number of GPUs to reserve for each parallel map worker. For
example, specify `num_gpus=1` to request 1 GPU for each parallel map worker.
memory: The heap memory in bytes to reserve for each parallel map worker.
concurrency: The number of Ray workers to use concurrently. For a fixed-sized
worker pool of size ``n``, specify ``concurrency=n``. For an autoscaling
worker pool from ``m`` to ``n`` workers, specify ``concurrency=(m, n)``.
Expand Down Expand Up @@ -629,6 +636,7 @@ def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
fn_constructor_kwargs=fn_constructor_kwargs,
num_cpus=num_cpus,
num_gpus=num_gpus,
memory=memory,
concurrency=concurrency,
ray_remote_args_fn=ray_remote_args_fn,
**ray_remote_args,
Expand All @@ -648,6 +656,7 @@ def _map_batches_without_batch_size_validation(
fn_constructor_kwargs: Optional[Dict[str, Any]],
num_cpus: Optional[float],
num_gpus: Optional[float],
memory: Optional[float],
concurrency: Optional[Union[int, Tuple[int, int]]],
ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]],
**ray_remote_args,
Expand All @@ -671,6 +680,9 @@ def _map_batches_without_batch_size_validation(
if num_gpus is not None:
ray_remote_args["num_gpus"] = num_gpus

if memory is not None:
ray_remote_args["memory"] = memory

batch_format = _apply_batch_format(batch_format)

min_rows_per_bundled_input = None
Expand Down Expand Up @@ -1103,6 +1115,7 @@ def flat_map(
fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
num_cpus: Optional[float] = None,
num_gpus: Optional[float] = None,
memory: Optional[float] = None,
concurrency: Optional[Union[int, Tuple[int, int]]] = None,
ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
**ray_remote_args,
Expand Down Expand Up @@ -1168,6 +1181,7 @@ def duplicate_row(row: Dict[str, Any]) -> List[Dict[str, Any]]:
num_gpus: The number of GPUs to reserve for each parallel map worker. For
example, specify `num_gpus=1` to request 1 GPU for each parallel map
worker.
memory: The heap memory in bytes to reserve for each parallel map worker.
concurrency: The number of Ray workers to use concurrently. For a
fixed-sized worker pool of size ``n``, specify ``concurrency=n``.
For an autoscaling worker pool from ``m`` to ``n`` workers, specify
Expand Down Expand Up @@ -1202,6 +1216,9 @@ def duplicate_row(row: Dict[str, Any]) -> List[Dict[str, Any]]:
if num_gpus is not None:
ray_remote_args["num_gpus"] = num_gpus

if memory is not None:
ray_remote_args["memory"] = memory

plan = self._plan.copy()
op = FlatMap(
input_op=self._logical_plan.dag,
Expand Down
3 changes: 3 additions & 0 deletions python/ray/data/grouped_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ def map_groups(
fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
num_cpus: Optional[float] = None,
num_gpus: Optional[float] = None,
memory: Optional[float] = None,
concurrency: Optional[Union[int, Tuple[int, int]]] = None,
**ray_remote_args,
) -> "Dataset":
Expand Down Expand Up @@ -175,6 +176,7 @@ def map_groups(
num_gpus: The number of GPUs to reserve for each parallel map worker. For
example, specify `num_gpus=1` to request 1 GPU for each parallel map
worker.
memory: The heap memory in bytes to reserve for each parallel map worker.
ray_remote_args: Additional resource requirements to request from
Ray (e.g., num_gpus=1 to request GPUs for the map tasks). See
:func:`ray.remote` for details.
Expand Down Expand Up @@ -257,6 +259,7 @@ def wrapped_fn(batch, *args, **kwargs):
fn_constructor_kwargs=fn_constructor_kwargs,
num_cpus=num_cpus,
num_gpus=num_gpus,
memory=memory,
concurrency=concurrency,
ray_remote_args_fn=None,
**ray_remote_args,
Expand Down
36 changes: 27 additions & 9 deletions python/ray/data/tests/test_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import ray
from ray._private.test_utils import wait_for_condition
from ray.data import Dataset
from ray.data._internal.execution.interfaces.ref_bundle import (
_ref_bundles_iterator_to_block_refs_list,
)
Expand Down Expand Up @@ -1137,7 +1138,8 @@ def __call__(self, x):
assert values == [11, 15, 19]


def test_map_with_memory_resources(shutdown_only):
@pytest.mark.parametrize("method", [Dataset.map, Dataset.map_batches, Dataset.flat_map])
def test_map_with_memory_resources(method, shutdown_only):
"""Test that we can use memory resource to limit the concurrency."""
num_blocks = 50
memory_per_task = 100 * 1024**2
Expand All @@ -1146,19 +1148,35 @@ def test_map_with_memory_resources(shutdown_only):

concurrency_counter = ConcurrencyCounter.remote()

def map_batches(batch):
def map_fn(row_or_batch):
ray.get(concurrency_counter.inc.remote())
time.sleep(0.5)
ray.get(concurrency_counter.decr.remote())
return batch
if method is Dataset.flat_map:
return [row_or_batch]
else:
return row_or_batch

ds = ray.data.range(num_blocks, override_num_blocks=num_blocks)
ds = ds.map_batches(
map_batches,
batch_size=None,
num_cpus=1,
memory=memory_per_task,
)
if method is Dataset.map:
ds = ds.map(
map_fn,
num_cpus=1,
memory=memory_per_task,
)
elif method is Dataset.map_batches:
ds = ds.map_batches(
map_fn,
batch_size=None,
num_cpus=1,
memory=memory_per_task,
)
elif method is Dataset.flat_map:
ds = ds.flat_map(
map_fn,
num_cpus=1,
memory=memory_per_task,
)
assert len(ds.take(num_blocks)) == num_blocks

actual_max_concurrency = ray.get(concurrency_counter.get_max_concurrency.remote())
Expand Down