Commit

wip
LilithWittmann committed Oct 15, 2023
1 parent 1fc72a7 commit faee911
Showing 2 changed files with 33 additions and 22 deletions.
51 changes: 31 additions & 20 deletions causy/graph.py
@@ -231,7 +231,6 @@ def create_graph_from_data(self, data: List[Dict[str, float]]):
         :param data: is a list of dictionaries
         :return:
         """
-
         # initialize nodes
         keys = data[0].keys()
         nodes: Dict[str, List[float]] = {}
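
For orientation (not part of the diff): create_graph_from_data expects one dictionary per observation, keyed by variable name, and initializes one node per key. A hypothetical usage sketch — `pipeline` stands for an instance of the class that defines this method; its name is not shown in this hunk:

# Hypothetical usage; `pipeline` is an assumed instance of the class
# defining create_graph_from_data (not named in this diff).
data = [
    {"a": 1.0, "b": 2.0, "c": 0.5},
    {"a": 1.1, "b": 1.9, "c": 0.7},
    {"a": 0.9, "b": 2.1, "c": 0.4},
]
pipeline.create_graph_from_data(data)  # one node per key, one value series per node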
@@ -282,20 +281,12 @@ def execute_pipeline_steps(self):
 
         self.graph.action_history = action_history
 
-    def execute_pipeline_step(self, test_fn: IndependenceTestInterface):
-        """
-        Filter the graph
-        :param test_fn: the test function
-        :param threshold: the threshold
-        :return:
-        """
-        combinations = []
-        actions_taken = []
-
+    def _generate_combinations(self, test_fn):
         if type(test_fn.NUM_OF_COMPARISON_ELEMENTS) is int:
-            combinations = itertools.combinations(
+            for i in itertools.combinations(
                 self.graph.nodes, test_fn.NUM_OF_COMPARISON_ELEMENTS
-            )
+            ):
+                yield i
         elif type(test_fn.NUM_OF_COMPARISON_ELEMENTS) is ComparisonSettings:
             start = test_fn.NUM_OF_COMPARISON_ELEMENTS.min
             # if min is longer than our dataset, we can't create any combinations
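
The refactor replaces an eagerly built combinations list with a lazy generator, so node combinations are produced only as they are consumed. A standalone sketch of the same dispatch pattern, using a simplified ComparisonSettings stand-in rather than causy's real types:

import itertools
from dataclasses import dataclass

@dataclass
class ComparisonSettings:
    min: int
    max: int

def generate_combinations(nodes, num_of_comparison_elements):
    if isinstance(num_of_comparison_elements, int):
        # fixed tuple size: yield each r-combination of the nodes lazily
        yield from itertools.combinations(nodes, num_of_comparison_elements)
    elif isinstance(num_of_comparison_elements, ComparisonSettings):
        # a (min, max) range: yield combinations of every size in the range
        start = num_of_comparison_elements.min
        stop = min(num_of_comparison_elements.max, len(nodes) + 1)
        for r in range(start, stop):
            yield from itertools.combinations(nodes, r)

print(list(generate_combinations(["a", "b", "c"], 2)))
# [('a', 'b'), ('a', 'c'), ('b', 'c')]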
@@ -316,22 +307,42 @@ def execute_pipeline_step(self, test_fn: IndependenceTestInterface):
             if stop < start:
                 return
 
             # create all combinations
             for r in range(start, stop):
-                combinations.extend(itertools.combinations(self.graph.nodes, r))
+                print(r)
+                for i in itertools.combinations(self.graph.nodes, r):
+                    yield i
 
-        # initialize the worker pool (we currently use all available cores * 2)
-
-        args = [[test_fn, [*i], self.graph] for i in combinations]
+    def execute_pipeline_step(self, test_fn: IndependenceTestInterface):
+        """
+        Filter the graph
+        :param test_fn: the test function
+        :param threshold: the threshold
+        :return:
+        """
+        actions_taken = []
+
+        # initialize the worker pool (we currently use all available cores * 2)
 
         # run all combinations in parallel except if the number of combinations is smaller than the chunk size
         # because then we would create more overhead than we would definitely gain from parallel processing
-        if test_fn.PARALLEL and len(args) > test_fn.CHUNK_SIZE_PARALLEL_PROCESSING:
+        if test_fn.PARALLEL:
             iterator = self.pool.imap_unordered(
-                unpack_run, args, chunksize=test_fn.CHUNK_SIZE_PARALLEL_PROCESSING
+                unpack_run,
+                [
+                    [test_fn, [*i], self.graph]
+                    for i in self._generate_combinations(test_fn)
+                ],
+                chunksize=test_fn.CHUNK_SIZE_PARALLEL_PROCESSING,
             )
         else:
-            iterator = [unpack_run(i) for i in args]
+            iterator = [
+                unpack_run(i)
+                for i in [
+                    [test_fn, [*i], self.graph]
+                    for i in self._generate_combinations(test_fn)
+                ]
+            ]
 
         # run all combinations in parallel
         for result in iterator:
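Context on the parallel path, not part of the diff: each work item is a [test_fn, combination, graph] triple, and pool.imap_unordered feeds those to unpack_run, which unpacks one triple per task. A self-contained sketch of that pattern — the dummy test and payload shapes are simplified stand-ins, not causy's real interfaces:

import itertools
import multiprocessing

def dummy_test(combination, graph):
    # stand-in for an independence test over one node combination
    return combination, len(combination)

def unpack_run(args):
    # imap_unordered passes a single argument per task, so unpack the triple here
    test_fn, combination, graph = args
    return test_fn(combination, graph)

if __name__ == "__main__":
    graph = {"a": [1.0], "b": [2.0], "c": [3.0]}  # toy node data
    payloads = [
        [dummy_test, [*combo], graph]
        for combo in itertools.combinations(graph, 2)
    ]
    with multiprocessing.Pool() as pool:
        # results arrive in completion order, not submission order
        for result in pool.imap_unordered(unpack_run, payloads, chunksize=1):
            print(result)

Note that while _generate_combinations is now lazy, the diff still materializes the payload list comprehension before handing it to the pool, so the generator's laziness only pays off fully if that list is later replaced by a generator expression.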
4 changes: 2 additions & 2 deletions causy/independence_tests.py
@@ -186,7 +186,7 @@ def test(self, nodes: List[str], graph: BaseGraphInterface) -> TestResult:
 class ExtendedPartialCorrelationTestMatrix(IndependenceTestInterface):
     NUM_OF_COMPARISON_ELEMENTS = ComparisonSettings(min=4, max=AS_MANY_AS_FIELDS)
     CHUNK_SIZE_PARALLEL_PROCESSING = 1
-    PARALLEL = False
+    PARALLEL = True
 
     def test(self, nodes: List[str], graph: BaseGraphInterface) -> TestResult:
         """
@@ -196,7 +196,7 @@ def test(self, nodes: List[str], graph: BaseGraphInterface) -> TestResult:
         :param nodes: the nodes to test
         :return: A TestResult with the action to take
         """
-        logger.debug(f"ExtendedPartialCorrelationTestMatrix {nodes}")
+        logger.info(f"ExtendedPartialCorrelationTestMatrix {nodes}")
         covariance_matrix = [
             [None for _ in range(len(nodes))] for _ in range(len(nodes))
         ]
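A note on the two flips, outside the diff itself: PARALLEL = True routes this test through the pool-based path in graph.py, and the logger.debug → logger.info change raises the visibility of the per-combination message. A minimal illustration, assuming a standard logging setup at INFO level:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("causy.independence_tests")

logger.debug("suppressed: below the INFO threshold")      # old call: invisible
logger.info("ExtendedPartialCorrelationTestMatrix ...")   # new call: printed

Since the message fires once per tested combination, INFO level makes the log considerably chattier on large graphs.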
