Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Update experiment configs and fix bugs #631

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
15 changes: 9 additions & 6 deletions benchmark/arxiv_kaggle/data_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ def data_json(self) -> Path:
def clean_folder(self) -> None:
    """Remove the raw downloaded JSON dump (``self.data_json``) from disk."""
    self.data_json.unlink()

def load_into_dataframe(self, keep_true_category: bool = False) -> pd.DataFrame:
    """Parse the downloaded JSON-lines dump into a sanitized DataFrame.

    Args:
        keep_true_category: Forwarded to ``sanitize_dataframe``; when True the
            original string category is kept instead of being converted to
            int-categorical codes.

    Returns:
        The sanitized DataFrame produced by ``sanitize_dataframe``.
    """
    fields = ["title", "categories", "versions", "update_date"]
    records = []
    # Stream the file line by line instead of materializing the whole dump
    # in memory via read_text().splitlines() — the Kaggle dump is large.
    with self.data_json.open(encoding="utf-8") as json_file:
        for line in json_file:
            if not line.strip():
                continue  # tolerate blank/trailing lines in the dump
            record = json.loads(line)
            records.append({field: record[field] for field in fields})

    df = pd.DataFrame(records)
    return ArxivKaggleDataGenerator.sanitize_dataframe(df, keep_true_category=keep_true_category)

