Commit 84f9198

Added simulator code

Devesh Sarda committed Feb 21, 2024
1 parent 2f27ffe commit 84f9198
Showing 16 changed files with 496 additions and 0 deletions.
Empty file added simulator/__init__.py
Empty file.
10 changes: 10 additions & 0 deletions simulator/configs/arvix_linear.json
@@ -0,0 +1,10 @@
{
    "dataset_name" : "ogbn_arxiv",
    "features_stats" : {
        "featurizer_type" : "default",
        "page_size" : "16.384 KB",
        "feature_dimension" : 128,
        "feature_size" : "float32"
    },
    "sampling_depth" : 2
}
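
For reference, the FeaturesLoader added later in this commit parses these fields with humanfriendly and numpy; a minimal sketch of the resulting page math for this config (the arithmetic is worked out by hand, not taken from the commit):

    import humanfriendly
    import numpy as np

    page_size = humanfriendly.parse_size("16.384 KB")        # 16384 bytes ("KB" is decimal)
    node_feature_size = np.dtype("float32").itemsize * 128   # 4 bytes * 128 dims = 512 bytes per node
    nodes_per_page = max(page_size // node_feature_size, 1)  # 32 node features fit on one page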
10 changes: 10 additions & 0 deletions simulator/configs/arvix_linear_in_mem.json
@@ -0,0 +1,10 @@
{
    "dataset_name" : "ogbn_arxiv",
    "features_stats" : {
        "featurizer_type" : "default",
        "page_size" : "16.384 KB",
        "feature_dimension" : 128,
        "feature_size" : "float32"
    },
    "top_percent_in_mem" : 1
}
9 changes: 9 additions & 0 deletions simulator/configs/arvix_metis.json
@@ -0,0 +1,9 @@
{
    "dataset_name" : "ogbn_arxiv",
    "features_stats" : {
        "featurizer_type" : "metis_partition",
        "page_size" : "16.384 KB",
        "feature_dimension" : 128,
        "feature_size" : "float32"
    }
}
9 changes: 9 additions & 0 deletions simulator/configs/arvix_neighbors.json
@@ -0,0 +1,9 @@
{
    "dataset_name" : "ogbn_arxiv",
    "features_stats" : {
        "featurizer_type" : "neighbors_nearby",
        "page_size" : "16.384 KB",
        "feature_dimension" : 128,
        "feature_size" : "float32"
    }
}
9 changes: 9 additions & 0 deletions simulator/configs/arvix_random.json
@@ -0,0 +1,9 @@
{
    "dataset_name" : "ogbn_arxiv",
    "features_stats" : {
        "feature_layout" : "random",
        "page_size" : "16 KB",
        "feature_dimension" : 128,
        "feature_size" : "float32"
    }
}
3 changes: 3 additions & 0 deletions simulator/configs/papers.json
@@ -0,0 +1,3 @@
{
    "dataset_name" : "ogbn_papers100m"
}
10 changes: 10 additions & 0 deletions simulator/configs/products_linear.json
@@ -0,0 +1,10 @@
{
    "dataset_name" : "ogbn_products",
    "features_stats" : {
        "featurizer_type" : "default",
        "page_size" : "16.384 KB",
        "feature_dimension" : 100,
        "feature_size" : "float32"
    },
    "sampling_depth" : 1
}
79 changes: 79 additions & 0 deletions simulator/main.py
@@ -0,0 +1,79 @@
import os
import json
import argparse
import random
import time

from src.dataset_loader import *
from src.features_loader import *
from src.sampler import *
from src.visualizer import *


def read_config_file(config_file):
    with open(config_file, "r") as reader:
        return json.load(reader)


def read_arguments():
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("--config_file", required=True, type=str, help="The config file containing the details for the simulation")
    parser.add_argument("--save_path", required=True, type=str, help="The path to save the resulting image to")
    parser.add_argument("--graph_title", required=True, type=str, help="The title of the saved graph")
    parser.add_argument("--num_nodes", default=-1, type=int, help="The number of nodes we want in our sample")
    parser.add_argument("--log_rate", type=int, default=20, help="Log rate of the nodes processed")
    return parser.parse_args()


def main():
    start_time = time.time()
    arguments = read_arguments()
    config = read_config_file(arguments.config_file)

    # Create the loaders
    data_loader = DatasetLoader(config)
    print(data_loader.get_num_nodes(), data_loader.get_num_edges())
    '''
    features_loader = get_featurizer(data_loader, config["features_stats"])
    sampler = SubgraphSampler(data_loader, features_loader, config)
    print("Finished loading all objects")

    # Perform sampling
    nodes_to_sample = list(range(data_loader.get_num_nodes()))
    random.shuffle(nodes_to_sample)
    if arguments.num_nodes > 0:
        nodes_to_sample = nodes_to_sample[: arguments.num_nodes]
    log_rate = max(1, len(nodes_to_sample) // arguments.log_rate)  # guard against a zero modulus on tiny samples
    pages_loaded = []
    for curr_node in nodes_to_sample:
        num_pages_read = sampler.perform_sampling_for_node(curr_node)
        if num_pages_read > 0:
            pages_loaded.append(num_pages_read)
        if len(pages_loaded) > 0 and len(pages_loaded) % log_rate == 0:
            percentage_finished = (100.0 * len(pages_loaded)) / len(nodes_to_sample)
            print("Finished processing", round(percentage_finished), "percent of nodes")

    # Get the arguments to log
    vals_to_log = dict()
    for curr_obj in [data_loader, features_loader, sampler]:
        vals_to_log.update(curr_obj.get_values_to_log())

    # Log the time taken
    total_time = time.time() - start_time
    print("Processed all", len(nodes_to_sample), "nodes in", total_time, "seconds")

    # Save the histogram
    os.makedirs(os.path.dirname(arguments.save_path), exist_ok=True)
    visualize_arguments = {
        "pages_loaded": pages_loaded,
        "save_path": arguments.save_path,
        "graph_title": arguments.graph_title,
        "depth": config["sampling_depth"],
        "dataset_name": config["dataset_name"],
        "values_to_log": vals_to_log,
    }
    visualize_results(visualize_arguments)
    '''


if __name__ == "__main__":
    main()
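
A plausible invocation of the script above, assuming it is run from the simulator/ directory (the src.* imports resolve relative to the working directory); the save path and title are illustrative:

    python main.py \
        --config_file configs/arvix_linear.json \
        --save_path results/arvix_linear.png \
        --graph_title "ogbn-arxiv linear layout" \
        --num_nodes 10000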
Empty file added simulator/src/__init__.py
Empty file.
82 changes: 82 additions & 0 deletions simulator/src/dataset_loader.py
@@ -0,0 +1,82 @@
import subprocess
import os
import numpy as np
import torch
from collections import defaultdict
import marius.storage


class DatasetLoader:
    SAVE_DIR = "datasets"
    EDGES_PATH = "edges/train_edges.bin"

    def __init__(self, config):
        self.name = config["dataset_name"]
        # Some configs (e.g. arvix_metis.json) omit sampling_depth, so default to 1
        self.sampling_depth = config.get("sampling_depth", 1)
        os.makedirs(DatasetLoader.SAVE_DIR, exist_ok=True)
        self.save_dir = os.path.join(DatasetLoader.SAVE_DIR, self.name)
        if not os.path.exists(self.save_dir):
            self.create_dataset()
        self.load_dataset()

    def create_dataset(self):
        command_to_run = f"marius_preprocess --dataset {self.name} --output_directory {self.save_dir}"
        print("Running command", command_to_run)
        subprocess.check_output(command_to_run, shell=True)

    def load_dataset(self):
        # Load the file of consecutive int32 (src, dst) pairs
        edges_path = os.path.join(self.save_dir, DatasetLoader.EDGES_PATH)
        with open(edges_path, "rb") as reader:
            edges_bytes = reader.read()

        edges_flat_arr = np.frombuffer(edges_bytes, dtype=np.int32)
        edges_arr = edges_flat_arr.reshape((-1, 2)).copy()  # copy so torch gets a writable buffer
        self.edge_list = torch.from_numpy(edges_arr)
        self.nodes = torch.unique(self.edge_list)

        # Create the adjacency map and in-degree counts
        # (reconstructed here from how the getters below use them)
        self.adjacency_map = defaultdict(set)
        self.num_incoming_edges = defaultdict(int)
        for src_node, dest_node in edges_arr:
            self.adjacency_map[int(src_node)].add(int(dest_node))
            self.num_incoming_edges[int(dest_node)] += 1

        '''
        # Create the graph
        self.current_graph = MariusGraph(self.edge_list, self.edge_list[torch.argsort(self.edge_list[:, -1])], self.get_num_nodes())
        self.sampler = LayeredNeighborSampler(self.current_graph, [-1 for _ in range(self.sampling_depth)])
        # Neighbors cache
        '''

    def get_num_nodes(self):
        return self.nodes.shape[0]

    def get_neigbhors_for_node(self, node_id, all_depths=False):
        if node_id not in self.adjacency_map:
            return []

        return list(self.adjacency_map[node_id])

    def get_incoming_neighbors(self, node_id):
        if node_id not in self.num_incoming_edges:
            return 0

        return self.num_incoming_edges[node_id]

    def get_num_edges(self):
        return self.edge_list.shape[0]

    def get_average_neighbors(self):
        neighbors_count = []
        for node_neighbors in self.adjacency_map.values():
            neighbors_count.append(len(node_neighbors))

        return np.mean(np.array(neighbors_count))

    def get_average_incoming(self):
        incoming_counts = []
        for num_incoming in self.num_incoming_edges.values():
            incoming_counts.append(num_incoming)

        return np.mean(np.array(incoming_counts))

    def get_values_to_log(self):
        return {
            "Average Node Out Degree": str(round(self.get_average_neighbors(), 2)),
            "Average Node In Degree": str(round(self.get_average_incoming(), 2)),
        }
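
A toy reconstruction of the train_edges.bin decode in load_dataset(), with fabricated bytes standing in for the real file, to show how the flat int32 buffer becomes (src, dst) pairs:

    import numpy as np

    # Three fake edges: 0->1, 1->2, 2->0, serialized the same way as train_edges.bin
    fake_edge_bytes = np.array([0, 1, 1, 2, 2, 0], dtype=np.int32).tobytes()
    edges_arr = np.frombuffer(fake_edge_bytes, dtype=np.int32).reshape((-1, 2))
    print(edges_arr.tolist())  # [[0, 1], [1, 2], [2, 0]]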
134 changes: 134 additions & 0 deletions simulator/src/features_loader.py
@@ -0,0 +1,134 @@
import humanfriendly
import os
import math
import random
import numpy as np
import metis
import networkx as nx


class FeaturesLoader:
    def __init__(self, data_loader, features_stat):
        self.data_loader = data_loader
        self.features_stat = features_stat
        self.page_size = humanfriendly.parse_size(features_stat["page_size"])
        self.feature_size = np.dtype(features_stat["feature_size"]).itemsize
        self.node_feature_size = self.feature_size * features_stat["feature_dimension"]
        self.nodes_per_page = max(int(self.page_size / self.node_feature_size), 1)
        self.initialize()

    def initialize(self):
        total_nodes = self.data_loader.get_num_nodes()
        print("Total nodes of", total_nodes)
        self.total_pages = int(math.ceil(total_nodes / (1.0 * self.nodes_per_page)))
        # One storage location per node (the original sized this list by pages,
        # which breaks indexing by node id below); consecutive locations share a page
        self.node_location_map = list(range(total_nodes))
        if "feature_layout" in self.features_stat and self.features_stat["feature_layout"] == "random":
            random.shuffle(self.node_location_map)
        # Reverse map from storage location back to node id, so we can recover
        # which nodes share a page
        self.location_node_map = [0] * total_nodes
        for node_id, location in enumerate(self.node_location_map):
            self.location_node_map[location] = node_id

    def get_node_page(self, src_node, neighbor_node):
        print("Querying for node", neighbor_node, "with total of", len(self.node_location_map), "nodes")
        # Return every node whose location lands on the neighbor's page
        page_start = (self.node_location_map[neighbor_node] // self.nodes_per_page) * self.nodes_per_page
        page_end = min(page_start + self.nodes_per_page, len(self.location_node_map))
        return set(self.location_node_map[location] for location in range(page_start, page_end))

    def get_single_node_feature_size(self):
        return self.node_feature_size

    def get_nodes_per_page(self):
        return self.nodes_per_page

    def get_page_size(self):
        return self.page_size

    def get_total_file_size(self):
        total_bytes = self.page_size * self.total_pages
        return humanfriendly.format_size(total_bytes)

    def get_values_to_log(self):
        return {"Nodes per Page": self.nodes_per_page, "Total File Size": self.get_total_file_size()}


class NeighborFeaturesLoader(FeaturesLoader):
    def __init__(self, data_loader, features_stat):
        super().__init__(data_loader, features_stat)

    def initialize(self):
        total_nodes = self.data_loader.get_num_nodes()
        self.total_pages = total_nodes
        num_neighbors = self.nodes_per_page - 1

        self.neighbors_in_page = {}
        for curr_node in range(total_nodes):
            all_neighbors = self.data_loader.get_neigbhors_for_node(curr_node)
            neighbors_to_keep = min(len(all_neighbors), num_neighbors)
            self.neighbors_in_page[curr_node] = all_neighbors[:neighbors_to_keep]

    def get_page_for_node(self, node):
        page_nodes = set()
        page_nodes.add(node)
        for neighbor_node in self.neighbors_in_page[node]:
            page_nodes.add(neighbor_node)
        return page_nodes

    def get_node_page(self, src_node, neighbor_node):
        if neighbor_node in self.neighbors_in_page[src_node]:
            return self.get_page_for_node(src_node)
        return self.get_page_for_node(neighbor_node)


class MetisPartitionLoader(FeaturesLoader):
    def __init__(self, data_loader, features_stat):
        super().__init__(data_loader, features_stat)

    def initialize(self):
        total_nodes = self.data_loader.get_num_nodes()
        num_partitions = int(math.ceil(total_nodes / (1.0 * self.nodes_per_page)))

        # Create the adjacency list; add every node id first so isolated nodes
        # keep the partition list aligned with node ids
        graph = nx.Graph()
        graph.add_nodes_from(range(total_nodes))
        for node_id in range(total_nodes):
            for neighbor in self.data_loader.get_neigbhors_for_node(node_id):
                graph.add_edge(node_id, neighbor)

        # Partition the graph and store the result
        (_, parts) = metis.part_graph(graph, num_partitions)
        self.node_to_page = parts
        self.page_nodes = {}
        self.additional_page = max(self.node_to_page) + 1

        # Determine the nodes in each page
        for node_idx, node_page in enumerate(self.node_to_page):
            # Spill over to an overflow page once the METIS partition is full
            if node_page in self.page_nodes and len(self.page_nodes[node_page]) >= self.nodes_per_page:
                node_page = self.additional_page
                self.node_to_page[node_idx] = node_page

            # Record the node for this page
            if node_page not in self.page_nodes:
                self.page_nodes[node_page] = set()
            self.page_nodes[node_page].add(node_idx)
            assert len(self.page_nodes[node_page]) <= self.nodes_per_page

            # Move to the next additional page once this one fills up
            if node_page == self.additional_page and len(self.page_nodes[node_page]) >= self.nodes_per_page:
                self.additional_page += 1

        self.total_pages = len(self.page_nodes)
        print("Finished metis partitioning")

    def get_node_page(self, src_node, neighbor_node):
        return self.page_nodes[self.node_to_page[neighbor_node]]


features_class_map = {
    "default": FeaturesLoader,
    "neighbors_nearby": NeighborFeaturesLoader,
    "metis_partition": MetisPartitionLoader,
}


def get_featurizer(data_loader, features_stat):
    # Configs such as arvix_random.json only set "feature_layout", so fall back
    # to the default featurizer when no explicit type is given
    featurizer_type = features_stat.get("featurizer_type", "default")
    if featurizer_type not in features_class_map:
        raise ValueError("Invalid featurizer type of " + str(featurizer_type))

    return features_class_map[featurizer_type](data_loader, features_stat)
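
Hypothetical usage of get_featurizer, mirroring the settings in simulator/configs/arvix_linear.json; data_loader stands for an already-constructed DatasetLoader (or any object exposing get_num_nodes and get_neigbhors_for_node):

    features_stat = {
        "featurizer_type" : "default",
        "page_size" : "16.384 KB",
        "feature_dimension" : 128,
        "feature_size" : "float32",
    }
    features_loader = get_featurizer(data_loader, features_stat)
    print(features_loader.get_nodes_per_page())  # 32 with these settings
    print(features_loader.get_values_to_log())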
26 changes: 26 additions & 0 deletions simulator/src/in_mem_storage.py
@@ -0,0 +1,26 @@
import heapq


class InMemoryStorage:
    def __init__(self, data_loader, percent_in_memory):
        self.percent_in_memory = percent_in_memory
        total_nodes = data_loader.get_num_nodes()
        nodes_in_mem = int((total_nodes * self.percent_in_memory) / 100.0)

        # Keep the nodes with the most incoming edges in memory
        # (nlargest works on a plain list, so no heappush loop is needed)
        degree_pairs = [(data_loader.get_incoming_neighbors(node_id), node_id) for node_id in range(total_nodes)]
        top_pairs = heapq.nlargest(nodes_in_mem, degree_pairs)
        self.in_memory_nodes = set(pair[1] for pair in top_pairs)

    def node_in_mem_storage(self, node_id):
        return node_id in self.in_memory_nodes

    def get_percentage_in_mem(self):
        return self.percent_in_memory

    def in_mem_nodes_count(self):
        return len(self.in_memory_nodes)
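
A minimal sketch of the selection policy with a stub loader (the stub is not part of this commit): the top percent_in_memory percent of nodes, ranked by in-degree, are pinned in memory.

    class StubLoader:
        # Hypothetical stand-in for DatasetLoader with four nodes
        def get_num_nodes(self):
            return 4

        def get_incoming_neighbors(self, node_id):
            return [5, 1, 3, 0][node_id]  # in-degree per node

    storage = InMemoryStorage(StubLoader(), percent_in_memory=50)
    print(storage.in_mem_nodes_count())    # 2
    print(storage.node_in_mem_storage(0))  # True: node 0 has the highest in-degree
    print(storage.node_in_mem_storage(3))  # False: node 3 has in-degree 0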