Update pre-commit config with latest versions (NVIDIA-Merlin#208)
oliverholworthy authored Feb 4, 2023
1 parent 55f6e59 · commit f108a9a
Showing 11 changed files with 7 additions and 46 deletions.

.pre-commit-config.yaml (6 changes: 3 additions & 3 deletions)

@@ -5,7 +5,7 @@ repos:
hooks:
- id: absolufy-imports
- repo: https://github.com/timothycrosley/isort
-    rev: 5.11.2
+    rev: 5.12.0
hooks:
- id: isort
additional_dependencies: [toml]
@@ -26,11 +26,11 @@ repos:
exclude: ^docs/
# code style
- repo: https://github.com/python/black
-    rev: 22.12.0
+    rev: 23.1.0
hooks:
- id: black
- repo: https://github.com/pycqa/pylint
-    rev: v2.15.8
+    rev: v2.16.0
hooks:
- id: pylint
- repo: https://github.com/pycqa/flake8
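
Most of the deletions in the Python files below appear to come from the black bump to 23.1.0, whose 2023 stable style removes a blank line that sits directly after a block opener such as a "def" line. A minimal, hypothetical sketch (not code from this repository) of the layout the updated formatter produces:

# Hypothetical example only, written the way black 23.1.0 formats it.
# Earlier black versions tolerated a blank line directly after the "def"
# line; the 2023 stable style deletes it, which accounts for most of the
# single-line removals in this commit.
def describe(values):
    total = sum(values)
    return {"count": len(values), "total": total}


print(describe([1, 2, 3]))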

merlin/dtypes/__init__.py (1 change: 0 additions & 1 deletion)

@@ -49,7 +49,6 @@ def dtype(external_dtype):
try:
return _dtype_registry.to_merlin(external_dtype)
except TypeError as base_exc:
-
try:
return _dtype_registry.to_merlin_via_numpy(external_dtype)
except TypeError as exc:

merlin/io/avro.py (4 changes: 0 additions & 4 deletions)

@@ -46,7 +46,6 @@ def __init__(self, paths, part_size, storage_options=None, cpu=False, **kwargs):
raise ValueError("cpu=True not supported for AvroDatasetEngine.")

def to_ddf(self, columns=None, cpu=None):
-
# Check if we are using cpu
cpu = self.cpu if cpu is None else cpu
if cpu:
@@ -80,7 +79,6 @@ def to_gpu(self):
self.cpu = False

def process_metadata(self, columns=None):
-
with open(self.paths[0], "rb") as fo:
header = ua.core.read_header(fo)

@@ -149,10 +147,8 @@ def process_metadata(self, columns=None):

@classmethod
def read_partition(cls, fs, piece, columns):
-
path = piece["path"]
if "rows" in piece:
-
# See: (https://github.com/rapidsai/cudf/issues/6529)
# Using `uavro` library for now. This means we must convert
# data to pandas, and then to cudf (which is much slower

merlin/io/csv.py (1 change: 0 additions & 1 deletion)

@@ -49,7 +49,6 @@ def __init__(self, paths, part_size, storage_options=None, cpu=False, **kwargs):
self.paths = self.fs.glob(self.fs.sep.join([self.paths[0], "*"]))

def to_ddf(self, columns=None, cpu=None):
-
# Check if we are using cpu
cpu = self.cpu if cpu is None else cpu
if cpu:

merlin/io/dask.py (6 changes: 1 addition & 5 deletions)

@@ -177,7 +177,6 @@ def _write_subgraph(
cpu,
suffix,
):
-
fns = fns if isinstance(fns, (tuple, list)) else (fns,)
writer = writer_factory(
output_format,
@@ -204,7 +203,6 @@ def _write_subgraph(


def _write_metadata_files(md_list, output_path, output_format, cpu, schema):
-
# Separate and merge metadata
general_md = []
special_md = []
@@ -223,7 +221,6 @@ def _write_metadata_files(md_list, output_path, output_format, cpu, schema):


def _simple_shuffle(ddf, plan):
-
# Construct graph for a simple shuffle
token = tokenize(ddf, plan)
name = "shuffled-" + token
@@ -256,7 +253,6 @@ def _ddf_to_dataset(
partition_on=None,
schema=None,
):
-
# Construct graph for Dask-based dataset write
token = tokenize(
ddf, shuffle, out_files_per_proc, cat_names, cont_names, label_names, suffix, partition_on
@@ -377,7 +373,7 @@ def _finish_dataset(client, ddf, output_path, fs, output_format, cpu, schema):

general_md = []
special_md = []
- for (gen, spec) in out.values():
+ for gen, spec in out.values():
general_md.append(gen)
if spec:
special_md.append(spec)
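
A second, smaller change appears in this file and again in merlin/io/fsspec_utils.py and tests/unit/io/test_io.py: redundant parentheses around the tuple target of a for loop are dropped. A short, hypothetical sketch of the spelling this commit standardizes on:

# Hypothetical example only: the parenthesized form
#     for (left, right) in pairs.items():
# is rewritten without the parentheses, as below.
pairs = {"a": 1, "b": 2}
for left, right in pairs.items():
    print(left, right)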

merlin/io/dataset.py (4 changes: 0 additions & 4 deletions)

@@ -473,7 +473,6 @@ def shuffle_by_keys(self, keys, hive_data=None, npartitions=None):
hive_mapping[_key].append(_val)

if set(hive_mapping.keys()) == set(keys):
-
# Generate hive-mapping DataFrame summary
hive_mapping = type(ddf._meta)(hive_mapping)
cols = list(hive_mapping.columns)
@@ -752,7 +751,6 @@ def to_parquet(
"""

if partition_on:
-
# Check that the user is not expecting a specific output-file
# count/structure that is not supported
if output_files:
@@ -763,7 +761,6 @@ def to_parquet(
raise ValueError("`preserve_files` not supported when `partition_on` is used.")

else:
-
# Check that method (algorithm) is valid
if method not in ("subgraph", "worker"):
raise ValueError(f"{method} not a recognized method for `Dataset.to_parquet`")
@@ -800,7 +797,6 @@ def to_parquet(
# Deal with `method=="subgraph"`.
# Convert `output_files` argument to a dict mapping
if output_files:
-
# NOTES on `output_files`:
#
# - If a list of file names is specified, a contiguous range of

merlin/io/fsspec_utils.py (8 changes: 2 additions & 6 deletions)

@@ -113,7 +113,6 @@ def _optimized_read_partition_remote(


def _optimized_read_remote(path, row_groups, columns, fs, **kwargs):
-
if row_groups is not None and not isinstance(row_groups, list):
row_groups = [row_groups]

@@ -254,7 +253,6 @@ def _fsspec_data_transfer(
mode="rb",
**kwargs,
):
-
# Calculate total file size
file_size = file_size or fs.size(path_or_fob)

@@ -265,7 +263,6 @@ def _fsspec_data_transfer(
# Threaded read into "dummy" buffer
buf = np.zeros(file_size, dtype="b")
if byte_ranges:
-
# Optimize/merge the ranges
byte_ranges = _merge_ranges(
byte_ranges,
@@ -320,7 +317,7 @@ def _merge_ranges(byte_ranges, max_block=256_000_000, max_gap=64_000):
return new_ranges

offset, size = byte_ranges[0]
- for (new_offset, new_size) in byte_ranges[1:]:
+ for new_offset, new_size in byte_ranges[1:]:
gap = new_offset - (offset + size)
if gap > max_gap or (size + new_size + gap) > max_block:
# Gap is too large or total read is too large
@@ -349,9 +346,8 @@ def _read_byte_ranges(
fs,
**kwargs,
):
-
workers = []
- for (offset, nbytes) in ranges:
+ for offset, nbytes in ranges:
if len(ranges) > 1:
workers.append(
Thread(target=_assign_block, args=(fs, path_or_fob, local_buffer, offset, nbytes))

merlin/io/parquet.py (8 changes: 0 additions & 8 deletions)

@@ -104,7 +104,6 @@ def read_metadata(*args, **kwargs):
if (cudf_version.major == 21 and cudf_version.minor == 10) or (
cudf_version.major == 0 and cudf_version.minor == 0
):
-
# We only need this work-around for cudf-21.10
return _override_read_metadata(_cudf_read_metadata, *args, **kwargs)
return _override_read_metadata(CudfEngine.read_metadata, *args, **kwargs)
@@ -287,7 +286,6 @@ def _override_read_metadata(

# Apply file aggregation
if aggregate_row_groups is not None:
-
# Convert `aggregate_files` to an integer `aggregation_depth`
aggregation_depth = False
if len(parts) and aggregate_files:
@@ -448,7 +446,6 @@ def _process_parquet_metadata(self):
self._pp_map = _pp_map

def to_ddf(self, columns=None, cpu=None):
-
# Check if we are using cpu or gpu backend
cpu = self.cpu if cpu is None else cpu
backend_engine = CPUParquetEngine if cpu else GPUParquetEngine
@@ -825,7 +822,6 @@ def regenerate_dataset(
out_parts = 0
remaining_out_part_rows = rows_per_part
for i, in_part_size in enumerate(size_list):
-
# The `split` dictionary will be passed to this input
# partition to dictate how that partition will be split
# into different output partitions/files. The "key" of
@@ -834,7 +830,6 @@ def regenerate_dataset(
split = {}
last = 0
while in_part_size >= remaining_out_part_rows:
-
gets[out_parts].append(i)
split[out_parts] = (last, last + remaining_out_part_rows)
last += remaining_out_part_rows
@@ -911,7 +906,6 @@ def regenerate_dataset(


def _write_metadata_file(md_list, fs, output_path, gmd_base):
-
# Prepare both "general" and parquet metadata
gmd = gmd_base.copy()
pmd = {}
@@ -939,7 +933,6 @@ def _write_metadata_file(md_list, fs, output_path, gmd_base):


def _write_data(data_list, output_path, fs, fn):
-
# Initialize chunked writer
path = fs.sep.join([output_path, fn])
writer = pwriter_cudf(path, compression=None)
@@ -1125,7 +1118,6 @@ def _to_parquet(self, df, sink):
return md

def _append_writer(self, path, schema=None):
-
# Define "metadata collector" for pyarrow
_md_collector = []
_args = [schema]

merlin/io/writer.py (1 change: 0 additions & 1 deletion)

@@ -130,7 +130,6 @@ def _write_thread(self):

@annotate("add_data", color="orange", domain="merlin_python")
def add_data(self, df):
-
# Early return
if isinstance(df, list) and not df:
return

tests/unit/io/test_io.py (11 changes: 1 addition & 10 deletions)

@@ -51,7 +51,7 @@ def test_validate_dataset_bad_schema(tmpdir):
pytest.skip("Test requires newer version of Dask.")

path = str(tmpdir)
- for (fn, df) in [
+ for fn, df in [
("part.0.parquet", pd.DataFrame({"a": range(10), "b": range(10)})),
("part.1.parquet", pd.DataFrame({"a": [None] * 10, "b": range(10)})),
]:
@@ -250,7 +250,6 @@ def test_dask_dataset(datasets, engine, num_files, cpu):
@pytest.mark.parametrize("origin", ["cudf", "dask_cudf", "pd", "dd"])
@pytest.mark.parametrize("cpu", [None, True])
def test_dask_dataset_from_dataframe(tmpdir, origin, cpu):
-
# Generate a DataFrame-based input
if origin in ("pd", "dd"):
df = pd.DataFrame({"a": range(100)})
@@ -456,7 +455,6 @@ def test_validate_dataset(datasets, engine):


def test_validate_and_regenerate_dataset(tmpdir):
-
# Initial timeseries dataset (in cpu memory)
ddf = dask.datasets.timeseries(
start="2000-01-01",
@@ -502,7 +500,6 @@ def test_validate_and_regenerate_dataset(tmpdir):
@pytest.mark.parametrize("preserve_files", [True, False])
@pytest.mark.parametrize("cpu", [True, False])
def test_dataset_conversion(tmpdir, cpu, preserve_files):
-
# Generate toy dataset.
# Include "hex" strings to mimic Criteo.
size = 100
@@ -561,7 +558,6 @@ def test_dataset_conversion(tmpdir, cpu, preserve_files):
@pytest.mark.parametrize("use_file_metadata", [True, None])
@pytest.mark.parametrize("shuffle", [True, False])
def test_parquet_iterator_len(tmpdir, shuffle, use_file_metadata):
-
ddf1 = dask.datasets.timeseries(
start="2000-01-01",
end="2000-01-6",
@@ -594,7 +590,6 @@ def test_parquet_iterator_len(tmpdir, shuffle, use_file_metadata):

@pytest.mark.parametrize("cpu", [True, False])
def test_hive_partitioned_data(tmpdir, cpu):
-
# Initial timeseries dataset (in cpu memory).
# Round the full "timestamp" to the hour for partitioning.
ddf = dask.datasets.timeseries(
@@ -658,7 +653,6 @@ def test_hive_partitioned_data(tmpdir, cpu):
@pytest.mark.parametrize("keys", [["name"], ["id"], ["name", "id"]])
@pytest.mark.parametrize("npartitions", [None, 2])
def test_dataset_shuffle_on_keys(tmpdir, cpu, partition_on, keys, npartitions):
-
# Initial timeseries dataset
size = 60
df1 = pd.DataFrame(
@@ -706,7 +700,6 @@ def test_dataset_shuffle_on_keys(tmpdir, cpu, partition_on, keys, npartitions):

@pytest.mark.parametrize("cpu", [True, False])
def test_parquet_filtered_flat(tmpdir, cpu):
-
# Initial timeseries dataset (in cpu memory).
# Round the full "timestamp" to the hour for partitioning.
path = str(tmpdir)
@@ -726,7 +719,6 @@ def test_parquet_filtered_flat(tmpdir, cpu):

@pytest.mark.parametrize("cpu", [True, False])
def test_parquet_filtered_hive(tmpdir, cpu):
-
# Initial timeseries dataset (in cpu memory).
# Round the full "timestamp" to the hour for partitioning.
path = str(tmpdir)
@@ -753,7 +745,6 @@ def test_parquet_filtered_hive(tmpdir, cpu):
)
@pytest.mark.parametrize("cpu", [True, False])
def test_parquet_aggregate_files(tmpdir, cpu):
-
# Initial timeseries dataset (in cpu memory).
# Round the full "timestamp" to the hour for partitioning.
path = str(tmpdir)

tests/unit/utils/test_utils.py (3 changes: 0 additions & 3 deletions)

@@ -30,7 +30,6 @@

@pytest.mark.parametrize("cpu", _CPU)
def test_serial_context(client, cpu):
-
# Set distributed client
set_dask_client(client=client)
assert global_dask_client() == client
@@ -48,7 +47,6 @@ def test_serial_context(client, cpu):
@pytest.mark.parametrize("cpu", [True, False])
@pytest.mark.parametrize("nested_serial", _CPU)
def test_nvt_distributed(cpu, nested_serial):
-
if cpu:
distributed = pytest.importorskip("distributed")
cluster_type = "cpu"
@@ -84,7 +82,6 @@ def test_nvt_distributed(cpu, nested_serial):

@pytest.mark.parametrize("cpu", _CPU)
def test_nvt_distributed_force(client, cpu):
-
if cpu:
distributed = pytest.importorskip("distributed")
cluster_type = "cpu"
