diff --git a/simulator/__init__.py b/simulator/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/simulator/configs/arvix_linear.json b/simulator/configs/arvix_linear.json
new file mode 100644
index 00000000..3b1b1adf
--- /dev/null
+++ b/simulator/configs/arvix_linear.json
@@ -0,0 +1,10 @@
+{
+    "dataset_name" : "ogbn_arxiv",
+    "features_stats" : {
+        "featurizer_type" : "default",
+        "page_size" : "16.384 KB",
+        "feature_dimension" : 128,
+        "feature_size" : "float32"
+    },
+    "sampling_depth" : 2
+}
\ No newline at end of file
diff --git a/simulator/configs/arvix_linear_in_mem.json b/simulator/configs/arvix_linear_in_mem.json
new file mode 100644
index 00000000..cce9a188
--- /dev/null
+++ b/simulator/configs/arvix_linear_in_mem.json
@@ -0,0 +1,10 @@
+{
+    "dataset_name" : "ogbn_arxiv",
+    "features_stats" : {
+        "featurizer_type" : "default",
+        "page_size" : "16.384 KB",
+        "feature_dimension" : 128,
+        "feature_size" : "float32"
+    },
+    "top_percent_in_mem" : 1
+}
\ No newline at end of file
diff --git a/simulator/configs/arvix_metis.json b/simulator/configs/arvix_metis.json
new file mode 100644
index 00000000..a30a4594
--- /dev/null
+++ b/simulator/configs/arvix_metis.json
@@ -0,0 +1,9 @@
+{
+    "dataset_name" : "ogbn_arxiv",
+    "features_stats" : {
+        "featurizer_type" : "metis_partition",
+        "page_size" : "16.384 KB",
+        "feature_dimension" : 128,
+        "feature_size" : "float32"
+    }
+}
\ No newline at end of file
diff --git a/simulator/configs/arvix_neighbors.json b/simulator/configs/arvix_neighbors.json
new file mode 100644
index 00000000..9442d2ee
--- /dev/null
+++ b/simulator/configs/arvix_neighbors.json
@@ -0,0 +1,9 @@
+{
+    "dataset_name" : "ogbn_arxiv",
+    "features_stats" : {
+        "featurizer_type" : "neighbors_nearby",
+        "page_size" : "16.384 KB",
+        "feature_dimension" : 128,
+        "feature_size" : "float32"
+    }
+}
\ No newline at end of file
diff --git a/simulator/configs/arvix_random.json b/simulator/configs/arvix_random.json
new file mode 100644
index 00000000..ae7963ff
--- /dev/null
+++ b/simulator/configs/arvix_random.json
@@ -0,0 +1,10 @@
+{
+    "dataset_name" : "ogbn_arxiv",
+    "features_stats" : {
+        "featurizer_type" : "default",
+        "feature_layout" : "random",
+        "page_size" : "16.384 KB",
+        "feature_dimension" : 128,
+        "feature_size" : "float32"
+    }
+}
\ No newline at end of file
diff --git a/simulator/configs/papers.json b/simulator/configs/papers.json
new file mode 100644
index 00000000..a0378734
--- /dev/null
+++ b/simulator/configs/papers.json
@@ -0,0 +1,3 @@
+{
+    "dataset_name" : "ogbn_papers100m"
+}
\ No newline at end of file
diff --git a/simulator/configs/products_linear.json b/simulator/configs/products_linear.json
new file mode 100644
index 00000000..380ee78c
--- /dev/null
+++ b/simulator/configs/products_linear.json
@@ -0,0 +1,10 @@
+{
+    "dataset_name" : "ogbn_products",
+    "features_stats" : {
+        "featurizer_type" : "default",
+        "page_size" : "16.384 KB",
+        "feature_dimension" : 100,
+        "feature_size" : "float32"
+    },
+    "sampling_depth" : 1
+}
\ No newline at end of file
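As a sanity check on these configs, the page geometry works out as follows: a "16.384 KB" page parses to 16384 bytes, a 128-dimensional float32 feature occupies 512 bytes, so 32 node features fit per page. A minimal sketch of the same arithmetic the feature loader below performs:

    import humanfriendly
    import numpy as np

    page_size = humanfriendly.parse_size("16.384 KB")        # 16384 bytes
    node_feature_size = np.dtype("float32").itemsize * 128   # 512 bytes per node
    nodes_per_page = page_size // node_feature_size          # 32 nodes per page
    print(page_size, node_feature_size, nodes_per_page)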
diff --git a/simulator/main.py b/simulator/main.py
new file mode 100644
index 00000000..3fd6402f
--- /dev/null
+++ b/simulator/main.py
@@ -0,0 +1,79 @@
+import os
+import json
+import argparse
+import random
+import time
+
+from src.dataset_loader import *
+from src.features_loader import *
+from src.sampler import *
+from src.visualizer import *
+
+def read_config_file(config_file):
+    with open(config_file, "r") as reader:
+        return json.load(reader)
+
+
+def read_arguments():
+    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+    parser.add_argument("--config_file", required=True, type=str, help="The config file containing the details for the simulation")
+    parser.add_argument("--save_path", required=True, type=str, help="The path to save the resulting image to")
+    parser.add_argument("--graph_title", required=True, type=str, help="The title of the saved graph")
+    parser.add_argument("--num_nodes", default=-1, type=int, help="The number of nodes we want in our sample")
+    parser.add_argument("--log_rate", type=int, default=20, help="Log rate of the nodes processed")
+    return parser.parse_args()
+
+def main():
+    start_time = time.time()
+    arguments = read_arguments()
+    config = read_config_file(arguments.config_file)
+
+    # Create the loaders
+    data_loader = DatasetLoader(config)
+    print(data_loader.get_num_nodes(), data_loader.get_num_edges())
+    '''
+    features_loader = get_featurizer(data_loader, config["features_stats"])
+    sampler = SubgraphSampler(data_loader, features_loader, config)
+    print("Finished loading all objects")
+
+    # Perform sampling
+    nodes_to_sample = list(range(data_loader.get_num_nodes()))
+    random.shuffle(nodes_to_sample)
+    if arguments.num_nodes > 0:
+        nodes_to_sample = nodes_to_sample[:arguments.num_nodes]
+    log_rate = max(int(len(nodes_to_sample) / arguments.log_rate), 1)
+
+    pages_loaded = []
+    for curr_node in nodes_to_sample:
+        num_pages_read = sampler.perform_sampling_for_node(curr_node)
+        if num_pages_read > 0:
+            pages_loaded.append(num_pages_read)
+
+        if len(pages_loaded) > 0 and len(pages_loaded) % log_rate == 0:
+            percentage_finished = (100.0 * len(pages_loaded)) / len(nodes_to_sample)
+            print("Finished processing", round(percentage_finished), "percent of nodes")
+
+    # Get the arguments to log
+    vals_to_log = dict()
+    for curr_obj in [data_loader, features_loader, sampler]:
+        vals_to_log.update(curr_obj.get_values_to_log())
+
+    # Log the time taken
+    total_time = time.time() - start_time
+    print("Processed all", len(nodes_to_sample), "nodes in", total_time, "seconds")
+
+    # Save the histogram
+    os.makedirs(os.path.dirname(arguments.save_path), exist_ok=True)
+    visualize_arguments = {
+        "pages_loaded": pages_loaded,
+        "save_path": arguments.save_path,
+        "graph_title": arguments.graph_title,
+        "depth": config["sampling_depth"],
+        "dataset_name": config["dataset_name"],
+        "values_to_log": vals_to_log,
+    }
+    visualize_results(visualize_arguments)
+    '''
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/simulator/src/__init__.py b/simulator/src/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/simulator/src/dataset_loader.py b/simulator/src/dataset_loader.py
new file mode 100644
index 00000000..2d65e0b6
--- /dev/null
+++ b/simulator/src/dataset_loader.py
@@ -0,0 +1,90 @@
+import subprocess
+import os
+import numpy as np
+import torch
+from collections import defaultdict
+import marius.storage
+
+class DatasetLoader:
+    SAVE_DIR = "datasets"
+    EDGES_PATH = "edges/train_edges.bin"
+
+    def __init__(self, config):
+        self.name = config["dataset_name"]
+        # Default to a single-hop sample when the config omits the depth
+        self.sampling_depth = config.get("sampling_depth", 1)
+        os.makedirs(DatasetLoader.SAVE_DIR, exist_ok=True)
+        self.save_dir = os.path.join(DatasetLoader.SAVE_DIR, self.name)
+        if not os.path.exists(self.save_dir):
+            self.create_dataset()
+        self.load_dataset()
+
+    def create_dataset(self):
+        command_to_run = f"marius_preprocess --dataset {self.name} --output_directory {self.save_dir}"
+        print("Running command", command_to_run)
+        subprocess.check_output(command_to_run, shell=True)
+
+    def load_dataset(self):
+        # Load the raw edge file produced by marius_preprocess
+        edges_path = os.path.join(self.save_dir, DatasetLoader.EDGES_PATH)
+        with open(edges_path, "rb") as reader:
+            edges_bytes = reader.read()
+
+        # Each edge is a (source, destination) pair of int32 node ids
+        edges_flat_arr = np.frombuffer(edges_bytes, dtype=np.int32)
+        edges_arr = edges_flat_arr.reshape((-1, 2))
+        self.edge_list = torch.from_numpy(edges_arr.copy())
+        self.nodes = torch.unique(self.edge_list)
+
+        # Create the adjacency map and incoming-edge counts used by the samplers
+        self.adjacency_map = defaultdict(set)
+        self.num_incoming_edges = defaultdict(int)
+        for src, dst in edges_arr:
+            self.adjacency_map[int(src)].add(int(dst))
+            self.num_incoming_edges[int(dst)] += 1
+
+        '''
+        # Create the graph
+        self.current_graph = MariusGraph(self.edge_list, self.edge_list[torch.argsort(self.edge_list[:, -1])], self.get_num_nodes())
+        self.sampler = LayeredNeighborSampler(self.current_graph, [-1 for _ in range(self.sampling_depth)])
+
+        # Neighbors cache
+        '''
+
+    def get_num_nodes(self):
+        return self.nodes.shape[0]
+
+    def get_neigbhors_for_node(self, node_id, all_depths=False):
+        if node_id not in self.adjacency_map:
+            return []
+
+        return list(self.adjacency_map[node_id])
+
+    def get_incoming_neighbors(self, node_id):
+        if node_id not in self.num_incoming_edges:
+            return 0
+
+        return self.num_incoming_edges[node_id]
+
+    def get_num_edges(self):
+        return self.edge_list.shape[0]
+
+    def get_average_neighbors(self):
+        neighbors_count = []
+        for node_neighbors in self.adjacency_map.values():
+            neighbors_count.append(len(node_neighbors))
+
+        return np.mean(np.array(neighbors_count))
+
+    def get_average_incoming(self):
+        incoming_counts = []
+        for num_incoming in self.num_incoming_edges.values():
+            incoming_counts.append(num_incoming)
+
+        return np.mean(np.array(incoming_counts))
+
+    def get_values_to_log(self):
+        return {
+            "Average Node Out Degree": str(round(self.get_average_neighbors(), 2)),
+            "Average Node In Degree": str(round(self.get_average_incoming(), 2)),
+        }
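To see what load_dataset builds, the same adjacency-construction logic can be run on a handful of toy edges (the edge values here are made up for illustration):

    import numpy as np
    from collections import defaultdict

    # Four directed (source, destination) edges in the int32 layout used above
    edges_arr = np.array([[0, 1], [0, 2], [1, 2], [3, 4]], dtype=np.int32)

    adjacency_map = defaultdict(set)
    num_incoming_edges = defaultdict(int)
    for src, dst in edges_arr:
        adjacency_map[int(src)].add(int(dst))
        num_incoming_edges[int(dst)] += 1

    print(dict(adjacency_map))       # {0: {1, 2}, 1: {2}, 3: {4}}
    print(dict(num_incoming_edges))  # {1: 1, 2: 2, 4: 1}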
diff --git a/simulator/src/features_loader.py b/simulator/src/features_loader.py
new file mode 100644
index 00000000..03cd7528
--- /dev/null
+++ b/simulator/src/features_loader.py
@@ -0,0 +1,145 @@
+import humanfriendly
+import os
+import math
+import random
+import numpy as np
+import metis
+import networkx as nx
+
+
+class FeaturesLoader:
+    def __init__(self, data_loader, features_stat):
+        self.data_loader = data_loader
+        self.features_stat = features_stat
+        self.page_size = humanfriendly.parse_size(features_stat["page_size"])
+        self.feature_size = np.dtype(features_stat["feature_size"]).itemsize
+        self.node_feature_size = self.feature_size * features_stat["feature_dimension"]
+        self.nodes_per_page = max(int(self.page_size / self.node_feature_size), 1)
+        self.initialize()
+
+    def initialize(self):
+        total_nodes = self.data_loader.get_num_nodes()
+        print("Total nodes of", total_nodes)
+        self.total_pages = int(math.ceil(total_nodes / (1.0 * self.nodes_per_page)))
+
+        # node_location_map[node] is the slot in the feature file storing that node's feature
+        self.node_location_map = list(range(total_nodes))
+        if "feature_layout" in self.features_stat and self.features_stat["feature_layout"] == "random":
+            random.shuffle(self.node_location_map)
+
+        # Invert the map so we can list every node stored on a given page
+        self.location_to_node = [0] * total_nodes
+        for node, location in enumerate(self.node_location_map):
+            self.location_to_node[location] = node
+
+    def get_node_page(self, src_node, neighbor_node):
+        # Return the set of nodes whose features share a page with neighbor_node
+        page_index = self.node_location_map[neighbor_node] // self.nodes_per_page
+        page_start = page_index * self.nodes_per_page
+        page_end = min(page_start + self.nodes_per_page, len(self.location_to_node))
+        return set(self.location_to_node[location] for location in range(page_start, page_end))
+
+    def get_single_node_feature_size(self):
+        return self.node_feature_size
+
+    def get_nodes_per_page(self):
+        return self.nodes_per_page
+
+    def get_page_size(self):
+        return self.page_size
+
+    def get_total_file_size(self):
+        total_bytes = self.page_size * self.total_pages
+        return humanfriendly.format_size(total_bytes)
+
+    def get_values_to_log(self):
+        return {"Nodes per Page": self.nodes_per_page, "Total File Size": self.get_total_file_size()}
+
+
+class NeighborFeaturesLoader(FeaturesLoader):
+    def __init__(self, data_loader, features_stat):
+        super().__init__(data_loader, features_stat)
+
+    def initialize(self):
+        total_nodes = self.data_loader.get_num_nodes()
+        self.total_pages = total_nodes
+        num_neighbors = self.nodes_per_page - 1
+
+        # Each node's page holds the node plus up to nodes_per_page - 1 of its neighbors
+        self.neighbors_in_page = {}
+        for curr_node in range(total_nodes):
+            all_neighbors = self.data_loader.get_neigbhors_for_node(curr_node)
+            neighbors_to_keep = min(len(all_neighbors), num_neighbors)
+            self.neighbors_in_page[curr_node] = all_neighbors[:neighbors_to_keep]
+
+    def get_page_for_node(self, node):
+        page_nodes = set()
+        page_nodes.add(node)
+        for neighbor_node in self.neighbors_in_page[node]:
+            page_nodes.add(neighbor_node)
+        return page_nodes
+
+    def get_node_page(self, src_node, neighbor_node):
+        # Prefer the source's page when the neighbor is already co-located on it
+        if neighbor_node in self.neighbors_in_page[src_node]:
+            return self.get_page_for_node(src_node)
+        return self.get_page_for_node(neighbor_node)
+
+
+class MetisPartitionLoader(FeaturesLoader):
+    def __init__(self, data_loader, features_stat):
+        super().__init__(data_loader, features_stat)
+
+    def initialize(self):
+        total_nodes = self.data_loader.get_num_nodes()
+        num_partitions = int(math.ceil(total_nodes / (1.0 * self.nodes_per_page)))
+
+        # Create the adjacency list; add every node up front so the METIS
+        # partition assignment stays aligned with the node ids
+        graph = nx.Graph()
+        graph.add_nodes_from(range(total_nodes))
+        for node_id in range(total_nodes):
+            for neighbor in self.data_loader.get_neigbhors_for_node(node_id):
+                graph.add_edge(node_id, neighbor)
+
+        # Partition the graph and store the result
+        (_, parts) = metis.part_graph(graph, num_partitions)
+        self.node_to_page = parts
+        self.page_nodes = {}
+        self.additional_page = max(self.node_to_page) + 1
+
+        # Determine the nodes in each page
+        for node_idx, node_page in enumerate(self.node_to_page):
+            # Determine if we need to spill over
+            if node_page in self.page_nodes and len(self.page_nodes[node_page]) >= self.nodes_per_page:
+                node_page = self.additional_page
+                self.node_to_page[node_idx] = node_page
+
+            # Record the node for this page
+            if node_page not in self.page_nodes:
+                self.page_nodes[node_page] = set()
+            self.page_nodes[node_page].add(node_idx)
+            assert len(self.page_nodes[node_page]) <= self.nodes_per_page
+
+            # Move to the next additional page
+            if node_page == self.additional_page and len(self.page_nodes[node_page]) >= self.nodes_per_page:
+                self.additional_page += 1
+
+        self.total_pages = len(self.page_nodes)
+        print("Finished metis partitioning")
+
+    def get_node_page(self, src_node, neighbor_node):
+        return self.page_nodes[self.node_to_page[neighbor_node]]
+
+features_class_map = {
+    "default": FeaturesLoader,
+    "neighbors_nearby": NeighborFeaturesLoader,
+    "metis_partition": MetisPartitionLoader,
+}
+
+def get_featurizer(data_loader, features_stat):
+    featurizer_type = features_stat["featurizer_type"]
+    if featurizer_type not in features_class_map:
+        raise ValueError("Invalid featurizer type of " + str(featurizer_type))
+
+    return features_class_map[featurizer_type](data_loader, features_stat)
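The paging logic can be exercised without a real dataset by handing FeaturesLoader a stub data loader. StubLoader below is a stand-in for illustration, not part of the diff, and the snippet assumes it is run from simulator/ so the src package resolves:

    from src.features_loader import FeaturesLoader

    class StubLoader:
        def get_num_nodes(self):
            return 100

    stats = {
        "featurizer_type": "default",
        "page_size": "16.384 KB",
        "feature_dimension": 128,
        "feature_size": "float32",
    }
    loader = FeaturesLoader(StubLoader(), stats)
    print(loader.get_nodes_per_page())   # 32
    print(loader.get_node_page(0, 5))    # nodes 0-31, the page holding node 5
    print(loader.get_total_file_size())  # ceil(100 / 32) = 4 pages of 16384 bytes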
diff --git a/simulator/src/in_mem_storage.py b/simulator/src/in_mem_storage.py
new file mode 100644
index 00000000..06163013
--- /dev/null
+++ b/simulator/src/in_mem_storage.py
@@ -0,0 +1,22 @@
+import heapq
+
+
+class InMemoryStorage:
+    def __init__(self, data_loader, percent_in_memory):
+        self.percent_in_memory = percent_in_memory
+        total_nodes = data_loader.get_num_nodes()
+        nodes_in_mem = int((total_nodes * self.percent_in_memory) / 100.0)
+
+        # Keep the nodes with the most incoming edges resident in memory
+        degree_pairs = [(data_loader.get_incoming_neighbors(node_id), node_id) for node_id in range(total_nodes)]
+        top_pairs = heapq.nlargest(nodes_in_mem, degree_pairs)
+        self.in_memory_nodes = set(pair[1] for pair in top_pairs)
+
+    def node_in_mem_storage(self, node_id):
+        return node_id in self.in_memory_nodes
+
+    def get_percentage_in_mem(self):
+        return self.percent_in_memory
+
+    def in_mem_nodes_count(self):
+        return len(self.in_memory_nodes)
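The in-memory selection is a plain top-k by in-degree: heapq.nlargest on (degree, node) pairs keeps the highest-degree nodes and, on ties, the larger node id. A toy run with made-up degrees:

    import heapq

    in_degrees = {0: 5, 1: 42, 2: 7, 3: 42, 4: 0}
    pairs = [(degree, node) for node, degree in in_degrees.items()]
    top_two = heapq.nlargest(2, pairs)           # [(42, 3), (42, 1)]
    in_memory_nodes = set(node for _, node in top_two)
    print(in_memory_nodes)                       # {1, 3}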
diff --git a/simulator/src/sampler.py b/simulator/src/sampler.py
new file mode 100644
index 00000000..e0534257
--- /dev/null
+++ b/simulator/src/sampler.py
@@ -0,0 +1,67 @@
+from .in_mem_storage import *
+from collections import deque
+import humanfriendly
+import math
+
+
+class SubgraphSampler:
+    def __init__(self, data_loader, features_loader, config):
+        self.data_loader = data_loader
+        self.features_loader = features_loader
+        self.in_memory_storage = None
+        if "top_percent_in_mem" in config:
+            self.in_memory_storage = InMemoryStorage(data_loader, config["top_percent_in_mem"])
+
+        self.nodes_loaded = set()
+        self.pages_loaded = 0
+        # Default to a single-hop sample when the config omits the depth
+        self.depth = config.get("sampling_depth", 1)
+
+    def reset(self):
+        self.nodes_loaded.clear()
+        self.pages_loaded = 0
+
+    def is_node_in_mem(self, node_id):
+        return self.in_memory_storage is not None and self.in_memory_storage.node_in_mem_storage(node_id)
+
+    def get_node_features(self, src_node, neighbor_node):
+        # Loading one node's page makes every co-located node free to read
+        if neighbor_node in self.nodes_loaded or self.is_node_in_mem(neighbor_node):
+            return
+
+        self.nodes_loaded.update(self.features_loader.get_node_page(src_node, neighbor_node))
+        self.pages_loaded += 1
+
+    def perform_sampling_for_node(self, node_id):
+        # Read for this node
+        self.reset()
+
+        # Perform bfs
+        curr_queue = deque([(node_id, node_id)])
+        curr_depth = 0
+        while curr_depth <= self.depth and len(curr_queue) > 0:
+            # Get all of the nodes in the level
+            level_nodes = len(curr_queue)
+            for _ in range(level_nodes):
+                src_node, curr_node = curr_queue.popleft()
+                self.get_node_features(src_node, curr_node)
+                # Only expand the frontier while there is depth budget left
+                if curr_depth < self.depth:
+                    for neighbor in self.data_loader.get_neigbhors_for_node(curr_node):
+                        curr_queue.append((curr_node, neighbor))
+
+            # Move to the next level
+            curr_depth += 1
+
+        return self.pages_loaded
+
+    def get_values_to_log(self):
+        values_to_return = {}
+        if self.in_memory_storage is not None:
+            nodes_in_memory = self.in_memory_storage.in_mem_nodes_count()
+            values_to_return["Percentage Nodes In Memory"] = self.in_memory_storage.get_percentage_in_mem()
+            in_mem_pages = int(math.ceil(nodes_in_memory / self.features_loader.get_nodes_per_page()))
+            all_pages_size = humanfriendly.format_size(in_mem_pages * self.features_loader.get_page_size())
+            values_to_return["In Memory Space Used"] = all_pages_size
+
+        return values_to_return
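Because get_node_features marks every co-located node as loaded, the page count per root can be traced by hand. The stand-ins below (not part of the diff) pack two nodes per page, so sampling node 0 to depth 2 touches pages {0, 1} and {2, 3} and reports 2 page loads:

    from src.sampler import SubgraphSampler

    class ToyDataLoader:
        adj = {0: [1, 2], 1: [3], 2: [], 3: []}

        def get_neigbhors_for_node(self, node_id):
            return self.adj.get(node_id, [])

    class ToyFeaturesLoader:
        def get_node_page(self, src_node, neighbor_node):
            start = (neighbor_node // 2) * 2
            return {start, start + 1}

    sampler = SubgraphSampler(ToyDataLoader(), ToyFeaturesLoader(), {"sampling_depth": 2})
    print(sampler.perform_sampling_for_node(0))  # 2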
diff --git a/simulator/src/visualizer.py b/simulator/src/visualizer.py
new file mode 100644
index 00000000..0a32aa52
--- /dev/null
+++ b/simulator/src/visualizer.py
@@ -0,0 +1,51 @@
+import matplotlib.pyplot as plt
+import os
+import numpy as np
+
+
+def visualize_results(visualize_args, num_bins=60, write_location=(0.7, 0.6)):
+    # Get the number of pages read
+    pages_loaded = visualize_args["pages_loaded"]
+    np_arr = np.array(pages_loaded)
+    page_mean = round(np.mean(np_arr), 2)
+    page_std = round(np.std(np_arr), 2)
+    pages_upper_bound = int(np.percentile(pages_loaded, 99))
+
+    # Create the histogram
+    plt.figure()
+    plt.ecdf(pages_loaded, label="CDF")
+    plt.hist(pages_loaded, bins=num_bins, histtype="step", density=True, cumulative=True, label="Cumulative histogram")
+    plt.xlabel("Number of pages loaded for node inference")
+    plt.ylabel("Fraction of nodes")
+    title = visualize_args["graph_title"] + " for dataset " + visualize_args["dataset_name"] + " (Sampling depth = " + str(visualize_args["depth"]) + ")"
+    plt.title(title, fontsize=10)
+    plt.xlim((0, pages_upper_bound))
+    plt.legend()
+
+    # Write some resulting text
+    txt_lines = ["Mean Pages Loaded: " + str(page_mean), "Std dev of Pages Loaded: " + str(page_std)]
+    if "values_to_log" in visualize_args:
+        vals_to_log = visualize_args["values_to_log"]
+        for key, value in vals_to_log.items():
+            txt_lines.append(str(key).strip() + ": " + str(value).strip())
+
+    text_to_write = "Key Metrics:\n" + "\n".join(txt_lines)
+
+    # Get the current axis limits
+    xlim = plt.xlim()
+    ylim = plt.ylim()
+    actual_x = write_location[0] * (xlim[1] - xlim[0]) + xlim[0]
+    actual_y = write_location[1] * (ylim[1] - ylim[0]) + ylim[0]
+    plt.text(
+        actual_x,
+        actual_y,
+        text_to_write,
+        fontsize=10,
+        horizontalalignment="center",
+        verticalalignment="center",
+        bbox=dict(facecolor="red", alpha=0.5),
+    )
+
+    # Save the result
+    plt.tight_layout()
+    plt.savefig(visualize_args["save_path"])
diff --git a/simulator/temp.py b/simulator/temp.py
new file mode 100644
index 00000000..8120489b
--- /dev/null
+++ b/simulator/temp.py
@@ -0,0 +1,2 @@
+values = ['cmake', '/working_dir', '-DCMAKE_LIBRARY_OUTPUT_DIRECTORY=/working_dir/build/lib.linux-x86_64-3.10/marius', '-DPYTHON_EXECUTABLE=/usr/bin/python3', '-DCMAKE_BUILD_TYPE=Release', '-DCMAKE_BUILD_WITH_INSTALL_RPATH=TRUE', '-DCMAKE_INSTALL_RPATH_USE_LINK_PATH=TRUE', '-DCMAKE_INSTALL_RPATH=$ORIGIN']
+print(" ".join(values))
\ No newline at end of file
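Finally, the visualizer can be smoke-tested without a simulation run by feeding it synthetic page counts. The values below are illustrative only, and plt.ecdf assumes matplotlib 3.8 or newer:

    import os
    import numpy as np
    from src.visualizer import visualize_results

    os.makedirs("results", exist_ok=True)
    # Poisson-distributed page counts as a stand-in for real sampler output
    pages = list(np.random.default_rng(0).poisson(lam=40, size=5000))
    visualize_results({
        "pages_loaded": pages,
        "save_path": "results/smoke_test.png",
        "graph_title": "Synthetic run",
        "depth": 2,
        "dataset_name": "ogbn_arxiv",
        "values_to_log": {"Nodes per Page": 32},
    })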