Spark preprocessor now works with s3 #118

Open · wants to merge 2 commits into base: main
13 changes: 13 additions & 0 deletions docs/preprocess_datasets/command_line.rst
@@ -77,6 +77,19 @@ Datasets in delimited file formats such as CSVs can be preprocessed with ``mariu

See this `example <custom_dataset_example_>`_.

Custom datasets stored in S3 can also be preprocessed using the Spark mode of ``marius_preprocess``. If the supplied
edge paths start with ``s3a://``, the Spark preprocessor reads the files from S3 and writes the processed output
both locally and to the S3 bucket named by an environment variable.

The ``S3_BUCKET``, ``AWS_ACCESS_KEY_ID``, and ``AWS_SECRET_ACCESS_KEY`` environment variables must be set for this
to work.

.. code-block:: bash

   $ export S3_BUCKET=<bucket to which the preprocessed files will be written>
   $ export AWS_ACCESS_KEY_ID=<...>
   $ export AWS_SECRET_ACCESS_KEY=<...>
   $ marius_preprocess --edges s3a://fb15k237/train.txt s3a://fb15k237/valid.txt s3a://fb15k237/test.txt \
       --output_directory datasets/custom_spark_s3/ --spark

Usage
-----------------------
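A minimal sketch of driving the same workflow from Python rather than the shell, offered only as an illustration: it verifies the required environment variables and then invokes `marius_preprocess` with the same arguments as the example above. The dataset paths and bucket name are placeholders.

```python
import os
import subprocess

REQUIRED_ENV_VARS = ["S3_BUCKET", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"]


def preprocess_from_s3(edge_paths, output_directory):
    # Fail early with a clear message if any required variable is missing.
    missing = [var for var in REQUIRED_ENV_VARS if var not in os.environ]
    if missing:
        raise RuntimeError("Missing required environment variables: " + ", ".join(missing))

    # Run the CLI in Spark mode, exactly as shown in the docs above.
    subprocess.run(
        ["marius_preprocess", "--edges", *edge_paths,
         "--output_directory", output_directory, "--spark"],
        check=True,
    )


# Example with placeholder paths:
# preprocess_from_s3(
#     ["s3a://fb15k237/train.txt", "s3a://fb15k237/valid.txt", "s3a://fb15k237/test.txt"],
#     "datasets/custom_spark_s3/",
# )
```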
1 change: 1 addition & 0 deletions setup.cfg
@@ -41,6 +41,7 @@ install_requires =
psutil>=5.9
GPUtil>=1.4
importlib_metadata>=4.0.0
s3fs>=2022.1.0

zip_safe = false
python_requires = >=3.6
18 changes: 17 additions & 1 deletion src/python/tools/preprocess/converters/readers/spark_readers.py
@@ -1,4 +1,5 @@
from pathlib import Path
import os

from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
@@ -9,7 +10,7 @@
class SparkDelimitedFileReader(Reader):
def __init__(
self,
spark: SparkSession,
spark: SparkSession = SparkSession.builder.appName("marius_spark").getOrCreate(),
train_edges: Path,
valid_edges: Path = None,
test_edges: Path = None,
@@ -39,6 +40,21 @@ def __init__(

self.spark = spark

if train_edges.startswith("s3a://"):
    required_env_vars = ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "S3_BUCKET"]
    missing_vars = [var for var in required_env_vars if var not in os.environ]
    if missing_vars:
        # Missing credentials are a configuration error, so stop with a clear message.
        raise ValueError(
            "Edge path is an S3 path, but required environment variables are not set: "
            + ", ".join(missing_vars)
        )

    # Point Spark's S3A filesystem at the static credentials from the environment.
    hadoop_conf = self.spark._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.aws.credentials.provider",
                    "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    hadoop_conf.set("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID"))
    hadoop_conf.set("fs.s3a.secret.key", os.getenv("AWS_SECRET_ACCESS_KEY"))
    hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

self.train_edges = train_edges
self.valid_edges = valid_edges
self.test_edges = test_edges
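For context on the reader changes above, the Hadoop settings are the standard way to hand static credentials to Spark's S3A filesystem. A self-contained sketch of the same idea, assuming the S3A connector is available on the Spark classpath and using a placeholder bucket and file name:

```python
import os

from pyspark.sql import SparkSession

# Build (or reuse) a session; the app name is arbitrary for this sketch.
spark = SparkSession.builder.appName("s3a_read_example").getOrCreate()

# Hand the static credentials from the environment to the S3A filesystem,
# mirroring the hadoopConfiguration() calls in SparkDelimitedFileReader.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.aws.credentials.provider",
                "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
hadoop_conf.set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

# Read a tab-delimited edge list straight from S3 (placeholder path).
edges_df = spark.read.csv("s3a://my-bucket/train.txt", sep="\t", header=False)
edges_df.show(5)
```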
7 changes: 4 additions & 3 deletions src/python/tools/preprocess/converters/spark_converter.py
@@ -118,7 +118,8 @@ def __init__(
spark_executor_memory: str = "4g",
):
self.output_dir = output_dir

self.use_s3 = train_edges.startswith("s3a://")

self.spark = (
SparkSession.builder.appName(SPARK_APP_NAME)
.config("spark.driver.memory", spark_driver_memory)
@@ -143,7 +144,7 @@ def __init__(
else:
self.partitioner = None

self.writer = SparkWriter(self.spark, self.output_dir, partitioned_evaluation)
self.writer = SparkWriter(self.spark, self.output_dir, self.use_s3, partitioned_evaluation)

self.train_split = None
self.valid_split = None
@@ -232,7 +233,7 @@ def convert(self):
train_edges_df, valid_edges_df, test_edges_df = self.partitioner.partition_edges(
train_edges_df, valid_edges_df, test_edges_df, nodes_df, self.num_partitions
)

return self.writer.write_to_binary(
train_edges_df, valid_edges_df, test_edges_df, nodes_df, rels_df, self.num_partitions
)
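One hedged note on the SparkSession built in this converter: reading `s3a://` paths also requires the Hadoop S3A connector (the `hadoop-aws` artifact and its AWS SDK dependency) to be on the Spark classpath. If it is not already bundled with the installation, it can be pulled in through the session builder; the coordinates below are a placeholder and must match the local Hadoop version.

```python
from pyspark.sql import SparkSession

# Placeholder coordinates; the hadoop-aws version must match the Hadoop
# version bundled with the local Spark installation.
HADOOP_AWS_PACKAGE = "org.apache.hadoop:hadoop-aws:3.3.1"

spark = (
    SparkSession.builder.appName("marius_spark_s3_example")
    .config("spark.jars.packages", HADOOP_AWS_PACKAGE)
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)
```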
33 changes: 29 additions & 4 deletions src/python/tools/preprocess/converters/writers/spark_writer.py
@@ -1,13 +1,15 @@
import glob
import os
import re
import shutil
import sys
from pathlib import Path
from random import randint

import numpy as np
import pandas as pd
from omegaconf import OmegaConf
import s3fs

from marius.tools.configuration.constants import PathConstants
from marius.tools.configuration.marius_config import DatasetConfig
@@ -109,11 +111,18 @@ def write_partitioned_df_to_csv(partition_triples, num_partitions, output_filename


class SparkWriter(object):
def __init__(self, spark, output_dir, partitioned_evaluation):
def __init__(self, spark, output_dir, output_to_s3, partitioned_evaluation):
super().__init__()

self.spark = spark
self.output_dir = output_dir
self.output_to_s3 = output_to_s3
if self.output_to_s3:
self.s3_bucket = os.getenv("S3_BUCKET")
Reviewer comment: Can we write a README document or have some documentation that lists all the env variables that a user is required to set?

Collaborator (author): Sure, will check in a README too.

self.s3 = s3fs.S3FileSystem(
key=os.getenv('AWS_ACCESS_KEY_ID'),
secret=os.getenv('AWS_SECRET_ACCESS_KEY')
)
self.partitioned_evaluation = partitioned_evaluation

def write_to_csv(self, train_edges_df, valid_edges_df, test_edges_df, nodes_df, rels_df, num_partitions):
@@ -205,15 +214,31 @@ def write_to_binary(self, train_edges_df, valid_edges_df, test_edges_df, nodes_d
tmp_test_file = TMP_DATA_DIRECTORY + "tmp_test_edges.tmp"

print("Converting to binary")
os.rename(train_file, tmp_train_file)
shutil.move(train_file, tmp_train_file)
convert_to_binary(tmp_train_file, train_file)

if valid_edges_df is not None:
os.rename(valid_file, tmp_valid_file)
shutil.move(valid_file, tmp_valid_file)
convert_to_binary(tmp_valid_file, valid_file)

if test_edges_df is not None:
os.rename(test_file, tmp_test_file)
shutil.move(test_file, tmp_test_file)
convert_to_binary(tmp_test_file, test_file)

if self.output_to_s3:
self.upload_files_to_s3()

return dataset_stats

def upload_files_to_s3(self):
dataset_yaml_file = str(self.output_dir / Path("dataset.yaml"))
train_file = str(self.output_dir / Path(PathConstants.train_edges_path))
valid_file = str(self.output_dir / Path(PathConstants.valid_edges_path))
test_file = str(self.output_dir / Path(PathConstants.test_edges_path))

self.s3.put(dataset_yaml_file, str(self.s3_bucket / Path("dataset.yaml")))
self.s3.put(train_file, str(self.s3_bucket / Path(PathConstants.train_edges_path)))
self.s3.put(valid_file, str(self.s3_bucket / Path(PathConstants.valid_edges_path)))
self.s3.put(test_file, str(self.s3_bucket / Path(PathConstants.test_edges_path)))

# we can optionally delete the pre-processed files from the local system
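Regarding the trailing comment about optionally deleting the preprocessed files from the local system, a hedged sketch of what that cleanup could look like; the helper name is hypothetical and not part of this PR:

```python
import os


def cleanup_local_files(local_paths):
    """Remove local preprocessed files once their S3 upload has succeeded."""
    for path in local_paths:
        if os.path.exists(path):
            os.remove(path)


# Hypothetical usage at the end of upload_files_to_s3, after the self.s3.put(...) calls:
# cleanup_local_files([dataset_yaml_file, train_file, valid_file, test_file])
```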