def store_data(
self, cleaned_df: pd.DataFrame, resolution: TimeResolution, test_split: bool, dummy_period: bool = False
Expand All @@ -64,7 +64,9 @@ def store_data(
df_to_csv_with_timestamp(partition[self.fields_to_keep], name, self.data_dir / "train")
else:
df_train, df_test = train_test_split(
partition[self.fields_to_keep], test_size=self.test_holdout, random_state=42
partition[self.fields_to_keep],
test_size=self.test_holdout,
random_state=42,
)
df_to_csv_with_timestamp(df_train, name, self.data_dir / "train")
df_to_csv_with_timestamp(df_test, name, self.data_dir / "test")
Expand All @@ -79,7 +81,7 @@ def store_data(
)

@staticmethod
def sanitize_dataframe(raw_df: pd.DataFrame) -> pd.DataFrame:
def sanitize_dataframe(raw_df: pd.DataFrame, keep_true_category: bool = False) -> pd.DataFrame:
def extract_first_version_timestamp(row: Any) -> Any:
versions = row["versions"]
if len(versions) == 0:
Expand All @@ -94,8 +96,9 @@ def extract_first_version_timestamp(row: Any) -> Any:
# we only take the first category (like in the wilds)
transformed["category"] = transformed["categories"].str.split(" ").str[0]

# to int-categorical
transformed["category"] = pd.Categorical(transformed["category"]).codes
if not keep_true_category:
# to int-categorical
transformed["category"] = pd.Categorical(transformed["category"]).codes
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For exploring the dataset with its original category labels.


# we only take the first version timestamp
transformed["first_version_timestamp"] = transformed.apply(extract_first_version_timestamp, axis=1)
Expand Down
15 changes: 9 additions & 6 deletions benchmark/huffpost_kaggle/data_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ def data_json(self) -> Path:
def clean_folder(self) -> None:
    """Remove the raw downloaded JSON dump (``self.data_json``) from disk."""
    self.data_json.unlink()

def load_into_dataframe(self, keep_true_category: bool = False) -> pd.DataFrame:
    """Parse the downloaded JSON-lines dump into a sanitized DataFrame.

    Args:
        keep_true_category: Forwarded to ``sanitize_dataframe``; when True the
            original string category is kept instead of being converted to
            int-categorical codes.

    Returns:
        The sanitized DataFrame produced by ``sanitize_dataframe``.
    """
    fields = ["headline", "category", "date"]
    records = []
    # Stream the file line by line instead of materializing the whole dump
    # in memory via read_text().splitlines() — the Kaggle dump is large.
    with self.data_json.open(encoding="utf-8") as json_file:
        for line in json_file:
            if not line.strip():
                continue  # tolerate blank/trailing lines in the dump
            record = json.loads(line)
            records.append({field: record[field] for field in fields})

    df = pd.DataFrame(records)
    return HuffpostKaggleDataGenerator.sanitize_dataframe(df, keep_true_category=keep_true_category)

def store_data(
self, cleaned_df: pd.DataFrame, resolution: TimeResolution, test_split: bool, dummy_period: bool = False
Expand All @@ -64,7 +64,9 @@ def store_data(
df_to_csv_with_timestamp(partition[self.fields_to_keep], name, self.data_dir / "train")
else:
df_train, df_test = train_test_split(
partition[self.fields_to_keep], test_size=self.test_holdout, random_state=42
partition[self.fields_to_keep],
test_size=self.test_holdout,
random_state=42,
)
df_to_csv_with_timestamp(df_train, name, self.data_dir / "train")
df_to_csv_with_timestamp(df_test, name, self.data_dir / "test")
Expand All @@ -79,14 +81,15 @@ def store_data(
)

@staticmethod
def sanitize_dataframe(raw_df: pd.DataFrame) -> pd.DataFrame:
def sanitize_dataframe(raw_df: pd.DataFrame, keep_true_category: bool = False) -> pd.DataFrame:
transformed = raw_df

# escape new lines
transformed["headline"] = transformed["headline"].str.replace("\n", " ").replace(r"\s+", " ", regex=True)

# to int-categorical
transformed["category"] = pd.Categorical(transformed["category"]).codes
if not keep_true_category:
# to int-categorical
transformed["category"] = pd.Categorical(transformed["category"]).codes

# parse the date
transformed["date"] = pd.to_datetime(transformed["date"])
Expand Down
3 changes: 2 additions & 1 deletion benchmark/utils/time_resolution_binning.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def bin_dataframe_wrt_time_resolution(
pandas_time_unit = {
"year": "Y",
"month": "M",
"week": "W",
"day": "D",
"hour": "h",
"minute": "min",
Expand All @@ -44,7 +45,7 @@ def bin_dataframe_wrt_time_resolution(

def df_to_csv_with_timestamp(df: pd.DataFrame, period: pd.Period, data_dir: Path) -> None:
    """Write ``df`` as a headerless, tab-separated CSV named after ``period``
    and stamp the file's access/modification times with the period's start.

    Slashes in the period's string form are replaced with underscores so the
    result is a valid single-component filename (pandas renders some periods,
    e.g. weekly ones, as "start/end").

    Args:
        df: The dataframe to persist (written without header or index).
        period: The time period the data belongs to; determines filename and
            file timestamp.
        data_dir: Existing directory the file is written into.
    """
    safe_name = str(period).replace("/", "_")
    label_file = data_dir / f"{safe_name}.csv"
    df.to_csv(label_file, index=False, sep="\t", lineterminator="\n", header=False)
    # Encode the period into the file's mtime/atime so downstream tooling can
    # read the timestamp from file metadata.
    timestamp = int(period.to_timestamp().to_pydatetime().timestamp())
    os.utime(label_file, (timestamp, timestamp))
61 changes: 32 additions & 29 deletions experiments/arxiv/compare_trigger_policies/pipeline_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,31 @@
)
from modyn.config.schema.pipeline.evaluation.metrics import AccuracyMetricConfig, F1ScoreMetricConfig

arxiv_bytes_parser_function = (
"import torch\n"
"import numpy as np\n"
"def bytes_parser_function(data: bytes) -> str:\n"
" return str(data, 'utf8')"
)
arxiv_evaluation_transformer_function = (
"import torch\n"
"def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n"
" return torch.argmax(model_output, dim=-1)\n"
)


def gen_pipeline_config(
name: str,
config_ref: str,
trigger_config: TriggerConfig,
eval_handlers: list[EvalHandlerConfig],
gpu_device: str,
seed: int,
) -> ModynPipelineConfig:
num_classes = 172
bytes_parser_function = (
"import torch\n"
"import numpy as np\n"
"def bytes_parser_function(data: bytes) -> str:\n"
" return str(data, 'utf8')"
)
evaluation_transformer_function = (
"import torch\n"
"def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n"
" return torch.argmax(model_output, dim=-1)\n"
)
return ModynPipelineConfig(
pipeline=Pipeline(name=name, description="Arxiv pipeline for comparing trigger policies", version="0.0.1"),
pipeline=Pipeline(
name=config_ref, description="Arxiv pipeline for comparing trigger policies", version="0.0.1"
),
model=ModelConfig(id="ArticleNet", config={"num_classes": num_classes}),
model_storage=PipelineModelStorageConfig(full_model_strategy=FullModelStrategy(name="PyTorchFullModel")),
training=TrainingConfig(
Expand All @@ -48,18 +51,14 @@ def gen_pipeline_config(
dataloader_workers=1,
use_previous_model=True,
initial_model="random",
batch_size=128,
batch_size=128, # gpu memory limit does't allow for larger batch sizes
shuffle=True,
optimizers=[
OptimizerConfig(
name="default",
algorithm="SGD",
algorithm="AdamW",
source="PyTorch",
param_groups=[
OptimizerParamGroup(
module="model", config={"lr": 0.00002, "momentum": 0.9, "weight_decay": 0.01}
)
],
param_groups=[OptimizerParamGroup(module="model", config={"lr": 0.00002, "weight_decay": 0.01})],
)
],
optimization_criterion=OptimizationCriterion(name="CrossEntropyLoss"),
Expand All @@ -73,41 +72,45 @@ def gen_pipeline_config(
),
data=DataConfig(
dataset_id="arxiv_kaggle_train",
bytes_parser_function=bytes_parser_function,
bytes_parser_function=arxiv_bytes_parser_function,
tokenizer="DistilBertTokenizerTransform",
),
trigger=trigger_config,
evaluation=EvaluationConfig(
handlers=eval_handlers,
device=gpu_device,
after_pipeline_evaluation_workers=12,
after_training_evaluation_workers=12,
after_pipeline_evaluation_workers=8,
after_training_evaluation_workers=8,
datasets=[
EvalDataConfig(
dataset_id=yb_dataset_name,
bytes_parser_function=bytes_parser_function,
batch_size=128,
bytes_parser_function=arxiv_bytes_parser_function,
batch_size=512,
dataloader_workers=1,
tokenizer="DistilBertTokenizerTransform",
metrics=[
AccuracyMetricConfig(evaluation_transformer_function=evaluation_transformer_function, topn=1),
AccuracyMetricConfig(
evaluation_transformer_function=arxiv_evaluation_transformer_function, topn=1
),
AccuracyMetricConfig(evaluation_transformer_function="", topn=2),
AccuracyMetricConfig(evaluation_transformer_function="", topn=5),
AccuracyMetricConfig(evaluation_transformer_function="", topn=10),
F1ScoreMetricConfig(
evaluation_transformer_function=evaluation_transformer_function,
evaluation_transformer_function=arxiv_evaluation_transformer_function,
num_classes=num_classes,
average="weighted",
),
F1ScoreMetricConfig(
evaluation_transformer_function=evaluation_transformer_function,
evaluation_transformer_function=arxiv_evaluation_transformer_function,
num_classes=num_classes,
average="macro",
),
F1ScoreMetricConfig(
evaluation_transformer_function=evaluation_transformer_function,
evaluation_transformer_function=arxiv_evaluation_transformer_function,
num_classes=num_classes,
average="micro",
),
# RocAucMetric is traditionally used for binary classification
],
)
for yb_dataset_name in ["arxiv_kaggle_all", "arxiv_kaggle_train", "arxiv_kaggle_test"]
Expand Down
Loading
Loading