'File partition' option and 'document' directory specification (#213)

---------

Co-authored-by: Tyler Murray <[email protected]>
Whattabatt and undfined authored Oct 3, 2024
1 parent c029e94 commit 4615d34
Showing 11 changed files with 157 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/CI.yml
@@ -100,7 +100,7 @@ jobs:
if: steps.cache-venv.outputs.cache-hit != 'true'
uses: actions/setup-python@v4
with:
python-version: "3.8"
python-version: "3.9"
architecture: "x64"

- name: Create a new Python environment & install maturin
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "dolma"
version = "1.0.9"
version = "1.1.0"
edition = "2021"
license = "Apache-2.0"

10 changes: 5 additions & 5 deletions pyproject.toml
@@ -1,10 +1,10 @@
[project]
name = "dolma"
version = "1.0.14.post1"
version = "1.1.0"
description = "Data filters"
license = { text = "Apache-2.0" }
readme = "README.md"
requires-python = ">=3.8"
requires-python = ">=3.9"
dependencies = [
"anyascii>=0.3.2",
"blingfire==0.1.8",
@@ -196,12 +196,12 @@ exclude = '''
| tests/work
)
'''
target-version = ["py38", "py39", "py310", "py311", "py312"]
target-version = ["py39", "py310", "py311", "py312"]


[tool.isort]
profile = "black"
py_version = 38
py_version = 39
known_first_party = ["dolma"]
known_local_folder = ["tests", "python"]
extend_skip_glob = [
@@ -222,7 +222,7 @@ recursive = true
aggressive = 3

[tool.mypy]
python_version = "3.8"
python_version = "3.9"
ignore_missing_imports = true
no_site_packages = true
allow_redefinition = false
24 changes: 22 additions & 2 deletions python/dolma/cli/deduper.py
@@ -1,3 +1,5 @@
import fnmatch
import os
from contextlib import ExitStack
from dataclasses import dataclass
from pathlib import Path
@@ -99,6 +101,13 @@ class DedupeConfig:
partition_index: Optional[int] = field(
default=0, help="The index of the partition being processed, in the range [0, num_partitions)."
)
file_partition: Optional[bool] = field(
default=False, help="Whether or not to partition at the document level (vs at the span level)"
)
document_dir: Optional[str] = field(
default="documents",
help="The folder in source paths to replace with 'attributes' to store results, if not 'documents'",
)


@dataclass
@@ -135,7 +144,6 @@ def run(cls, parsed_config: DeduperConfig):
logger = get_logger("tagger")

dict_config: Dict[str, Any] = {}

with ExitStack() as stack:
work_dirs = stack.enter_context(make_workdirs(parsed_config.work_dir))

@@ -146,6 +154,8 @@
"min_words": parsed_config.dedupe.min_words,
"num_partitions": parsed_config.dedupe.num_partitions,
"partition_index": parsed_config.dedupe.partition_index,
"file_partition": parsed_config.dedupe.file_partition,
"document_dir": parsed_config.dedupe.document_dir,
}
try_name = parsed_config.dedupe.name if not om.is_missing(parsed_config.dedupe, "name") else None

@@ -182,7 +192,17 @@ def run(cls, parsed_config: DeduperConfig):
# perform some path validation to make sure we don't call the mixer with invalid config
total_matching_documents = 0
for document in parsed_config.documents:
dict_config.setdefault("documents", []).append(str(document))

if not any(
fnmatch.fnmatch(dict_config["dedupe"]["document_dir"], part) for part in document.split(os.sep)
):
raise DolmaConfigError(
f"Path ({document}) does not contain expected document directory: '/{dict_config['dedupe']['document_dir']}/'. "
)

doc = str(document)

dict_config.setdefault("documents", []).append(doc)

current_matching_documents = sum(1 for _ in glob_path(document))
if current_matching_documents == 0:
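The new file_partition and document_dir options surface as plain config fields, and the CLI now refuses to run when a source path does not contain the configured document directory. Below is a minimal sketch of that path check in isolation; the path and the document_dir value are hypothetical placeholders, and only the check itself mirrors the validation added to deduper.py above.

import fnmatch
import os

# Hypothetical stand-ins for parsed_config.dedupe.document_dir and one
# entry of parsed_config.documents.
document_dir = "documents"
document = "s3://my-bucket/corpus/documents/shard-000.json.gz"

# Every source path must contain the configured directory as one of its
# components; otherwise the CLI raises DolmaConfigError before any work is
# scheduled (a plain ValueError keeps this sketch self-contained).
if not any(fnmatch.fnmatch(document_dir, part) for part in document.split(os.sep)):
    raise ValueError(
        f"Path ({document}) does not contain expected document directory: '/{document_dir}/'."
    )

With the default value of "documents" the behaviour is unchanged; the option only matters when documents live under a differently named folder, as in the filepath-good.json test config added below.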
7 changes: 4 additions & 3 deletions python/dolma/warc/processor.py
@@ -134,9 +134,10 @@ def process_single(
extension = extension.replace(".gz", "").replace(".warc", "") + ".jsonl.gz"
destination_path = join_path(prot, *base_dst[:-1], base_dst[-1] + extension)

with smart_open.open(source_path, "rb") as warc_file, smart_open.open(
destination_path, "wb"
) as output_file:
with (
smart_open.open(source_path, "rb") as warc_file,
smart_open.open(destination_path, "wb") as output_file,
):
it = ArchiveIterator(warc_file, record_types=WarcRecordType.response | WarcRecordType.warcinfo)
for record in it:
if record.record_type == WarcRecordType.warcinfo:
34 changes: 32 additions & 2 deletions src/deduper.rs
@@ -14,8 +14,9 @@ use crate::s3_util;
use crate::shard::shard_config::{CompressionConfig, WorkDirConfig};
use crate::shard::{find_objects_matching_patterns, FileCache};
use crate::wimbd::tokens::tokenize;

use ahash::RandomState;
use deduper_config::*;
use std::hash::{BuildHasher, Hash, Hasher};

pub fn run(config: DeduperConfig) -> Result<u32, u32> {
let bloom_filter = BloomFilter::initialize(&config.bloom_filter).unwrap();
@@ -33,7 +34,20 @@ pub fn run(config: DeduperConfig) -> Result<u32, u32> {
let threadpool = ThreadPool::new(config.processes);
let failed_shard_count = AtomicU32::new(0);
let failed_shard_count_ref = Arc::new(failed_shard_count);
let hash_builder = RandomState::with_seeds(0, 1, 2, 3);

for p in paths {
let mut hasher = hash_builder.build_hasher();
p.hash(&mut hasher);
let hashed_path = hasher.finish();

if config.dedupe.file_partition.unwrap_or(false)
&& hashed_path % config.dedupe.num_partitions.unwrap_or(1)
!= config.dedupe.partition_index.unwrap_or(0)
{
continue;
}

let path = p.clone();
let work_dirs = config.work_dir.clone();
let dedupe = config.dedupe.clone();
@@ -121,10 +135,24 @@ fn write_attributes(
);
}

let document_key = dedupe_config
.document_dir
.unwrap_or(String::from("documents"));

let attrs_location = {
let attr_prefix = format!("/attributes/{}/", attr_key);
docs_location.replace("/documents/", &attr_prefix)
docs_location.replace(&format!("/{}/", &document_key), &attr_prefix)
};

if attrs_location == docs_location {
log::error!(
"{} does not contain {} . Not writing its attributes!",
docs_location,
&document_key
);
panic!("Attribute would be written to document location!");
}

let local_output = cache.prepare_output(&attrs_location, label_temp)?;
let mut num_processed = 0;
let mut num_observed = 0;
@@ -546,6 +574,8 @@ pub mod deduper_config {
pub skip_empty: Option<bool>,
pub num_partitions: Option<u64>,
pub partition_index: Option<u64>,
pub file_partition: Option<bool>,
pub document_dir: Option<String>,
}

#[derive(Serialize, Deserialize, Clone)]
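The Rust change above also adds file-level partitioning: each input path is hashed with a fixed-seed ahash::RandomState, and a worker keeps only the paths whose hash falls into its own partition. The following Python sketch illustrates the assignment rule; it substitutes a SHA-256-based stand-in for ahash, so the actual grouping of files differs from the Rust code, but the modulo assignment is the same idea.

import hashlib

def assigned_to_partition(path: str, num_partitions: int, partition_index: int) -> bool:
    # Stable 64-bit hash of the path (stand-in for ahash with fixed seeds).
    digest = hashlib.sha256(path.encode("utf-8")).digest()
    hashed_path = int.from_bytes(digest[:8], "big")
    # Same rule as the new loop in deduper.rs: process the path only if it
    # lands in this worker's partition.
    return hashed_path % num_partitions == partition_index

# Hypothetical paths; with num_partitions=4 each file is always handled by
# exactly one of the four workers, because the hash is stable across runs.
paths = ["corpus/documents/000.json.gz", "corpus/documents/001.json.gz"]
for p in paths:
    print(p, assigned_to_partition(p, num_partitions=4, partition_index=0))

When file_partition is false (the default), the existing span-level partitioning behaviour described by the config help string is unchanged.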
28 changes: 28 additions & 0 deletions tests/config/filepath-bad.json
@@ -0,0 +1,28 @@
{
"documents": [
"tests/data/provided/deduper/pathnotd0cumentz/000.json.gz"
],
"work_dir": {
"input": "tests/work/temp/dedupe-para/input",
"output": "tests/work/temp/dedupe-para/output"
},
"dedupe": {
"name": "dedupe_paragraph_ngrams",
"paragraphs": {
"attribute_name": "bff_duplicate_paragraph_spans",
"by_ngram": {
"ngram_length": 6,
"stride": 3,
"overlap_threshold": 0.5
}
}
},
"bloom_filter": {
"file": "tests/work/para_bloom_filter.bin",
"size_in_bytes": 0,
"read_only": false,
"estimated_doc_count": 1000,
"desired_false_positive_rate": 0.001
},
"processes": 1
}
29 changes: 29 additions & 0 deletions tests/config/filepath-good.json
@@ -0,0 +1,29 @@
{
"documents": [
"tests/data/provided/deduper/pathnotd0cumentz/000.json.gz"
],
"work_dir": {
"input": "tests/work/temp/dedupe-para/input",
"output": "tests/work/temp/dedupe-para/output"
},
"dedupe": {
"name": "dedupe_paragraph_ngrams",
"document_dir": "pathnotd0cumentz",
"paragraphs": {
"attribute_name": "bff_duplicate_paragraph_spans",
"by_ngram": {
"ngram_length": 6,
"stride": 3,
"overlap_threshold": 0.5
}
}
},
"bloom_filter": {
"file": "tests/work/para_bloom_filter.bin",
"size_in_bytes": 0,
"read_only": false,
"estimated_doc_count": 1000,
"desired_false_positive_rate": 0.001
},
"processes": 1
}
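This config passes validation because the document path contains the configured pathnotd0cumentz component; write_attributes in deduper.rs then swaps that component for the attributes prefix when choosing the output location, and refuses to write if the substitution leaves the path unchanged. A small illustration follows, with a hypothetical attribute key.

# Hypothetical attribute key; the real value is derived from the dedupe config.
document_dir = "pathnotd0cumentz"
attr_key = "dedupe_paragraph_ngrams"

docs_location = "tests/data/provided/deduper/pathnotd0cumentz/000.json.gz"
attrs_location = docs_location.replace(f"/{document_dir}/", f"/attributes/{attr_key}/")

# If document_dir never appears in the path, replace() is a no-op and the
# Rust code panics rather than overwrite the source file.
assert attrs_location != docs_location
print(attrs_location)  # tests/data/provided/deduper/attributes/dedupe_paragraph_ngrams/000.json.gz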
Binary file not shown.
37 changes: 34 additions & 3 deletions tests/python/test_deduper.py
@@ -10,6 +10,7 @@
from typing_extensions import TypedDict

from dolma.cli.__main__ import main
from dolma.core.errors import DolmaConfigError
from dolma.core.utils import split_words

from .utils import (
@@ -24,6 +25,9 @@

TEST_DIR = Path(__file__).parent.parent
DEDUPE_BY_URL = TEST_DIR / "config/dedupe-by-url.json"
DEDUPE_BAD_FILENAME = TEST_DIR / "config/filepath-bad.json"
DEDUPE_GOOD_FILENAME = TEST_DIR / "config/filepath-good.json"

DEDUPE_PARAGRAPHS = TEST_DIR / "config/dedupe-paragraphs.json"
DEDUPE_PARAGRAPH_NGRAMS = TEST_DIR / "config/dedupe-paragraph-ngrams.json"

@@ -48,13 +52,13 @@ def setUp(self) -> None:

# upload test data
upload_s3_prefix(
s3_prefix=f"{self.remote_test_prefix}", local_prefix="tests/data/provided/deduper/documents/*.gz"
s3_prefix=f"{self.remote_test_prefix}", local_prefix="tests/data/provided/deduper/*/*.gz"
)

# copy provided config files to local temp dir
shutil.copytree(
"tests/data/provided/deduper/documents",
f"{self.local_temp_dir}/tests/data/provided/deduper/documents",
"tests/data/provided/deduper",
f"{self.local_temp_dir}/tests/data/provided/deduper",
dirs_exist_ok=True,
)

@@ -82,6 +86,33 @@ def test_dedupe_by_url(self):
)
return self._compare_dedupe_output(expected, computed) # pyright: ignore

def test_dedupe_bad_filepath(self):
with open(DEDUPE_BAD_FILENAME, "r") as f:
config = json.load(f)

config["documents"][0] = f'{self.local_temp_dir}/{config["documents"][0]}'
config["bloom_filter"]["file"] = f'{self.local_temp_dir}/{config["bloom_filter"]["file"]}'

with NamedTemporaryFile("w") as f:
json.dump(config, f)
f.flush()

with self.assertRaises(DolmaConfigError):
main(argv=["-c", f.name, "dedupe"])

def test_dedupe_good_filepath(self):
with open(DEDUPE_GOOD_FILENAME, "r") as f:
config = json.load(f)

config["documents"][0] = f'{self.local_temp_dir}/{config["documents"][0]}'
config["bloom_filter"]["file"] = f'{self.local_temp_dir}/{config["bloom_filter"]["file"]}'

with NamedTemporaryFile("w") as f:
json.dump(config, f)
f.flush()

main(argv=["-c", f.name, "dedupe"])

def test_dedupe_paragraphs(self):
with open(DEDUPE_PARAGRAPHS, "r") as f:
config = json.load(f)
