Introduce policies from DeepCore (#304)
With this PR, we introduce several new downsampling methods: `Craig`,
`GradMatch`, `Submodular` (FacilityLocation, GraphCut, and
LogDeterminant), `KcenterGreedy`, and `Uncertainty` (Margin, Entropy, and
LeastConfidence). These methods are adapted from
[DeepCore](https://github.com/PatrickZH/DeepCore).

The following functionalities are implemented:
- `CoresetSupportingModule` support in both batch-then-sample (BTS) and
sample-then-batch (STB) mode: embeddings are registered and provided to the
`inform_samples` method when required (see the sketch below).
- Addition of the `device` parameter to the constructor of the downsampling
methods; for example, `KcenterGreedy` uses a different implementation depending
on whether it runs on CPU or GPU.
- Implementation of the techniques mentioned above; many of them share common
behaviour and are abstracted via the `AbstractMatrixDownsamplingStrategy` class.
- Fixes for some bugs found while running the experiments (typically moving
weights to the correct device).

All methods were tested by comparing their results against those obtained with
DeepCore in various controlled experiments; the included tests serve this
purpose.
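
As a minimal sketch of the `CoresetSupportingModule` contract, the snippet below
mirrors the `DummyModel` test helper added in this PR: the model records its
last-layer embedding during the forward pass via the inherited
`embedding_recorder`, and embedding-based downsamplers later receive those
embeddings through `inform_samples`. The surrounding wiring (forward pass, loss
computation, downsampler selection) is handled by the trainer server and is not
shown here.

```python
import torch
from torch import nn

from modyn.models.coreset_methods_support import CoresetSupportingModule


class TinyCoresetModel(CoresetSupportingModule):
    """Minimal model exposing its last-layer embeddings to the coreset downsamplers."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.hidden_layer = nn.Linear(in_features=1, out_features=10)
        self.output_layer = nn.Linear(in_features=10, out_features=1)

    def forward(self, input_tensor):
        input_tensor = torch.relu(self.hidden_layer(input_tensor))
        # Record the embedding right before the last layer so that
        # embedding-based downsamplers (e.g. KcenterGreedy) can use it.
        input_tensor = self.embedding_recorder(input_tensor)
        return self.output_layer(input_tensor)

    def get_last_layer(self):
        return self.output_layer
```

Downsamplers that require embeddings receive them as the last argument of
`inform_samples`, as exercised in the new tests below, e.g.
`downsampler.inform_samples(sample_ids, forward_output, target, embedding)`.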
francescodeaglio authored Oct 11, 2023
1 parent 73f9c0a commit cc139e0
Showing 46 changed files with 2,782 additions and 40 deletions.
1 change: 1 addition & 0 deletions .pylintrc
@@ -59,6 +59,7 @@ ignore-paths=^modyn/trainer_server/internal/grpc/generated/.*$,
^modyn/models/dlrm/cuda_src/.*$,
^modyn/models/dlrm/utils/.*$,
^modyn/models/dlrm/nn/.*$,
^modyn/trainer_server/internal/trainer/remote_downsamplers/deepcore_utils/.*$,

# Files or directories matching the regex patterns are skipped. The regex
# matches against base names, not paths. The default value ignores Emacs file
1 change: 1 addition & 0 deletions environment.yml
@@ -28,6 +28,7 @@ dependencies:
- numpy
- pandas
- tensorboard
- scipy
- pyftpdlib
- types-protobuf
- types-psycopg2
7 changes: 7 additions & 0 deletions modyn/common/trigger_sample/trigger_sample_storage.py
@@ -232,6 +232,13 @@ def _parse_file(self, file_path: Path) -> np.ndarray:
Returns:
list[tuple[int, float]]: List of trigger samples.
"""

# When there are few samples, some workers may not have any samples to store.
# Such workers do not write a file, and trying to read a missing file raises an error.
# In that case, return an empty array instead.
if not os.path.isfile(file_path):
return np.ndarray(0, dtype="i8,f8")

return np.load(file_path, allow_pickle=False, fix_imports=False)

def _parse_file_subset(self, file_path: Path, start_index: int, end_index: int) -> np.memmap:
2 changes: 1 addition & 1 deletion modyn/models/articlenet/articlenet.py
@@ -19,7 +19,7 @@ def __init__(self, model_configuration: dict[str, Any], device: str, amp: bool)
self.model.to(device)


class DistilBertFeaturizer(DistilBertModel):
class DistilBertFeaturizer(DistilBertModel): # pylint: disable=abstract-method
def __init__(self, config: Any) -> None:
super().__init__(config)
self.d_out = config.hidden_size
@@ -1,10 +1,15 @@
import os

from .abstract_downsampling_strategy import AbstractDownsamplingStrategy # noqa: F401
from .craig_downsampling_strategy import CraigDownsamplingStrategy # noqa: F401
from .downsampling_scheduler import DownsamplingScheduler, instantiate_scheduler # noqa: F401
from .gradmatch_downsampling_strategy import GradMatchDownsamplingStrategy # noqa: F401
from .gradnorm_downsampling_strategy import GradNormDownsamplingStrategy # noqa: F401
from .kcentergreedy_downsampling_strategy import KcenterGreedyDownsamplingStrategy # noqa: F401
from .loss_downsampling_strategy import LossDownsamplingStrategy # noqa: F401
from .no_downsampling_strategy import NoDownsamplingStrategy # noqa: F401
from .submodular_downsampling_strategy import SubmodularDownsamplingStrategy # noqa: F401
from .uncertainty_downsampling_strategy import UncertaintyDownsamplingStrategy # noqa: F401
from .utils import instantiate_downsampler # noqa: F401

files = os.listdir(os.path.dirname(__file__))
@@ -49,6 +49,7 @@ def __init__(self, downsampling_config: dict, maximum_keys_in_memory: int) -> No

self.requires_remote_computation = True
self.maximum_keys_in_memory = maximum_keys_in_memory
self.downsampling_config = downsampling_config
self.downsampling_params = self._build_downsampling_params()
self.status_bar_scale = self._compute_status_bar_scale()

@@ -0,0 +1,18 @@
from modyn.selector.internal.selector_strategies.downsampling_strategies import AbstractDownsamplingStrategy
from modyn.utils import DownsamplingMode


class CraigDownsamplingStrategy(AbstractDownsamplingStrategy):
def __init__(self, downsampling_config: dict, maximum_keys_in_memory: int):
super().__init__(downsampling_config, maximum_keys_in_memory)

self.remote_downsampling_strategy_name = "RemoteCraigDownsamplingStrategy"

def _build_downsampling_params(self) -> dict:
config = super()._build_downsampling_params()
config["selection_batch"] = self.downsampling_config.get("selection_batch", 64)
config["balance"] = self.downsampling_config.get("balance", False)
config["greedy"] = self.downsampling_config.get("greedy", "NaiveGreedy")
if config["balance"] and self.downsampling_mode == DownsamplingMode.BATCH_THEN_SAMPLE:
raise ValueError("Balanced sampling (balance=True) can be used only in Sample then Batch mode.")
return config
@@ -0,0 +1,16 @@
from modyn.selector.internal.selector_strategies.downsampling_strategies import AbstractDownsamplingStrategy
from modyn.utils import DownsamplingMode


class GradMatchDownsamplingStrategy(AbstractDownsamplingStrategy):
def __init__(self, downsampling_config: dict, maximum_keys_in_memory: int):
super().__init__(downsampling_config, maximum_keys_in_memory)

self.remote_downsampling_strategy_name = "RemoteGradMatchDownsamplingStrategy"

def _build_downsampling_params(self) -> dict:
config = super()._build_downsampling_params()
config["balance"] = self.downsampling_config.get("balance", False)
if config["balance"] and self.downsampling_mode == DownsamplingMode.BATCH_THEN_SAMPLE:
raise ValueError("Balanced sampling (balance=True) can be used only in Sample then Batch mode.")
return config
@@ -0,0 +1,16 @@
from modyn.selector.internal.selector_strategies.downsampling_strategies import AbstractDownsamplingStrategy
from modyn.utils import DownsamplingMode


class KcenterGreedyDownsamplingStrategy(AbstractDownsamplingStrategy):
def __init__(self, downsampling_config: dict, maximum_keys_in_memory: int):
super().__init__(downsampling_config, maximum_keys_in_memory)

self.remote_downsampling_strategy_name = "RemoteKcenterGreedyDownsamplingStrategy"

def _build_downsampling_params(self) -> dict:
config = super()._build_downsampling_params()
config["balance"] = self.downsampling_config.get("balance", False)
if config["balance"] and self.downsampling_mode == DownsamplingMode.BATCH_THEN_SAMPLE:
raise ValueError("Balanced sampling (balance=True) can be used only in Sample then Batch mode.")
return config
@@ -0,0 +1,33 @@
from modyn.selector.internal.selector_strategies.downsampling_strategies import AbstractDownsamplingStrategy
from modyn.trainer_server.internal.trainer.remote_downsamplers.deepcore_utils.submodular_function import (
SUBMODULAR_FUNCTIONS,
)
from modyn.utils import DownsamplingMode


class SubmodularDownsamplingStrategy(AbstractDownsamplingStrategy):
def __init__(self, downsampling_config: dict, maximum_keys_in_memory: int):
super().__init__(downsampling_config, maximum_keys_in_memory)

self.remote_downsampling_strategy_name = "RemoteSubmodularDownsamplingStrategy"

def _build_downsampling_params(self) -> dict:
config = super()._build_downsampling_params()

if "submodular_function" not in self.downsampling_config:
raise ValueError(
f"Please specify the submodular function used to select the datapoints. "
f"Available functions: {SUBMODULAR_FUNCTIONS}, param submodular_function"
)
config["submodular_function"] = self.downsampling_config["submodular_function"]

if "submodular_optimizer" in self.downsampling_config:
config["submodular_optimizer"] = self.downsampling_config["submodular_optimizer"]

config["selection_batch"] = self.downsampling_config.get("selection_batch", 64)

config["balance"] = self.downsampling_config.get("balance", False)
if config["balance"] and self.downsampling_mode == DownsamplingMode.BATCH_THEN_SAMPLE:
raise ValueError("Balanced sampling (balance=True) can be used only in Sample then Batch mode.")

return config
@@ -0,0 +1,26 @@
from modyn.selector.internal.selector_strategies.downsampling_strategies import AbstractDownsamplingStrategy
from modyn.utils import DownsamplingMode


class UncertaintyDownsamplingStrategy(AbstractDownsamplingStrategy):
def __init__(self, downsampling_config: dict, maximum_keys_in_memory: int):
super().__init__(downsampling_config, maximum_keys_in_memory)

self.remote_downsampling_strategy_name = "RemoteUncertaintyDownsamplingStrategy"

def _build_downsampling_params(self) -> dict:
config = super()._build_downsampling_params()

if "score_metric" not in self.downsampling_config:
raise ValueError(
"Please specify the metric used to score uncertainty for the datapoints. "
"Available metrics : LeastConfidence, Entropy, Margin"
"Use the pipeline parameter score_metric"
)
config["score_metric"] = self.downsampling_config["score_metric"]

config["balance"] = self.downsampling_config.get("balance", False)
if config["balance"] and self.downsampling_mode == DownsamplingMode.BATCH_THEN_SAMPLE:
raise ValueError("Balanced sampling (balance=True) can be used only in Sample then Batch mode.")

return config
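
For illustration, a `downsampling_config` for the uncertainty strategy could look
like the sketch below; only the keys read by `UncertaintyDownsamplingStrategy`
above are shown, and any additional keys required by the base
`AbstractDownsamplingStrategy` (such as the downsampling ratio) are outside this
excerpt and therefore omitted.

```python
# Illustrative sketch only: keys taken from UncertaintyDownsamplingStrategy above.
# The base AbstractDownsamplingStrategy may require further keys (e.g. a
# downsampling ratio) that are not part of this excerpt.
uncertainty_downsampling_config = {
    "score_metric": "Margin",  # one of LeastConfidence, Entropy, Margin
    "balance": False,          # balance=True is only valid in Sample-then-Batch mode
}
```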
@@ -1,6 +1,5 @@
import math

import pytest
import torch
from modyn.trainer_server.internal.dataset.key_sources import LocalKeySource
from modyn.trainer_server.internal.dataset.local_dataset_writer import LocalDatasetWriter
@@ -75,9 +74,9 @@ def test_read_dirty_directory():
write_directory(other_pipeline, 1, TMP_PATH_TEST, number_of_files=10, maximum_keys_in_memory=maximum_keys_in_memory)

keysource = LocalKeySource(pipeline_id=current_pipeline, trigger_id=1, offline_dataset_path=TMP_PATH_TEST)

assert keysource.get_num_data_partitions() == 0
with pytest.raises(FileNotFoundError):
keysource.get_keys_and_weights(0, 0)
assert keysource.get_keys_and_weights(0, 0) == ([], [])

write_directory(
current_pipeline, 1, TMP_PATH_TEST, number_of_files=4, maximum_keys_in_memory=maximum_keys_in_memory
@@ -0,0 +1,27 @@
import numpy as np
import torch
from modyn.models.coreset_methods_support import CoresetSupportingModule
from torch import nn


class DummyModel(CoresetSupportingModule):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.hidden_layer = nn.Linear(in_features=1, out_features=10)
self.output_layer = nn.Linear(in_features=10, out_features=1)

def forward(self, input_tensor):
input_tensor = torch.relu(self.hidden_layer(input_tensor))
input_tensor = self.embedding_recorder(input_tensor)
outputs = self.output_layer(input_tensor)
return outputs

def get_last_layer(self):
return self.output_layer


def assert_close_matrices(matrix1, matrix2):
for row1, row2 in zip(matrix1, matrix2):
assert len(row1) == len(row2)
for el1, el2 in zip(row1, row2):
assert np.isclose(el1, el2, 1e-2)
@@ -0,0 +1,123 @@
# pylint: disable=abstract-class-instantiated,unused-argument
from unittest.mock import patch

import numpy as np
import torch
from modyn.trainer_server.internal.trainer.remote_downsamplers.abstract_matrix_downsampling_strategy import (
AbstractMatrixDownsamplingStrategy,
MatrixContent,
)


def get_sampler_config(balance=False):
downsampling_ratio = 50
per_sample_loss_fct = torch.nn.CrossEntropyLoss(reduction="none")

params_from_selector = {
"downsampling_ratio": downsampling_ratio,
"sample_then_batch": False,
"args": {},
"balance": balance,
}
return 0, 0, 0, params_from_selector, per_sample_loss_fct, "cpu"


@patch.multiple(AbstractMatrixDownsamplingStrategy, __abstractmethods__=set())
def test_init():
amds = AbstractMatrixDownsamplingStrategy(*get_sampler_config())

assert amds.requires_coreset_supporting_module
assert not amds.matrix_elements
assert amds.matrix_content is None


@patch.multiple(AbstractMatrixDownsamplingStrategy, __abstractmethods__=set())
def test_collect_embeddings():
amds = AbstractMatrixDownsamplingStrategy(*get_sampler_config())

amds.matrix_content = MatrixContent.EMBEDDINGS

assert amds.requires_coreset_supporting_module
assert not amds.matrix_elements # thank you pylint! amds.matrix_elements == []

first_embedding = torch.randn((4, 5))
second_embedding = torch.randn((3, 5))
amds.inform_samples([1, 2, 3, 4], None, None, first_embedding)
amds.inform_samples([21, 31, 41], None, None, second_embedding)

assert np.concatenate(amds.matrix_elements).shape == (7, 5)
assert all(torch.equal(el1, el2) for el1, el2 in zip(amds.matrix_elements, [first_embedding, second_embedding]))
assert amds.index_sampleid_map == [1, 2, 3, 4, 21, 31, 41]

third_embedding = torch.randn((23, 5))
amds.inform_samples(list(range(1000, 1023)), None, None, third_embedding)

assert np.concatenate(amds.matrix_elements).shape == (30, 5)
assert all(
torch.equal(el1, el2)
for el1, el2 in zip(amds.matrix_elements, [first_embedding, second_embedding, third_embedding])
)
assert amds.index_sampleid_map == [1, 2, 3, 4, 21, 31, 41] + list(range(1000, 1023))


@patch.multiple(AbstractMatrixDownsamplingStrategy, __abstractmethods__=set())
@patch.object(
AbstractMatrixDownsamplingStrategy, "_select_indexes_from_matrix", return_value=([0, 2], torch.Tensor([1.0, 3.0]))
)
def test_collect_embedding_balance(test_amds):
amds = AbstractMatrixDownsamplingStrategy(*get_sampler_config(True))

amds.matrix_content = MatrixContent.EMBEDDINGS

assert amds.requires_coreset_supporting_module
assert amds.requires_data_label_by_label
assert not amds.matrix_elements # thank you pylint! amds.matrix_elements == []

first_embedding = torch.randn((4, 5))
second_embedding = torch.randn((3, 5))
amds.inform_samples([1, 2, 3, 4], None, None, first_embedding)
amds.inform_samples([21, 31, 41], None, None, second_embedding)

assert np.concatenate(amds.matrix_elements).shape == (7, 5)
assert all(torch.equal(el1, el2) for el1, el2 in zip(amds.matrix_elements, [first_embedding, second_embedding]))
assert amds.index_sampleid_map == [1, 2, 3, 4, 21, 31, 41]

amds.inform_end_of_current_label()

third_embedding = torch.randn((23, 5))
assert len(amds.matrix_elements) == 0
amds.inform_samples(list(range(1000, 1023)), None, None, third_embedding)

assert np.concatenate(amds.matrix_elements).shape == (23, 5)
assert all(torch.equal(el1, el2) for el1, el2 in zip(amds.matrix_elements, [third_embedding]))
assert amds.index_sampleid_map == list(range(1000, 1023))
assert amds.already_selected_samples == [1, 3]
amds.inform_end_of_current_label()
assert amds.already_selected_samples == [1, 3, 1000, 1002]


@patch.multiple(AbstractMatrixDownsamplingStrategy, __abstractmethods__=set())
def test_collect_gradients():
amds = AbstractMatrixDownsamplingStrategy(*get_sampler_config())
amds.matrix_content = MatrixContent.GRADIENTS

first_output = torch.randn((4, 2))
first_output.requires_grad = True
first_target = torch.tensor([1, 1, 1, 0])
first_embedding = torch.randn((4, 5))
amds.inform_samples([1, 2, 3, 4], first_output, first_target, first_embedding)

second_output = torch.randn((3, 2))
second_output.requires_grad = True
second_target = torch.tensor([0, 1, 0])
second_embedding = torch.randn((3, 5))
amds.inform_samples([21, 31, 41], second_output, second_target, second_embedding)

assert len(amds.matrix_elements) == 2

# expected shape = (a,b)
# a = 7 (4 samples in the first batch and 3 samples in the second batch)
# b = 5 * 2 + 2 where 5 is the input dimension of the last layer and 2 is the output one
assert np.concatenate(amds.matrix_elements).shape == (7, 12)

assert amds.index_sampleid_map == [1, 2, 3, 4, 21, 31, 41]
@@ -11,7 +11,7 @@ def test_batch_then_sample_general():
downsampling_ratio = 50

params_from_selector = {"downsampling_ratio": downsampling_ratio}
sampler = AbstractRemoteDownsamplingStrategy(154, 128, 64, params_from_selector)
sampler = AbstractRemoteDownsamplingStrategy(154, 128, 64, params_from_selector, "cpu")

assert hasattr(sampler, "downsampling_ratio")
assert sampler.downsampling_ratio == 50