diff --git a/docs/preprocess_datasets/command_line.rst b/docs/preprocess_datasets/command_line.rst
index c82b5b6e..c8761b68 100644
--- a/docs/preprocess_datasets/command_line.rst
+++ b/docs/preprocess_datasets/command_line.rst
@@ -77,6 +77,20 @@ Datasets in delimited file formats such as CSVs can be preprocessed with ``mariu

 See this `example `_.

+Custom datasets stored in S3 can also be preprocessed using the Spark mode of ``marius_preprocess``. If the supplied
+edge paths start with ``s3a://``, the Spark preprocessor reads the files from S3 and writes the processed output
+both to the local output directory and to the S3 bucket named by the ``S3_BUCKET`` environment variable.
+
+The ``S3_BUCKET``, ``AWS_ACCESS_KEY_ID``, and ``AWS_SECRET_ACCESS_KEY`` environment variables must be set for this
+to work.
+
+.. code-block:: bash
+
+   $ export S3_BUCKET=<...>
+   $ export AWS_ACCESS_KEY_ID=<...>
+   $ export AWS_SECRET_ACCESS_KEY=<...>
+   $ marius_preprocess --edges s3a://fb15k237/train.txt s3a://fb15k237/valid.txt s3a://fb15k237/test.txt \
+       --output_directory datasets/custom_spark_s3/ --spark

 Usage
 -----------------------
diff --git a/setup.cfg b/setup.cfg
index 5a76bbc8..02b68e7c 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -41,6 +41,7 @@ install_requires =
     psutil>=5.9
     GPUtil>=1.4
     importlib_metadata>=4.0.0
+    s3fs>=2022.1.0

 zip_safe = false
 python_requires = >=3.6
diff --git a/src/python/tools/preprocess/converters/readers/spark_readers.py b/src/python/tools/preprocess/converters/readers/spark_readers.py
index 0a01a394..db4d373c 100644
--- a/src/python/tools/preprocess/converters/readers/spark_readers.py
+++ b/src/python/tools/preprocess/converters/readers/spark_readers.py
@@ -1,4 +1,5 @@
+import os
 from pathlib import Path

 from pyspark.sql import SparkSession
 from pyspark.sql.dataframe import DataFrame
@@ -39,6 +40,27 @@ def __init__(
         self.spark = spark

+        if str(train_edges).startswith("s3a://"):
+            if (
+                "AWS_ACCESS_KEY_ID" not in os.environ
+                or "AWS_SECRET_ACCESS_KEY" not in os.environ
+                or "S3_BUCKET" not in os.environ
+            ):
+                print(
+                    "Edge path is an s3 path, but the required env variables are not set. "
+                    "AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and S3_BUCKET need to be set."
+                )
+                exit(1)
+
+            # Configure Hadoop's S3A filesystem so Spark can read the s3a:// edge paths.
+            hadoop_conf = self.spark._jsc.hadoopConfiguration()
+            hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
+            hadoop_conf.set(
+                "fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
+            )
+            hadoop_conf.set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
+            hadoop_conf.set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])
+
         self.train_edges = train_edges
         self.valid_edges = valid_edges
         self.test_edges = test_edges

diff --git a/src/python/tools/preprocess/converters/spark_converter.py b/src/python/tools/preprocess/converters/spark_converter.py
index 20ec31e0..a4cbccbc 100644
--- a/src/python/tools/preprocess/converters/spark_converter.py
+++ b/src/python/tools/preprocess/converters/spark_converter.py
@@ -118,7 +118,8 @@ def __init__(
         spark_executor_memory: str = "4g",
     ):
         self.output_dir = output_dir
-
+        self.use_s3 = str(train_edges).startswith("s3a://")
+
         self.spark = (
             SparkSession.builder.appName(SPARK_APP_NAME)
             .config("spark.driver.memory", spark_driver_memory)
@@ -143,7 +144,7 @@ def __init__(
         else:
             self.partitioner = None

-        self.writer = SparkWriter(self.spark, self.output_dir, partitioned_evaluation)
+        self.writer = SparkWriter(self.spark, self.output_dir, self.use_s3, partitioned_evaluation)

         self.train_split = None
         self.valid_split = None
diff --git a/src/python/tools/preprocess/converters/writers/spark_writer.py b/src/python/tools/preprocess/converters/writers/spark_writer.py
index b7468817..9add5767 100644
--- a/src/python/tools/preprocess/converters/writers/spark_writer.py
+++ b/src/python/tools/preprocess/converters/writers/spark_writer.py
@@ -1,6 +1,7 @@
 import glob
 import os
 import re
+import shutil
 import sys
 from pathlib import Path
 from random import randint
@@ -8,6 +9,7 @@
 import numpy as np
 import pandas as pd
 from omegaconf import OmegaConf
+import s3fs

 from marius.tools.configuration.constants import PathConstants
 from marius.tools.configuration.marius_config import DatasetConfig
@@ -109,11 +111,18 @@ def write_partitioned_df_to_csv(partition_triples, num_partitions, output_filena


 class SparkWriter(object):
-    def __init__(self, spark, output_dir, partitioned_evaluation):
+    def __init__(self, spark, output_dir, output_to_s3, partitioned_evaluation):
         super().__init__()

         self.spark = spark
         self.output_dir = output_dir
+        self.output_to_s3 = output_to_s3
+        if self.output_to_s3:
+            self.s3_bucket = os.getenv("S3_BUCKET")
+            self.s3 = s3fs.S3FileSystem(
+                key=os.getenv("AWS_ACCESS_KEY_ID"),
+                secret=os.getenv("AWS_SECRET_ACCESS_KEY"),
+            )
         self.partitioned_evaluation = partitioned_evaluation

     def write_to_csv(self, train_edges_df, valid_edges_df, test_edges_df, nodes_df, rels_df, num_partitions):
@@ -205,15 +214,34 @@ def write_to_binary(self, train_edges_df, valid_edges_df, test_edges_df, nodes_d
         tmp_test_file = TMP_DATA_DIRECTORY + "tmp_test_edges.tmp"

         print("Converting to binary")
-        os.rename(train_file, tmp_train_file)
+        shutil.move(train_file, tmp_train_file)
         convert_to_binary(tmp_train_file, train_file)

         if valid_edges_df is not None:
-            os.rename(valid_file, tmp_valid_file)
+            shutil.move(valid_file, tmp_valid_file)
             convert_to_binary(tmp_valid_file, valid_file)

         if test_edges_df is not None:
-            os.rename(test_file, tmp_test_file)
+            shutil.move(test_file, tmp_test_file)
             convert_to_binary(tmp_test_file, test_file)

+        if self.output_to_s3:
+            self.upload_files_to_s3()
+
         return dataset_stats
+
+    def upload_files_to_s3(self):
+        dataset_yaml_file = str(self.output_dir / Path("dataset.yaml"))
+        train_file = str(self.output_dir / Path(PathConstants.train_edges_path))
+        valid_file = str(self.output_dir / Path(PathConstants.valid_edges_path))
+        test_file = str(self.output_dir / Path(PathConstants.test_edges_path))
+
+        # S3 object keys are plain strings, so join the bucket name and relative paths with "/".
+        self.s3.put(dataset_yaml_file, "{}/dataset.yaml".format(self.s3_bucket))
+        self.s3.put(train_file, "{}/{}".format(self.s3_bucket, PathConstants.train_edges_path))
+        if os.path.exists(valid_file):
+            self.s3.put(valid_file, "{}/{}".format(self.s3_bucket, PathConstants.valid_edges_path))
+        if os.path.exists(test_file):
+            self.s3.put(test_file, "{}/{}".format(self.s3_bucket, PathConstants.test_edges_path))
+
+        # We could optionally delete the preprocessed files from the local filesystem here.
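
For reference, the credential wiring that the S3 path relies on can be exercised on its own, outside of ``marius_preprocess``. The sketch below is illustrative only and is not part of the patch: the app name, bucket layout, tab delimiter, and local file path are assumptions, and it expects the same three environment variables described in the documentation change above. It mirrors the two halves of the change: reads go through Hadoop's S3A connector configured on the SparkSession, while uploads of locally written artifacts go through s3fs.

import os

import s3fs
from pyspark.sql import SparkSession

# All names below (app name, bucket contents, delimiter, local path) are illustrative.
bucket = os.environ["S3_BUCKET"]
access_key = os.environ["AWS_ACCESS_KEY_ID"]
secret_key = os.environ["AWS_SECRET_ACCESS_KEY"]

# Point Spark's S3A filesystem at the same static credentials so that s3a:// paths
# resolve (the hadoop-aws and AWS SDK jars must be on the Spark classpath).
spark = SparkSession.builder.appName("s3a_smoke_test").getOrCreate()
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.access.key", access_key)
hadoop_conf.set("fs.s3a.secret.key", secret_key)

# Read one delimited edge file straight from S3 (tab-delimited is an assumption here).
edges_df = spark.read.option("delimiter", "\t").csv("s3a://{}/train.txt".format(bucket))
print("rows read from s3a:", edges_df.count())

# Upload a locally written artifact back to the bucket with s3fs, mirroring what
# SparkWriter.upload_files_to_s3 does for the preprocessed outputs.
s3 = s3fs.S3FileSystem(key=access_key, secret=secret_key)
s3.put("datasets/custom_spark_s3/dataset.yaml", "{}/dataset.yaml".format(bucket))

Keeping the two concerns separate matches the structure of the diff: Spark only ever sees ``s3a://`` URIs for reading, and s3fs acts as a thin client for pushing the preprocessed files back to the bucket.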