Add comments for preprocess module #65

Closed
wants to merge 18 commits into from
203 changes: 198 additions & 5 deletions src/python/tools/csv_converter.py
@@ -1,3 +1,9 @@
"""Converter for CSV, TSV and TXT dataset files.

This module contains the functions for converting CSV, TSV and TXT dataset
files into Marius input formats.
"""

import argparse
import re
from pathlib import Path
@@ -9,7 +15,43 @@

def split_dataset(input_dataset, validation_fraction, test_fraction,
entry_regex, num_line_skip, data_cols,
delim, dtype=np.int32):
delim):
"""Splits dataset into training, validation and testing sets.

Splits one input dataset file into training, validation and testing sets
according to the given fractions. During the split, every edge in the
input dataset is randomly assigned to the training, validation or testing
set according to validation_fraction and test_fraction. The resulting
edges are written to splitted_train_edges.txt, splitted_valid_edges.txt
and splitted_test_edges.txt in the same directory as the input dataset
file. If validation_fraction or test_fraction is set to zero, the
corresponding file is not created.
The following files are created by this function:
splitted_train_edges.txt: File containing training set edges.
splitted_valid_edges.txt: File containing validation set edges.
splitted_test_edges.txt: File containing testing set edges.

Args:
input_dataset: The path to the original data file to be split.
validation_fraction: The proportion of the input dataset that will be
put into the validation set.
test_fraction: The proportion of the input dataset that will be put
into the testing set.
entry_regex: The regular expression matching the representation of an
edge in the dataset.
num_line_skip: Number of lines to skip as the header of the dataset
file.
data_cols: A list of indices indicating which columns in the dataset
file compose the edges.
delim: The delimiter between two columns in the dataset file.

Returns:
The list of file path to splitted_train_edges.txt,
splitted_valid_edges.txt and splitted_test_edges.txt are returned. In
the meantime, the num_line_skip is set to 0 and data_cols is set to
the first two or three columns based on whether there is relation in
the dataset for the downstream preprocessing operations.
"""
train_fraction = 1 - validation_fraction - test_fraction

assert(train_fraction > 0)
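As a rough illustration of the splitting behavior documented above (this helper is not part of the file; the array layout and seed are assumed), the fraction-based split can be pictured as a shuffle followed by two slices:

import numpy as np

def split_by_fraction(edges, validation_fraction, test_fraction, seed=0):
    # edges is assumed to be a [num_edges, 2 or 3] integer array.
    rng = np.random.default_rng(seed)
    shuffled = edges[rng.permutation(len(edges))]
    num_valid = int(validation_fraction * len(edges))
    num_test = int(test_fraction * len(edges))
    valid = shuffled[:num_valid]
    test = shuffled[num_valid:num_valid + num_test]
    train = shuffled[num_valid + num_test:]
    return train, valid, test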
@@ -84,6 +126,26 @@ def split_dataset(input_dataset, validation_fraction, test_fraction,


def get_header_length(input_file, entry_regex):
"""Detects the number of rows to skip as the file header.

Starting from the top of the file, this function counts the rows that do
not contain a substring matching the edge regular expression; that count
is the number of rows to skip as the file header.

Args:
input_file: The dataset file whose header length is detected.
entry_regex: The regular expression matching the representation of an
edge in the dataset.

Returns:
The number of rows to skip as the file header.

Raises:
RuntimeError: Raised when the detection of the file header length
fails. A common failure case is a file header that also matches the
edge regular expression; in that case, the number of rows to skip as
the file header should be set manually.
"""
num_line_skip = 0
with open(input_file, 'r') as f:
n = 0
@@ -99,15 +161,33 @@ def get_header_length(input_file, entry_regex):
raise RuntimeError("Please give number of rows to skip " +
"at file header.")

if a == n:
raise RuntimeWarning("No nodes detected, dataset format may " +
"be incorrect.")

return num_line_skip
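The detection described above amounts to counting non-matching lines from the top of the file; a minimal standalone sketch (helper name and arguments assumed):

import re

def count_header_rows(path, entry_regex):
    pattern = re.compile(entry_regex)
    count = 0
    with open(path, "r") as f:
        for line in f:
            if pattern.search(line):
                break
            count += 1
    return count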


def check_given_num_line_skip_start_col(input_file, num_line_skip, data_cols,
delim, start_col):
"""Check if the given combination of num_line_skip and start_col is valid.

This function splits the first row after the file header with the given
delimiter and checks that the start_col index is within the valid range
(less than the number of resulting tokens).

Args:
input_file: A dataset file used to check the validity of the given
combination of num_line_skip and start_col.
num_line_skip: Number of lines to skip as the header of the dataset
file.
data_cols: A list of indices indicating which columns in the dataset
file compose the edges.
delim: The delimiter between two columns in the dataset file.
start_col: The index of the first column of the edge representations in
the dataset file.

Returns:
True if the given combination of num_line_skip and start_col is valid.
False if the given combination of num_line_skip and start_col is not
valid.
"""
with open(input_file, 'r') as f:
for i in range(num_line_skip):
line = next(f)
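The validity check boils down to a bounds test on the first data row; roughly (helper name and the num_edge_cols argument are illustrative):

def start_col_in_range(first_data_row, delim, start_col, num_edge_cols):
    tokens = first_data_row.strip().split(delim)
    return start_col + num_edge_cols <= len(tokens)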
@@ -121,6 +201,22 @@ def check_given_num_line_skip_start_col(input_file, num_line_skip, data_cols,


def partition_edges(edges, num_partitions, num_nodes):
"""Split the nodes into num_partitions partitions.

In the case of large scale graphs that have an embedding table which
AnzeXie marked this conversation as resolved.
Show resolved Hide resolved
exceeds CPU memory capacity, this function can partition the graph nodes
uniformly into num_partitions partitions and group the edges into edge
buckets, see partition_scheme for more details.

Args:
edges: All edges of the original dataset.
num_partitions: The number of graph partitions that the graph nodes are
uniformly partitioned into.
num_nodes: The total number of nodes.

Returns:
Reordered edges and a list of offsets indicating node partitions.
"""
partition_size = int(np.ceil(num_nodes / num_partitions))
src_partitions = edges[:, 0] // partition_size
dst_partitions = edges[:, 2] // partition_size
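A minimal sketch of the bucketing idea described above, assuming edges is a [num_edges, 3] integer array with source, relation and destination columns:

import numpy as np

def bucket_edges(edges, num_partitions, num_nodes):
    partition_size = int(np.ceil(num_nodes / num_partitions))
    src_part = edges[:, 0] // partition_size
    dst_part = edges[:, 2] // partition_size
    # np.lexsort sorts by the last key first, so edges are ordered by
    # source partition and then destination partition, making each
    # edge bucket contiguous.
    order = np.lexsort((dst_part, src_part))
    bucket_sizes = np.bincount(src_part * num_partitions + dst_part,
                               minlength=num_partitions * num_partitions)
    return edges[order], bucket_sizes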
@@ -135,6 +231,30 @@ def partition_edges(edges, num_partitions, num_nodes):


def join_files(files, regex, num_line_skip, data_cols, delim):
"""Joins multiple dataset files into one dataset file

Joins edges from multiple dataset files into one dataset file. This
function should only be called when there is more than one file. During
joining, only the edges are extracted from each dataset file and written
to joined_file.txt.
The following file is created by this function:
joined_file.txt: File containing all edges from the current dataset.

Args:
files: A list of dataset files to be joined.
regex: The regular expression matching the representation of an edge in
the dataset.
num_line_skip: Number of lines to skip as the header of the dataset
file.
data_cols: A list of indices indicating which columns in the dataset
file compose the edges.
delim: The delimiter between two columns in the dataset file.

Returns:
The joined file, returned as a single-element list. In addition,
num_line_skip is set to zero and data_cols is set to the first two or
three columns, depending on whether the edges in the dataset have
relations.
"""
assert(len(files) > 1)
base_path = "/".join(files[0].split("/")[:-1])
joined_file = base_path + "/joined_file.txt"
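Conceptually, the join described above is a filtered concatenation; a simplified sketch (function name and arguments assumed):

import re

def concatenate_edge_lines(paths, entry_regex, output_path):
    pattern = re.compile(entry_regex)
    with open(output_path, "w") as out:
        for path in paths:
            with open(path, "r") as f:
                for line in f:
                    if pattern.search(line):
                        out.write(line)
    return output_path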
@@ -160,6 +280,74 @@ def join_files(files, regex, num_line_skip, data_cols, delim):
def general_parser(files, format, output_dir, delim="", num_partitions=1,
dtype=np.int32, remap_ids=True, dataset_split=(-1, -1),
start_col=0, num_line_skip=None):
"""Parses dataset in the format of CSV, TSV and TXT to marius input format.

This function retrieves all edges from the given dataset files. Each node
and edge_type is randomly assigned an integer id. The mappings from these
integer ids to the original ids are stored in node_mapping.txt and
rel_mapping.txt.
The original edge list is converted to an [|E|, 3] int32 tensor, shuffled,
and then the contents of the tensor are written to train_edges.pt and/or
valid_edges.pt and test_edges.pt, depending on dataset_split.
The following files are created by this function:
train_edges.pt: Dump of tensor memory for edges in the training set.
valid_edges.pt: Dump of tensor memory for edges in the validation set.
test_edges.pt: Dump of tensor memory for edges in the testing set.
node_mapping.txt: Mapping of original node ids to unique int32 ids.
rel_mapping.txt: Mapping of original edge_type ids to unique int32 ids.

If num_partitions is set to a value greater than one, then the
following file is also created:
train_edges_partitions.txt: text file with num_partitions^2 lines,
where each line denotes the size of an edge bucket

Args:
files: The list of original dataset files. If there are three files,
they are treated as training, validation and testing set based on
their order by default (if dataset_split is not set).
format: A string denoting the order of the edge components. The
characters of this string can only be "s" for source nodes, "r" for
relation and "d" for destination nodes. The length of this string is
two or three, depending on whether the edges have relations.
output_dir: The directory where all the files created will be stored.
delim: The delimiter between two columns in the dataset file.
num_partitions: The number of graph partitions that the graph nodes are
uniformly partitioned into.
dtype: The data type of the edge list. The common values for this
argument are np.int32 and np.int64. If there are fewer than 2 billion
nodes (which covers almost every dataset), int32 should be used. With
np.int32, each edge takes 3*4 (or 2*4, without relations) bytes of
space to store; with np.int64, each edge takes 3*8 (or 2*8) bytes.
remap_ids: Whether to assign nodes and relations random ids, or
sequential ids based on their order in the original dataset file.
dataset_split: The proportions of the input data that will be used for
validation and testing during training. The argument takes a tuple of
length two where the first value is the proportion of the validation
set and the second value is the proportion of the testing set.
start_col: The index of the first column of the edge representations in
the dataset file.
num_line_skip: Number of lines to skip as the header of the dataset
file.

Returns:
The created files described above will be stored into the output_dir
directory. Statistics of the preprocessed dataset are put into a list
and returned. These statistics are placed in the following order:
number of edges in train_edges.pt, number of edges in valid_edges.pt,
number of edges in test_edgs.pt, number of relations, and number of
nodes. These statistics are also printed to the terminal.

Raises:
RuntimeError: Raised when the source node denotation "s" or the
destination node denotation "d" is not found in the value of the
format argument.
This error is also raised if the given or detected delimiter is not
correct; in that case, a delimiter should be assigned manually.
Detailed helper messages indicating the possible causes are printed
when this error is raised.
"""
assert(len(files) != 0), "Number of data files cannot be 0"
assert(len(format) == 1), "Format is specified incorrectly"
assert((start_col == 0) or
@@ -382,6 +570,11 @@ def general_parser(files, format, output_dir, delim="", num_partitions=1,


def set_args():
"""Sets command line arguments for this csv_converter modules.

Returns:
A dict containing all command line arguments and their values.
"""
parser = argparse.ArgumentParser(
description='csv converter', prog='csv_converter',
formatter_class=argparse.RawTextHelpFormatter)
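For reference, a hypothetical invocation of general_parser matching the parameters documented above; the paths, the one-element format list (per the length assertion in the function body) and the split fractions are made up for illustration:

import numpy as np

stats = general_parser(
    files=["./data/edges.csv"],   # single input file, split internally
    format=["srd"],               # source, relation, destination columns
    output_dir="./output_dir/",
    num_partitions=1,
    dtype=np.int32,
    remap_ids=True,
    dataset_split=(0.05, 0.05))   # 5% validation, 5% test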