Commit: merge

EmanuelaBoros committed May 31, 2024
2 parents a8a8dc7 + 0f2c69f · commit ba1c6f5
Showing 52 changed files with 1,453 additions and 638 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/testing.yml
@@ -20,8 +20,8 @@ jobs:
python-version: "3.10"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install .[quality]
python -m pip install uv
uv pip install --system .[quality]
- name: Check quality
run: |
ruff check tests src examples # linter
@@ -41,8 +41,8 @@
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install .[testing]
python -m pip install uv
uv pip install --system .[testing]
python -m nltk.downloader punkt
- name: Test with pytest
run: |
2 changes: 2 additions & 0 deletions CITATION.cff
@@ -6,6 +6,8 @@ type: software
authors:
- given-names: Guilherme
family-names: Penedo
- given-names: Hynek
family-names: Kydlíček
- given-names: Alessandro
family-names: Cappelli
- given-names: Thomas
11 changes: 9 additions & 2 deletions README.md
@@ -96,6 +96,7 @@ Some options common to all executors:
- `pipeline` a list consisting of the pipeline steps that should be run
- `logging_dir` a datafolder where log files, statistics and more should be saved. Do not reuse folders for different pipelines/jobs as this will overwrite your stats, logs and completions.
- `skip_completed` (_bool_, `True` by default) datatrove keeps track of completed tasks so that when you relaunch a job they can be skipped. Set this to `False` to disable this behaviour
- `randomize_start_duration` (_int_, `0` by default) the maximum number of seconds to delay the start of each task to prevent all tasks from starting simultaneously and potentially overloading the system.

Call an executor's `run` method to execute its pipeline.

@@ -223,6 +224,12 @@ For a pipeline with `logging_dir` **mylogspath/exp1**, the following folder structure
```
</details>

### Colorization
Log messages support colorization. By default, colorization is auto-detected for console messages and disabled for log files (logs/task_XXXXX.log).
To explicitly enable or disable colorization, you may set the following environment variables:
- `DATATROVE_COLORIZE_LOGS` set to "1" to add ANSI colors to console log messages and "0" to disable colorization.
- `DATATROVE_COLORIZE_LOG_FILES` set to "1" to add ANSI colors to log messages saved to logs/task_XXXXX.log.

## DataFolder / paths
Datatrove supports a wide variety of input/output sources through [fsspec](https://filesystem-spec.readthedocs.io/en/latest/).

@@ -405,11 +412,11 @@ pytest -sv ./tests/

```bibtex
@misc{penedo2024datatrove,
author = {Penedo, Guilherme and Cappelli, Alessandro and Wolf, Thomas and Sasko, Mario},
author = {Penedo, Guilherme and Kydlíček, Hynek and Cappelli, Alessandro and Wolf, Thomas and Sasko, Mario},
title = {DataTrove: large scale data processing},
year = {2024},
publisher = {GitHub},
journal = {GitHub repository},
url = {https://github.com/huggingface/datatrove}
}
```
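
The two README additions above (`randomize_start_duration` and the colorization variables) can be combined when launching a pipeline. A minimal sketch, assuming a local run with a single illustrative reader step; the data path, task count and logging directory are placeholders, not part of this commit:

```python
import os

from datatrove.executor import LocalPipelineExecutor
from datatrove.pipeline.readers import JsonlReader

# force ANSI colors both for console messages and for the logs/task_XXXXX.log files
os.environ["DATATROVE_COLORIZE_LOGS"] = "1"
os.environ["DATATROVE_COLORIZE_LOG_FILES"] = "1"

if __name__ == "__main__":
    executor = LocalPipelineExecutor(
        pipeline=[JsonlReader("data/")],  # illustrative single-step pipeline
        tasks=4,
        logging_dir="logs/example_run",
        randomize_start_duration=60,  # each task starts after a random delay of 0-60 seconds
    )
    executor.run()
```
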
2 changes: 1 addition & 1 deletion examples/fineweb.py
@@ -24,7 +24,7 @@
"""
we first ran the following pipeline for each dump
"""
DUMP_TO_PROCESS = "CC-MAIN-2O23-5O" # example
DUMP_TO_PROCESS = "CC-MAIN-2023-50" # example

MAIN_OUTPUT_PATH = "s3://some_s3_bucket"
FILTERING_OUTPUT_PATH = f"{MAIN_OUTPUT_PATH}/base_processing"
13 changes: 12 additions & 1 deletion pyproject.toml
@@ -56,18 +56,29 @@ processing = [
"tokenizers",
"ftfy",
"fasteners",
"xxhash"
"xxhash",
"pyahocorasick"
]
decont = [
"lighteval>=0.3.0"
]
multilingual = [
"spacy",
"stanza",
"pyvi",
"pythainlp",
"jieba",
"indic-nlp-library",
"kiwipiepy",
]
quality = [
"ruff>=0.1.5"
]
testing = [
"datatrove[cli]",
"datatrove[io]",
"datatrove[processing]",
"datatrove[multilingual]",
"datatrove[s3]",
"datatrove[decont]",
"pytest",
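
The new `multilingual` extra groups the language-specific tokenization backends added here (spacy, stanza, pyvi, pythainlp, jieba, indic-nlp-library, kiwipiepy) and is now pulled in by the `testing` extra as well; presumably it can also be installed on its own with `pip install datatrove[multilingual]`.
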
19 changes: 16 additions & 3 deletions src/datatrove/executor/base.py
@@ -1,15 +1,22 @@
import dataclasses
import json
import random
import time
from abc import ABC, abstractmethod
from collections import deque
from collections.abc import Sequence
from typing import Callable

from loguru import logger

from datatrove.io import DataFolderLike, get_datafolder
from datatrove.pipeline.base import PipelineStep
from datatrove.utils.logging import add_task_logger, close_task_logger, get_random_str, get_timestamp, log_pipeline
from datatrove.utils.logging import (
add_task_logger,
close_task_logger,
get_random_str,
get_timestamp,
log_pipeline,
logger,
)
from datatrove.utils.stats import PipelineStats


@@ -22,6 +29,7 @@ class PipelineExecutor(ABC):
logging_dir: where to save logs, stats, etc. Should be parsable into a datatrove.io.DataFolder
skip_completed: whether to skip tasks that were completed in
previous runs. default: True
randomize_start_duration: the maximum number of seconds to delay the start of each task.
"""

@abstractmethod
@@ -30,10 +38,12 @@ def __init__(
pipeline: list[PipelineStep | Callable],
logging_dir: DataFolderLike = None,
skip_completed: bool = True,
randomize_start_duration: int = 0,
):
self.pipeline: list[PipelineStep | Callable] = pipeline
self.logging_dir = get_datafolder(logging_dir if logging_dir else f"logs/{get_timestamp()}_{get_random_str()}")
self.skip_completed = skip_completed
self.randomize_start_duration = randomize_start_duration

@abstractmethod
def run(self):
@@ -69,6 +79,9 @@ def _run_for_rank(self, rank: int, local_rank: int = 0) -> PipelineStats:
return PipelineStats()
logfile = add_task_logger(self.logging_dir, rank, local_rank)
log_pipeline(self.pipeline)

if self.randomize_start_duration > 0:
time.sleep(random.randint(0, self.randomize_start_duration))
try:
# pipe data from one step to the next
pipelined_data = None
6 changes: 4 additions & 2 deletions src/datatrove/executor/local.py
@@ -4,11 +4,11 @@
from typing import Callable

import multiprocess
from loguru import logger

from datatrove.executor.base import PipelineExecutor
from datatrove.io import DataFolderLike
from datatrove.pipeline.base import PipelineStep
from datatrove.utils.logging import logger
from datatrove.utils.stats import PipelineStats


@@ -30,6 +30,7 @@ class LocalPipelineExecutor(PipelineExecutor):
Tasks [local_rank_offset, local_rank_offset + local_tasks] will be run.
depends: another LocalPipelineExecutor that should run
before this one
randomize_start_duration: the maximum number of seconds to delay the start of each task.
"""

def __init__(
@@ -43,8 +44,9 @@ def __init__(
start_method: str = "forkserver",
local_tasks: int = -1,
local_rank_offset: int = 0,
randomize_start_duration: int = 0,
):
super().__init__(pipeline, logging_dir, skip_completed)
super().__init__(pipeline, logging_dir, skip_completed, randomize_start_duration)
self.tasks = tasks
self.workers = workers if workers != -1 else tasks
self.start_method = start_method
14 changes: 5 additions & 9 deletions src/datatrove/executor/slurm.py
@@ -3,7 +3,6 @@
import json
import math
import os
import random
import signal
import subprocess
import sys
@@ -15,12 +14,11 @@

import dill
from dill import CONTENTS_FMODE
from loguru import logger

from datatrove.executor.base import PipelineExecutor
from datatrove.io import DataFolderLike
from datatrove.pipeline.base import PipelineStep
from datatrove.utils.logging import get_random_str, get_timestamp
from datatrove.utils.logging import get_random_str, get_timestamp, logger


def requeue_handler(signum, _frame):
@@ -74,7 +72,7 @@ class SlurmPipelineExecutor(PipelineExecutor):
stagger_max_array_jobs: when max_array_launch_parallel is True, this determines how many seconds to wait
between launching each of the parallel jobs
run_on_dependency_fail: start executing when a job we depend on finishes even if it has failed
randomize_start: randomize the start of each task in a job in a ~3 min window
randomize_start_duration: the maximum number of seconds to delay the start of each task.
requeue_signals: requeue the job and exit when one of these signals is received. Useful for when an instance
is being reclaimed and jobs must be stopped for example. Set to None to disable
mail_type: see https://slurm.schedmd.com/sbatch.html. Common values are (NONE, BEGIN, END, FAIL, REQUEUE, ALL)
@@ -107,15 +105,15 @@ def __init__(
max_array_launch_parallel: bool = False,
stagger_max_array_jobs: int = 0,
run_on_dependency_fail: bool = False,
randomize_start: bool = False,
randomize_start_duration: int = 0,
requeue_signals: tuple[str] | None = ("SIGUSR1",),
mail_type: str = "ALL",
mail_user: str = None,
requeue: bool = True,
srun_args: dict = None,
tasks_per_job: int = 1,
):
super().__init__(pipeline, logging_dir, skip_completed)
super().__init__(pipeline, logging_dir, skip_completed, randomize_start_duration)
self.tasks = tasks
self.workers = workers
self.partition = partition
@@ -135,7 +133,7 @@ def __init__(
self.max_array_launch_parallel = max_array_launch_parallel
self.stagger_max_array_jobs = stagger_max_array_jobs
self.run_on_dependency_fail = run_on_dependency_fail
self.randomize_start = randomize_start
self.randomize_start_duration = randomize_start_duration
self.job_id = None
self.requeue_signals = requeue_signals
self.mail_type = mail_type
@@ -179,8 +177,6 @@ def run(self):
break
rank = all_ranks[rank_to_run]

if self.randomize_start:
time.sleep(random.randint(0, 60 * 3))
self._run_for_rank(rank)
else:
# we still have to launch the job
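
Note that the boolean `randomize_start` flag (which staggered task starts over a fixed window of up to 3 minutes) is replaced by `randomize_start_duration`, and the delay itself now lives in the base executor rather than in the Slurm-specific `run`. A hedged sketch of how an existing configuration might be updated to keep the old behaviour; the partition, time limit, paths and task count are placeholder values:

```python
from datatrove.executor import SlurmPipelineExecutor
from datatrove.pipeline.readers import JsonlReader

executor = SlurmPipelineExecutor(
    pipeline=[JsonlReader("s3://some_bucket/data/")],  # illustrative single-step pipeline
    tasks=100,
    time="01:00:00",
    partition="cpu",                       # placeholder partition name
    logging_dir="s3://some_bucket/logs/",  # placeholder logging path
    # before this commit: randomize_start=True (random delay within a fixed ~3 min window)
    randomize_start_duration=180,          # equivalent: delay each task by up to 180 seconds
)
executor.run()
```
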
3 changes: 2 additions & 1 deletion src/datatrove/io.py
@@ -9,7 +9,8 @@
from fsspec.implementations.dirfs import DirFileSystem
from fsspec.implementations.local import LocalFileSystem
from huggingface_hub import HfFileSystem, cached_assets_path
from loguru import logger

from datatrove.utils.logging import logger


class OutputFileManager:
35 changes: 2 additions & 33 deletions src/datatrove/pipeline/base.py
@@ -1,9 +1,8 @@
from abc import ABC, abstractmethod
from itertools import chain
from typing import NoReturn

from datatrove.data import Document, DocumentsPipeline
from datatrove.utils._import_utils import _is_package_available
from datatrove.utils._import_utils import check_required_dependencies
from datatrove.utils.stats import Stats


@@ -29,14 +28,7 @@ def __new__(cls, *args, **kwargs):
"""
required_dependencies = chain.from_iterable(getattr(t, "_requires_dependencies", []) for t in cls.mro())
if required_dependencies:
missing_dependencies: dict[str, str] = {}
for dependency in required_dependencies:
dependency = dependency if isinstance(dependency, tuple) else (dependency, dependency)
package_name, pip_name = dependency
if not _is_package_available(package_name):
missing_dependencies[package_name] = pip_name
if missing_dependencies:
_raise_error_for_missing_dependencies(cls.__name__, missing_dependencies)
check_required_dependencies(cls.__name__, required_dependencies)
return super().__new__(cls)

def __init__(self):
@@ -125,26 +117,3 @@ def __call__(self, data: DocumentsPipeline = None, rank: int = 0, world_size: int
"""
return self.run(data, rank, world_size)


def _raise_error_for_missing_dependencies(step_name: str, dependencies: dict[str, str]) -> NoReturn:
"""Helper to raise an ImportError for missing dependencies and prompt the user to install said dependencies
Args:
step_name: str
The name of the step
dependencies: dict[str, str]
The missing dependencies
"""
dependencies = dict(sorted(dependencies.items()))
package_names = list(dependencies)
if len(dependencies) > 1:
package_names = (
f"{','.join('`' + package_name + '`' for package_name in package_names[:-1])} and `{package_names[-1]}`"
)
else:
package_names = f"`{package_names[0]}`"
raise ImportError(
f"Please install {package_names} to use {step_name} (`pip install {' '.join(list(dependencies.values()))}`)."
)
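
The inline dependency check above is replaced by `check_required_dependencies` from `datatrove.utils._import_utils`; the `_requires_dependencies` contract itself is unchanged, with entries being either a package name or an `(import_name, pip_name)` tuple. A minimal sketch of a custom step relying on this mechanism; the step and its keyword-tagging logic are hypothetical, not part of datatrove:

```python
from datatrove.data import DocumentsPipeline
from datatrove.pipeline.base import PipelineStep


class AhoCorasickTagger(PipelineStep):
    """Hypothetical step: flags documents containing any term from a keyword list."""

    # import name differs from the pip package name, hence the tuple form
    _requires_dependencies = [("ahocorasick", "pyahocorasick")]

    def __init__(self, keywords: list[str]):
        super().__init__()
        self.keywords = keywords

    def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
        import ahocorasick  # safe: __new__ has already verified the dependency is installed

        automaton = ahocorasick.Automaton()
        for i, kw in enumerate(self.keywords):
            automaton.add_word(kw, i)
        automaton.make_automaton()

        for doc in data:
            # record whether any keyword occurs in the document text
            doc.metadata["has_keyword"] = next(automaton.iter(doc.text), None) is not None
            yield doc
```
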