Add CSVFileWrapper (#273)
This PR introduces the CSV file wrapper.

Bytes are decoded to strings, the required rows are extracted, and the labels are separated from the samples. The labels are returned as integers, while the samples are reassembled in CSV-like format and converted back to bytes. Pandas is not needed.

The bytes_parser_function takes care of decoding the bytes, splitting the string at the separator, and converting/casting the data.
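As a rough illustration (this sketch is not part of the PR's code), a bytes_parser_function for such CSV samples can simply decode and split each row; casting the individual fields is dataset-specific and therefore omitted:

def bytes_parser_function(data: bytes) -> list[str]:
    # Each sample arrives as one UTF-8 encoded, comma-separated row without the label.
    # Decode the bytes back into a string and split at the separator; converting the
    # fields (e.g. to int or float) depends on the dataset and is left out here.
    return data.decode("utf-8").split(",")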
francescodeaglio authored Jun 28, 2023
1 parent 66c8fdc commit 61d66bb
Showing 6 changed files with 679 additions and 5 deletions.
1 change: 1 addition & 0 deletions integrationtests/run.sh
@@ -10,6 +10,7 @@ python $SCRIPT_DIR/test_docker_compose.py
 python $SCRIPT_DIR/test_ftp_connections.py
 echo "Running storage integration tests"
 python $SCRIPT_DIR/storage/integrationtest_storage.py
+python $SCRIPT_DIR/storage/integrationtest_storage_csv.py
 echo "Running selector integration tests"
 python $SCRIPT_DIR/selector/integrationtest_selector.py
 echo "Running model storage integration tests"
174 changes: 174 additions & 0 deletions integrationtests/storage/integrationtest_storage_csv.py
@@ -0,0 +1,174 @@
############
# Storage integration tests adapted to the CSV input format.
# Unchanged functions are imported from the original test.
# Instead of images, we use CSV files. Each file has 25 rows and each row has 4 columns:
# f"A{index}file{file},B{index}file{file},C{index}file{file},{counter}"
# where index is a random number, file is the file index and the label (last column) is a global counter.
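# For illustration (made-up values): with index=123, file=0 and counter=7, a row reads
#   A123file0,B123file0,C123file0,7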

import json
import os
import random
import time
from typing import Tuple

# unchanged functions are imported from the original test file
from integrationtests.storage.integrationtest_storage import (
    DATASET_PATH,
    check_dataset_availability,
    check_get_current_timestamp,
    cleanup_dataset_dir,
    cleanup_storage_database,
    connect_to_storage,
    create_dataset_dir,
    get_data_in_interval,
    get_new_data_since,
)
from modyn.storage.internal.grpc.generated.storage_pb2 import GetRequest, RegisterNewDatasetRequest
from modyn.storage.internal.grpc.generated.storage_pb2_grpc import StorageStub

# Because we have no mapping of file to key (happens in the storage service), we have to keep
# track of the samples we added to the dataset ourselves and compare them to the samples we get
# from the storage service.
FIRST_ADDED_CSVS = []
SECOND_ADDED_CSVS = []
CSV_UPDATED_TIME_STAMPS = []


def register_new_dataset() -> None:
    storage_channel = connect_to_storage()

    storage = StorageStub(storage_channel)

    request = RegisterNewDatasetRequest(
        base_path=str(DATASET_PATH),
        dataset_id="test_dataset",
        description="Test dataset for integration tests of CSV wrapper.",
        file_wrapper_config=json.dumps({"file_extension": ".csv", "separator": ",", "label_index": 3}),
        file_wrapper_type="CsvFileWrapper",
        filesystem_wrapper_type="LocalFilesystemWrapper",
        version="0.1.0",
    )

    response = storage.RegisterNewDataset(request)

    assert response.success, "Could not register new dataset."


def add_file_to_dataset(csv_file_content: str, name: str) -> None:
    with open(DATASET_PATH / name, "w") as f:
        f.write(csv_file_content)
    CSV_UPDATED_TIME_STAMPS.append(int(round(os.path.getmtime(DATASET_PATH / name) * 1000)))


def create_random_csv_row(file: int, counter: int) -> str:
    index = random.randint(1, 1000)
    return f"A{index}file{file},B{index}file{file},C{index}file{file},{counter}"


def create_random_csv_file(file: int, counter: int) -> Tuple[str, list[str], int]:
    rows = []
    samples = []
    for repeat in range(25):
        row = create_random_csv_row(file, counter)
        counter += 1
        rows.append(row)
        sample = ",".join(row.split(",")[:3])  # remove the label
        samples.append(sample)

    return "\n".join(rows), samples, counter


def add_files_to_dataset(start_number: int, end_number: int, files_added: list[bytes], rows_added: list[bytes]) -> None:
    create_dataset_dir()
    counter = 0
    for i in range(start_number, end_number):
        csv_file, samples_csv_file, counter = create_random_csv_file(i, counter)
        add_file_to_dataset(csv_file, f"csv_{i}.csv")
        files_added.append(bytes(csv_file, "utf-8"))
        [rows_added.append(bytes(row, "utf-8")) for row in samples_csv_file]


def check_data(keys: list[str], expected_samples: list[bytes]) -> None:
    storage_channel = connect_to_storage()

    storage = StorageStub(storage_channel)

    request = GetRequest(
        dataset_id="test_dataset",
        keys=keys,
    )
    samples_counter = 0
    for _, response in enumerate(storage.Get(request)):
        if len(response.samples) == 0:
            assert False, f"Could not get sample with key {keys[samples_counter]}."
        for sample in response.samples:
            if sample is None:
                assert False, f"Could not get sample with key {keys[samples_counter]}."
            if sample not in expected_samples:
                raise ValueError(
                    f"Sample {sample} with key {keys[samples_counter]} is not present in the "
                    f"expected samples {expected_samples}. "
                )
            samples_counter += 1
    assert samples_counter == len(
        keys
    ), f"Could not get all samples. Samples missing: keys: {sorted(keys)} i: {samples_counter}"


def test_storage() -> None:
    check_get_current_timestamp()  # Check if the storage service is available.
    create_dataset_dir()
    add_files_to_dataset(0, 10, [], FIRST_ADDED_CSVS)  # Add samples to the dataset.
    register_new_dataset()
    check_dataset_availability()  # Check if the dataset is available.

    response = None
    for i in range(500):
        responses = list(get_new_data_since(0))
        assert len(responses) < 2, f"Received batched response, shouldn't happen: {responses}"
        if len(responses) == 1:
            response = responses[0]
            if len(response.keys) == 250:  # 10 files with 25 samples each = 250 keys
                break
        time.sleep(1)

    assert response is not None, "Did not get any response from Storage"
    assert len(response.keys) == 250, f"Not all samples were returned. Samples returned: {response.keys}"

    check_data(response.keys, FIRST_ADDED_CSVS)

    add_files_to_dataset(10, 20, [], SECOND_ADDED_CSVS)  # Add more samples to the dataset.

    for i in range(500):
        responses = list(get_new_data_since(CSV_UPDATED_TIME_STAMPS[9] + 1))
        assert len(responses) < 2, f"Received batched response, shouldn't happen: {responses}"
        if len(responses) == 1:
            response = responses[0]
            if len(response.keys) == 250:
                break
        time.sleep(1)

    assert response is not None, "Did not get any response from Storage"
    assert len(response.keys) == 250, f"Not all samples were returned. Samples returned: {response.keys}"

    check_data(response.keys, SECOND_ADDED_CSVS)

    responses = list(get_data_in_interval(0, CSV_UPDATED_TIME_STAMPS[9]))
    assert len(responses) == 1, f"Received batched/no response, shouldn't happen: {responses}"
    response = responses[0]

    check_data(response.keys, FIRST_ADDED_CSVS)

    check_get_current_timestamp()  # Check if the storage service is still available.


def main() -> None:
    try:
        test_storage()
    finally:
        cleanup_dataset_dir()
        cleanup_storage_database()


if __name__ == "__main__":
    main()
37 changes: 32 additions & 5 deletions modyn/config/schema/modyn_config_schema.yaml
@@ -37,7 +37,8 @@ properties:
       sample_batch_size:
         type: number
         description: |
-          The size of a batch when requesting new samples from storage. All new samples are returned, however, to reduce the size of a single answer the keys are batched in sizes of `sample_batch_size`.
+          The size of a batch when requesting new samples from storage. All new samples are returned, however, to reduce
+          the size of a single answer the keys are batched in sizes of `sample_batch_size`.
       sample_dbinsertion_batchsize:
         type: number
         description: |
@@ -49,7 +50,9 @@ properties:
       sample_table_unlogged:
         type: boolean
         description: |
-          This configures whether the table storing all samples is UNLOGGED (= high performance) or crash resilient. Defaults to True. For datasets with many samples (such as Criteo), this is recommended for highest insertion performance. In other scenarios, this might not be necessary.
+          This configures whether the table storing all samples is UNLOGGED (= high performance) or crash resilient.
+          Defaults to True. For datasets with many samples (such as Criteo), this is recommended for highest insertion performance.
+          In other scenarios, this might not be necessary.
       force_fallback_insert:
         type: boolean
         description: |
@@ -99,15 +102,39 @@ properties:
       record_size:
         type: number
         description: |
-          The size of each full record in bytes (label + features) for a binary file wrapper.
+          [BinaryFileWrapper] The size of each full record in bytes (label + features).
       label_size:
         type: number
         description: |
-          The size of the label field in bytes for a binary file wrapper.
+          [BinaryFileWrapper] The size of the label field in bytes.
       byteorder:
         type: string
         description: |
-          The byteorder when reading an integer from multibyte data in a binary file. Should either be "big" or "little".
+          [BinaryFileWrapper] The byteorder when reading an integer from multibyte data in a binary file.
+          Should either be "big" or "little".
+      separator:
+        type: string
+        description: |
+          [CsvFileWrapper] The separator used in the CSV file. The default is ",".
+      label_index:
+        type: number
+        description: |
+          [CsvFileWrapper] Column index of the label.
+          For example, if the columns are "width", "height", "age", "label", you should set label_index to 3.
+      ignore_first_line:
+        type: boolean
+        description: |
+          [CsvFileWrapper] If the first line is the table header, you can skip it by setting this parameter to True.
+          Default is False.
+      encoding:
+        type: string
+        description: |
+          [CsvFileWrapper] Encoding of the CSV file. Default is utf-8.
+      validate_file_content:
+        type: boolean
+        description: |
+          [CsvFileWrapper] Whether to validate the file content before inserting the data. It checks that it
+          is a CSV, that all rows are the same size, and that the 'label' column exists. Default is True.
       ignore_last_timestamp:
         type: boolean
         description: |
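As a reading aid (not part of this diff), a file_wrapper_config combining these CsvFileWrapper options could be assembled the same way the integration test above does; the values below are illustrative:

import json

file_wrapper_config = json.dumps(
    {
        "file_extension": ".csv",  # which files in the dataset directory to pick up
        "separator": ",",  # CSV separator; "," is the default
        "label_index": 3,  # column index of the label
        "ignore_first_line": False,  # set to True if the files start with a header row
        "encoding": "utf-8",  # default encoding
        "validate_file_content": True,  # check rows and label column before insertion
    }
)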