From f550ccc56afc74c9960207f24c6fc571e3bff214 Mon Sep 17 00:00:00 2001 From: "Richard (Rick) Zamora" Date: Wed, 20 Nov 2024 13:00:42 -0600 Subject: [PATCH] Extract ``GPUEngine`` config options at translation time (#17339) Follow up to https://github.com/rapidsai/cudf/pull/16944 That PR added `config: GPUEngine` to the arguments of every `IR.do_evaluate` function. In order to simplify future multi-GPU development, this PR extracts the necessary configuration argument at `IR` translation time instead. Authors: - Richard (Rick) Zamora (https://github.com/rjzamora) - Lawrence Mitchell (https://github.com/wence-) Approvers: - https://github.com/brandon-b-miller - Lawrence Mitchell (https://github.com/wence-) URL: https://github.com/rapidsai/cudf/pull/17339 --- python/cudf_polars/cudf_polars/callback.py | 6 +- python/cudf_polars/cudf_polars/dsl/ir.py | 62 +++++++------------ .../cudf_polars/cudf_polars/dsl/translate.py | 8 ++- .../cudf_polars/testing/asserts.py | 2 +- python/cudf_polars/docs/overview.md | 3 +- python/cudf_polars/tests/dsl/test_to_ast.py | 4 +- .../cudf_polars/tests/dsl/test_traversal.py | 12 ++-- .../tests/expressions/test_sort.py | 6 +- 8 files changed, 42 insertions(+), 61 deletions(-) diff --git a/python/cudf_polars/cudf_polars/callback.py b/python/cudf_polars/cudf_polars/callback.py index c446ce0384e..7915c9e6b18 100644 --- a/python/cudf_polars/cudf_polars/callback.py +++ b/python/cudf_polars/cudf_polars/callback.py @@ -129,7 +129,6 @@ def set_device(device: int | None) -> Generator[int, None, None]: def _callback( ir: IR, - config: GPUEngine, with_columns: list[str] | None, pyarrow_predicate: str | None, n_rows: int | None, @@ -146,7 +145,7 @@ def _callback( set_device(device), set_memory_resource(memory_resource), ): - return ir.evaluate(cache={}, config=config).to_polars() + return ir.evaluate(cache={}).to_polars() def validate_config_options(config: dict) -> None: @@ -201,7 +200,7 @@ def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None: validate_config_options(config.config) with nvtx.annotate(message="ConvertIR", domain="cudf_polars"): - translator = Translator(nt) + translator = Translator(nt, config) ir = translator.translate_ir() ir_translation_errors = translator.errors if len(ir_translation_errors): @@ -225,7 +224,6 @@ def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None: partial( _callback, ir, - config, device=device, memory_resource=memory_resource, ) diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index e44a0e0857a..62a2da9dcea 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -37,8 +37,6 @@ from collections.abc import Callable, Hashable, MutableMapping, Sequence from typing import Literal - from polars import GPUEngine - from cudf_polars.typing import Schema @@ -182,9 +180,7 @@ def get_hashable(self) -> Hashable: translation phase should fail earlier. """ - def evaluate( - self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine - ) -> DataFrame: + def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """ Evaluate the node (recursively) and return a dataframe. @@ -193,8 +189,6 @@ def evaluate( cache Mapping from cached node ids to constructed DataFrames. Used to implement evaluation of the `Cache` node. - config - GPU engine configuration. Notes ----- @@ -214,9 +208,8 @@ def evaluate( translation phase should fail earlier. 
""" return self.do_evaluate( - config, *self._non_child_args, - *(child.evaluate(cache=cache, config=config) for child in self.children), + *(child.evaluate(cache=cache) for child in self.children), ) @@ -263,6 +256,7 @@ class Scan(IR): "typ", "reader_options", "cloud_options", + "config_options", "paths", "with_columns", "skip_rows", @@ -275,6 +269,7 @@ class Scan(IR): "typ", "reader_options", "cloud_options", + "config_options", "paths", "with_columns", "skip_rows", @@ -288,6 +283,8 @@ class Scan(IR): """Reader-specific options, as dictionary.""" cloud_options: dict[str, Any] | None """Cloud-related authentication options, currently ignored.""" + config_options: dict[str, Any] + """GPU-specific configuration options""" paths: list[str] """List of paths to read from.""" with_columns: list[str] | None @@ -310,6 +307,7 @@ def __init__( typ: str, reader_options: dict[str, Any], cloud_options: dict[str, Any] | None, + config_options: dict[str, Any], paths: list[str], with_columns: list[str] | None, skip_rows: int, @@ -321,6 +319,7 @@ def __init__( self.typ = typ self.reader_options = reader_options self.cloud_options = cloud_options + self.config_options = config_options self.paths = paths self.with_columns = with_columns self.skip_rows = skip_rows @@ -331,6 +330,7 @@ def __init__( schema, typ, reader_options, + config_options, paths, with_columns, skip_rows, @@ -412,6 +412,7 @@ def get_hashable(self) -> Hashable: self.typ, json.dumps(self.reader_options), json.dumps(self.cloud_options), + json.dumps(self.config_options), tuple(self.paths), tuple(self.with_columns) if self.with_columns is not None else None, self.skip_rows, @@ -423,10 +424,10 @@ def get_hashable(self) -> Hashable: @classmethod def do_evaluate( cls, - config: GPUEngine, schema: Schema, typ: str, reader_options: dict[str, Any], + config_options: dict[str, Any], paths: list[str], with_columns: list[str] | None, skip_rows: int, @@ -509,7 +510,7 @@ def do_evaluate( colnames[0], ) elif typ == "parquet": - parquet_options = config.config.get("parquet_options", {}) + parquet_options = config_options.get("parquet_options", {}) if parquet_options.get("chunked", True): reader = plc.io.parquet.ChunkedParquetReader( plc.io.SourceInfo(paths), @@ -657,16 +658,14 @@ def __init__(self, schema: Schema, key: int, value: IR): @classmethod def do_evaluate( - cls, config: GPUEngine, key: int, df: DataFrame + cls, key: int, df: DataFrame ) -> DataFrame: # pragma: no cover; basic evaluation never calls this """Evaluate and return a dataframe.""" # Our value has already been computed for us, so let's just # return it. return df - def evaluate( - self, *, cache: MutableMapping[int, DataFrame], config: GPUEngine - ) -> DataFrame: + def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" # We must override the recursion scheme because we don't want # to recurse if we're in the cache. @@ -674,9 +673,7 @@ def evaluate( return cache[self.key] except KeyError: (value,) = self.children - return cache.setdefault( - self.key, value.evaluate(cache=cache, config=config) - ) + return cache.setdefault(self.key, value.evaluate(cache=cache)) class DataFrameScan(IR): @@ -722,7 +719,6 @@ def get_hashable(self) -> Hashable: @classmethod def do_evaluate( cls, - config: GPUEngine, schema: Schema, df: Any, projection: tuple[str, ...] 
| None, @@ -770,7 +766,6 @@ def __init__( @classmethod def do_evaluate( cls, - config: GPUEngine, exprs: tuple[expr.NamedExpr, ...], should_broadcast: bool, # noqa: FBT001 df: DataFrame, @@ -806,7 +801,6 @@ def __init__( @classmethod def do_evaluate( cls, - config: GPUEngine, exprs: tuple[expr.NamedExpr, ...], df: DataFrame, ) -> DataFrame: # pragma: no cover; not exposed by polars yet @@ -899,7 +893,6 @@ def check_agg(agg: expr.Expr) -> int: @classmethod def do_evaluate( cls, - config: GPUEngine, keys_in: Sequence[expr.NamedExpr], agg_requests: Sequence[expr.NamedExpr], maintain_order: bool, # noqa: FBT001 @@ -1021,7 +1014,6 @@ def __init__( @classmethod def do_evaluate( cls, - config: GPUEngine, predicate: plc.expressions.Expression, zlice: tuple[int, int] | None, suffix: str, @@ -1194,7 +1186,6 @@ def _reorder_maps( @classmethod def do_evaluate( cls, - config: GPUEngine, left_on_exprs: Sequence[expr.NamedExpr], right_on_exprs: Sequence[expr.NamedExpr], options: tuple[ @@ -1318,7 +1309,6 @@ def __init__( @classmethod def do_evaluate( cls, - config: GPUEngine, exprs: Sequence[expr.NamedExpr], should_broadcast: bool, # noqa: FBT001 df: DataFrame, @@ -1381,7 +1371,6 @@ def __init__( @classmethod def do_evaluate( cls, - config: GPUEngine, keep: plc.stream_compaction.DuplicateKeepOption, subset: frozenset[str] | None, zlice: tuple[int, int] | None, @@ -1471,7 +1460,6 @@ def __init__( @classmethod def do_evaluate( cls, - config: GPUEngine, by: Sequence[expr.NamedExpr], order: Sequence[plc.types.Order], null_order: Sequence[plc.types.NullOrder], @@ -1527,9 +1515,7 @@ def __init__(self, schema: Schema, offset: int, length: int, df: IR): self.children = (df,) @classmethod - def do_evaluate( - cls, config: GPUEngine, offset: int, length: int, df: DataFrame - ) -> DataFrame: + def do_evaluate(cls, offset: int, length: int, df: DataFrame) -> DataFrame: """Evaluate and return a dataframe.""" return df.slice((offset, length)) @@ -1549,9 +1535,7 @@ def __init__(self, schema: Schema, mask: expr.NamedExpr, df: IR): self.children = (df,) @classmethod - def do_evaluate( - cls, config: GPUEngine, mask_expr: expr.NamedExpr, df: DataFrame - ) -> DataFrame: + def do_evaluate(cls, mask_expr: expr.NamedExpr, df: DataFrame) -> DataFrame: """Evaluate and return a dataframe.""" (mask,) = broadcast(mask_expr.evaluate(df), target_length=df.num_rows) return df.filter(mask) @@ -1569,7 +1553,7 @@ def __init__(self, schema: Schema, df: IR): self.children = (df,) @classmethod - def do_evaluate(cls, config: GPUEngine, schema: Schema, df: DataFrame) -> DataFrame: + def do_evaluate(cls, schema: Schema, df: DataFrame) -> DataFrame: """Evaluate and return a dataframe.""" # This can reorder things. 
columns = broadcast( @@ -1645,9 +1629,7 @@ def __init__(self, schema: Schema, name: str, options: Any, df: IR): self._non_child_args = (name, self.options) @classmethod - def do_evaluate( - cls, config: GPUEngine, name: str, options: Any, df: DataFrame - ) -> DataFrame: + def do_evaluate(cls, name: str, options: Any, df: DataFrame) -> DataFrame: """Evaluate and return a dataframe.""" if name == "rechunk": # No-op in our data model @@ -1726,9 +1708,7 @@ def __init__(self, schema: Schema, zlice: tuple[int, int] | None, *children: IR) raise NotImplementedError("Schema mismatch") @classmethod - def do_evaluate( - cls, config: GPUEngine, zlice: tuple[int, int] | None, *dfs: DataFrame - ) -> DataFrame: + def do_evaluate(cls, zlice: tuple[int, int] | None, *dfs: DataFrame) -> DataFrame: """Evaluate and return a dataframe.""" # TODO: only evaluate what we need if we have a slice? return DataFrame.from_table( @@ -1777,7 +1757,7 @@ def _extend_with_nulls(table: plc.Table, *, nrows: int) -> plc.Table: ) @classmethod - def do_evaluate(cls, config: GPUEngine, *dfs: DataFrame) -> DataFrame: + def do_evaluate(cls, *dfs: DataFrame) -> DataFrame: """Evaluate and return a dataframe.""" max_rows = max(df.num_rows for df in dfs) # Horizontal concatenation extends shorter tables with nulls diff --git a/python/cudf_polars/cudf_polars/dsl/translate.py b/python/cudf_polars/cudf_polars/dsl/translate.py index e8ed009cdf2..12fc2a196cd 100644 --- a/python/cudf_polars/cudf_polars/dsl/translate.py +++ b/python/cudf_polars/cudf_polars/dsl/translate.py @@ -26,6 +26,8 @@ from cudf_polars.utils import dtypes, sorting if TYPE_CHECKING: + from polars import GPUEngine + from cudf_polars.typing import NodeTraverser __all__ = ["Translator", "translate_named_expr"] @@ -39,10 +41,13 @@ class Translator: ---------- visitor Polars NodeTraverser object + config + GPU engine configuration. """ - def __init__(self, visitor: NodeTraverser): + def __init__(self, visitor: NodeTraverser, config: GPUEngine): self.visitor = visitor + self.config = config self.errors: list[Exception] = [] def translate_ir(self, *, n: int | None = None) -> ir.IR: @@ -228,6 +233,7 @@ def _( typ, reader_options, cloud_options, + translator.config.config.copy(), node.paths, with_columns, skip_rows, diff --git a/python/cudf_polars/cudf_polars/testing/asserts.py b/python/cudf_polars/cudf_polars/testing/asserts.py index 1821cfedfb8..ba0bb12a0fb 100644 --- a/python/cudf_polars/cudf_polars/testing/asserts.py +++ b/python/cudf_polars/cudf_polars/testing/asserts.py @@ -122,7 +122,7 @@ def assert_ir_translation_raises(q: pl.LazyFrame, *exceptions: type[Exception]) AssertionError If the specified exceptions were not raised. """ - translator = Translator(q._ldf.visit()) + translator = Translator(q._ldf.visit(), GPUEngine()) translator.translate_ir() if errors := translator.errors: for err in errors: diff --git a/python/cudf_polars/docs/overview.md b/python/cudf_polars/docs/overview.md index 2f2361223d2..2231dd34e35 100644 --- a/python/cudf_polars/docs/overview.md +++ b/python/cudf_polars/docs/overview.md @@ -459,11 +459,12 @@ and convert back to polars: ```python from cudf_polars.dsl.translate import Translator +import polars as pl q = ... 
# Convert to our IR -ir = Translator(q._ldf.visit()).translate_ir() +ir = Translator(q._ldf.visit(), pl.GPUEngine()).translate_ir() # DataFrame living on the device result = ir.evaluate(cache={}) diff --git a/python/cudf_polars/tests/dsl/test_to_ast.py b/python/cudf_polars/tests/dsl/test_to_ast.py index 795ba991c62..60ff7a655e6 100644 --- a/python/cudf_polars/tests/dsl/test_to_ast.py +++ b/python/cudf_polars/tests/dsl/test_to_ast.py @@ -60,10 +60,10 @@ def df(): ) def test_compute_column(expr, df): q = df.select(expr) - ir = Translator(q._ldf.visit()).translate_ir() + ir = Translator(q._ldf.visit(), pl.GPUEngine()).translate_ir() assert isinstance(ir, ir_nodes.Select) - table = ir.children[0].evaluate(cache={}, config=pl.GPUEngine()) + table = ir.children[0].evaluate(cache={}) name_to_index = {c.name: i for i, c in enumerate(table.columns)} def compute_column(e): diff --git a/python/cudf_polars/tests/dsl/test_traversal.py b/python/cudf_polars/tests/dsl/test_traversal.py index 8849629e0fd..2f4df9289f8 100644 --- a/python/cudf_polars/tests/dsl/test_traversal.py +++ b/python/cudf_polars/tests/dsl/test_traversal.py @@ -109,7 +109,7 @@ def test_rewrite_ir_node(): df = pl.LazyFrame({"a": [1, 2, 1], "b": [1, 3, 4]}) q = df.group_by("a").agg(pl.col("b").sum()).sort("b") - orig = Translator(q._ldf.visit()).translate_ir() + orig = Translator(q._ldf.visit(), pl.GPUEngine()).translate_ir() new_df = pl.DataFrame({"a": [1, 1, 2], "b": [-1, -2, -4]}) @@ -124,7 +124,7 @@ def replace_df(node, rec): new = mapper(orig) - result = new.evaluate(cache={}, config=pl.GPUEngine()).to_polars() + result = new.evaluate(cache={}).to_polars() expect = pl.DataFrame({"a": [2, 1], "b": [-4, -3]}) @@ -150,10 +150,10 @@ def replace_scan(node, rec): mapper = CachingVisitor(replace_scan) - orig = Translator(q._ldf.visit()).translate_ir() + orig = Translator(q._ldf.visit(), pl.GPUEngine()).translate_ir() new = mapper(orig) - result = new.evaluate(cache={}, config=pl.GPUEngine()).to_polars() + result = new.evaluate(cache={}).to_polars() expect = q.collect() @@ -174,7 +174,7 @@ def test_rewrite_names_and_ops(): .collect() ) - qir = Translator(q._ldf.visit()).translate_ir() + qir = Translator(q._ldf.visit(), pl.GPUEngine()).translate_ir() @singledispatch def _transform(e: expr.Expr, fn: ExprTransformer) -> expr.Expr: @@ -224,6 +224,6 @@ def _(node: ir.Select, fn: IRTransformer): new_ir = rewriter(qir) - got = new_ir.evaluate(cache={}, config=pl.GPUEngine()).to_polars() + got = new_ir.evaluate(cache={}).to_polars() assert_frame_equal(expect, got) diff --git a/python/cudf_polars/tests/expressions/test_sort.py b/python/cudf_polars/tests/expressions/test_sort.py index 49e075e0338..dd080f41483 100644 --- a/python/cudf_polars/tests/expressions/test_sort.py +++ b/python/cudf_polars/tests/expressions/test_sort.py @@ -68,11 +68,7 @@ def test_setsorted(descending, nulls_last, with_nulls): assert_gpu_result_equal(q) - df = ( - Translator(q._ldf.visit()) - .translate_ir() - .evaluate(cache={}, config=pl.GPUEngine()) - ) + df = Translator(q._ldf.visit(), pl.GPUEngine()).translate_ir().evaluate(cache={}) a = df.column_map["a"]
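---

For reference, a minimal sketch of the end-to-end flow after this change, mirroring the updated `docs/overview.md` example. It assumes `parquet_options` is passed as a `GPUEngine` keyword (stored in `engine.config`, which the `validate_config_options` hunk above permits); the query and data are placeholders, not part of this PR.

```python
import polars as pl

from cudf_polars.dsl.translate import Translator

# Assumption: extra GPUEngine keywords land in engine.config, where Scan
# nodes pick up "parquet_options" via their config_options argument.
engine = pl.GPUEngine(parquet_options={"chunked": False})

# Placeholder query; any LazyFrame works here.
q = pl.LazyFrame({"a": [1, 2, 3]}).select(pl.col("a") * 2)

# The engine config is captured once, at translation time.
ir = Translator(q._ldf.visit(), engine).translate_ir()

# Evaluation no longer takes a `config` argument; each node already holds
# the options it needs in its `_non_child_args`.
result = ir.evaluate(cache={}).to_polars()
```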