Skip to content

Commit

Permalink
feat(IIDSampleGenerator): new shape
Browse files Browse the repository at this point in the history
also: refactor test_pc_graph
  • Loading branch information
this-is-sofia committed Apr 8, 2024
1 parent 5450656 commit 0c58348
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 88 deletions.
15 changes: 15 additions & 0 deletions causy/sample_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,21 @@ def _generate_data(self, size):
internal_repr[to_node] += edge.value * internal_repr[from_node]
return internal_repr

# TODO: adjust sample generator tests to this data shape
def _generate_shaped_data(self, size):
test_data = self._generate_data(size)
data = {}
for variable in test_data.keys():
data[variable] = test_data[variable]

test_data_shaped = []
for i in range(size):
entry = {}
for key in data.keys():
entry[key] = data[key][i]
test_data_shaped.append(entry)
return test_data_shaped

def generate(self, size):
"""
Generate data for a sample graph with a time dimension
Expand Down
141 changes: 53 additions & 88 deletions tests/test_pc_graph.py
Original file line number Diff line number Diff line change
@@ -1,99 +1,60 @@
import csv
import unittest
import numpy as np
import torch

from causy.algorithms import PC
from causy.algorithms import PC, ParallelPC
from causy.graph_utils import retrieve_edges
from causy.sample_generator import IIDSampleGenerator, SampleEdge, NodeReference


# TODO: generate larger toy model to test quadruple orientation rules.
def generate_data_minimal_example(a, b, c, d, sample_size):
np.random.seed(0)
V = d * np.random.normal(0, 1, sample_size)
W = c * np.random.normal(0, 1, sample_size)
Z = W + V + np.random.normal(0, 1, sample_size)
X = a * Z + np.random.normal(0, 1, sample_size)
Y = b * Z + np.random.normal(0, 1, sample_size)

data = {}
data["V"], data["W"], data["Z"], data["X"], data["Y"] = V, W, Z, X, Y
test_data = []

for i in range(sample_size):
entry = {}
for key in data.keys():
entry[key] = data[key][i]
test_data.append(entry)
return test_data


def generate_data_further_example(c, d, e, f, g, sample_size):
np.random.seed(0)
A = np.random.normal(0, 1, sample_size)
B = np.random.normal(0, 1, sample_size)
C = A + c * B + np.random.normal(0, 1, sample_size)
D = d * A + B + C + np.random.normal(0, 1, sample_size)
E = e * B + np.random.normal(0, 1, sample_size)
F = f * E + g * B + C + D + np.random.normal(0, 1, sample_size)

data = {}
data["A"], data["B"], data["C"], data["D"], data["E"], data["F"] = A, B, C, D, E, F
test_data = []

for i in range(sample_size):
entry = {}
for key in data.keys():
entry[key] = data[key][i]
test_data.append(entry)

return test_data
# TODO: seedings are not working yet (failing every 20th time or so, should always be equal for equal data), fix that.


def set_random_seed(seed):
# Ensure reproducability across operating systems
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
np.random.seed(seed)


class PCTestTestCase(unittest.TestCase):
def test_with_rki_data(self):
with open("./tests/fixtures/rki-data.csv") as f:
data = csv.DictReader(f)
test_data = []
for row in data:
for k in row.keys():
if row[k] == "":
row[k] = 0.0
row[k] = float(row[k])
test_data.append(row)
def test_toy_model_minimal_example(self):
set_random_seed(1)
model = IIDSampleGenerator(
edges=[
SampleEdge(NodeReference("Z"), NodeReference("X"), 5),
SampleEdge(NodeReference("Z"), NodeReference("Y"), 6),
SampleEdge(NodeReference("W"), NodeReference("Z"), 1),
SampleEdge(NodeReference("V"), NodeReference("Z"), 1),
]
)

tst = PC()
tst.create_graph_from_data(test_data)
tst.create_all_possible_edges()
tst.execute_pipeline_steps()
retrieve_edges(tst.graph)
model.random_fn = lambda: torch.normal(0, 1, (1, 1))
sample_size = 10000
test_data = model._generate_shaped_data(sample_size)

def test_with_minimal_toy_model(self):
a, b, c, d, sample_size = 5, 6, 7, 8, 100000
test_data = generate_data_minimal_example(a, b, c, d, sample_size)
tst = PC()
tst.create_graph_from_data(test_data)
tst.create_all_possible_edges()
tst.execute_pipeline_steps()
retrieve_edges(tst.graph)
node_mapping = {}

"""
for e in retrieve_edges(tst.graph):
print(tst.graph.nodes[e[0]].name + " => " + tst.graph.nodes[e[1]].name)
print(tst.graph.edges[e[0]][e[1]].metadata)
"""

node_mapping = {}
for key, node in tst.graph.nodes.items():
node_mapping[node.name] = key

# check the direct_effect values
self.assertAlmostEqual(
tst.graph.edges[node_mapping["Z"]][node_mapping["X"]].metadata[
"direct_effect"
],
5.0,
1,
)

self.assertAlmostEqual(
tst.graph.edges[node_mapping["V"]][node_mapping["Z"]].metadata[
"direct_effect"
Expand Down Expand Up @@ -130,22 +91,35 @@ def test_with_minimal_toy_model(self):
)
)

def test_with_larger_toy_model(self):
c, d, e, f, g, sample_size = 2, 3, 4, 5, 6, 10000
test_data = generate_data_further_example(c, d, e, f, g, sample_size)
tst = PC()
node_mapping = {}
def test_second_toy_model_example(self):
set_random_seed(1)
c, d, e, f, g = 2, 3, 4, 5, 6
model = IIDSampleGenerator(
edges=[
SampleEdge(NodeReference("A"), NodeReference("C"), 1),
SampleEdge(NodeReference("B"), NodeReference("C"), c),
SampleEdge(NodeReference("A"), NodeReference("D"), d),
SampleEdge(NodeReference("B"), NodeReference("D"), 1),
SampleEdge(NodeReference("C"), NodeReference("D"), 1),
SampleEdge(NodeReference("B"), NodeReference("E"), e),
SampleEdge(NodeReference("E"), NodeReference("F"), f),
SampleEdge(NodeReference("B"), NodeReference("F"), g),
SampleEdge(NodeReference("C"), NodeReference("F"), 1),
SampleEdge(NodeReference("D"), NodeReference("F"), 1),
],
)

model.random_fn = lambda: torch.normal(0, 1, (1, 1))
sample_size = 10000
test_data = model._generate_shaped_data(sample_size)

tst = PC()
tst.create_graph_from_data(test_data)
tst.create_all_possible_edges()
tst.execute_pipeline_steps()
retrieve_edges(tst.graph)

"""
for e in retrieve_edges(tst.graph):
print(tst.graph.nodes[e[0]].name + " => " + tst.graph.nodes[e[1]].name)
print(tst.graph.edges[e[0]][e[1]].metadata)
"""

node_mapping = {}
for key, node in tst.graph.nodes.items():
node_mapping[node.name] = key

Expand Down Expand Up @@ -181,11 +155,7 @@ def test_with_larger_toy_model(self):
tst.graph.nodes[node_mapping["B"]], tst.graph.nodes[node_mapping["D"]]
)
)
self.assertTrue(
tst.graph.directed_edge_exists(
tst.graph.nodes[node_mapping["C"]], tst.graph.nodes[node_mapping["D"]]
)
)

self.assertTrue(
tst.graph.directed_edge_exists(
tst.graph.nodes[node_mapping["B"]], tst.graph.nodes[node_mapping["E"]]
Expand Down Expand Up @@ -216,11 +186,6 @@ def test_with_larger_toy_model(self):
tst.graph.nodes[node_mapping["A"]], tst.graph.nodes[node_mapping["E"]]
)
)
self.assertFalse(
tst.graph.edge_exists(
tst.graph.nodes[node_mapping["A"]], tst.graph.nodes[node_mapping["F"]]
)
)

self.assertFalse(
tst.graph.edge_exists(
Expand Down

0 comments on commit 0c58348

Please sign in to comment.