diff --git a/pyan/analyzer.py b/pyan/analyzer.py index 25c713f..1eab7a0 100644 --- a/pyan/analyzer.py +++ b/pyan/analyzer.py @@ -242,7 +242,8 @@ def resolve_imports(self): if len(to_nodes) > 0 } - def filter_data(self, function: Union[None, str] = None, namespace: Union[None, str] = None, max_iter: int = 1000): + def filter_data(self, function: Union[None, str] = None, namespace: Union[None, str] = None, max_iter: int = 1000, + filter_down=True, filter_up=False): if function: function_name = function.split(".")[-1] function_namespace = ".".join(function.split(".")[:-1]) @@ -251,9 +252,10 @@ def filter_data(self, function: Union[None, str] = None, namespace: Union[None, else: node = None - self.filter(node=node, namespace=namespace) + self.filter(node=node, namespace=namespace, filter_down=filter_down, filter_up=filter_up) - def filter(self, node: Union[None, Node] = None, namespace: Union[str, None] = None, max_iter: int = 1000): + def filter(self, node: Union[None, Node] = None, namespace: Union[str, None] = None, max_iter: int = 1000, + filter_down: bool = True, filter_up: bool = False): """ filter callgraph nodes that related to `node` or are in `namespace` @@ -262,12 +264,15 @@ def filter(self, node: Union[None, Node] = None, namespace: Union[str, None] = N namespace: namespace to search in (name of top level module), if None, determines namespace from `node` max_iter: maximum number of iterations and nodes to iterate + filter_down: filter nodes in downward + filter_up: filter nodes in upward Returns: self """ # filter the nodes to avoid cluttering the callgraph with irrelevant information - filtered_nodes = self.get_related_nodes(node, namespace=namespace, max_iter=max_iter) + filtered_nodes = self.get_related_nodes(node, namespace=namespace, max_iter=max_iter, + find_downward=filter_down, find_upward=filter_up) self.nodes = {name: [node for node in nodes if node in filtered_nodes] for name, nodes in self.nodes.items()} self.uses_edges = { @@ -283,7 +288,8 @@ def filter(self, node: Union[None, Node] = None, namespace: Union[str, None] = N return self def get_related_nodes( - self, node: Union[None, Node] = None, namespace: Union[str, None] = None, max_iter: int = 1000 + self, node: Union[None, Node] = None, namespace: Union[str, None] = None, max_iter: int = 1000, + find_downward: bool = True, find_upward: bool = False ) -> set: """ get nodes that related to `node` or are in `namespace` @@ -293,6 +299,8 @@ def get_related_nodes( namespace: namespace to search in (name of top level module), if None, determines namespace from `node` max_iter: maximum number of iterations and nodes to iterate + find_downward: look for nodes in downward + find_upward: look for nodes in upward Returns: set: set of nodes related to `node` including `node` itself @@ -316,63 +324,67 @@ def get_related_nodes( namespace = node.namespace.strip(".").split(".", 1)[0] queue = [node] - # use queue system to search through nodes - # essentially add a node to the queue and then search all connected nodes which are in turn added to the queue - # until the queue itself is empty or the maximum limit of max_iter searches have been hit - downstream_new_nodes = new_nodes.copy() - downstream_queue = queue.copy() - i = max_iter - while len(downstream_queue) > 0: - item = downstream_queue.pop() - if item not in downstream_new_nodes: - downstream_new_nodes.add(item) - i -= 1 - if i < 0: - break - # add used nodes that are not already added and are in desired namespace - downstream_queue.extend( - [ - n - for n in self.uses_edges.get(item, []) - if n in self.uses_edges and n not in downstream_new_nodes and namespace in n.namespace - ] - ) - # add defined nodes that are not already added and are in desired namespace - downstream_queue.extend( - [ - n - for n in self.defines_edges.get(item, []) - if n in self.defines_edges and n not in downstream_new_nodes and namespace in n.namespace - ] - ) + downstream_new_nodes = set() + if find_downward: + # use queue system to search through nodes + # essentially add a node to the queue and then search all connected nodes which are in turn added to the queue + # until the queue itself is empty or the maximum limit of max_iter searches have been hit + downstream_new_nodes = new_nodes.copy() + downstream_queue = queue.copy() + i = max_iter + while len(downstream_queue) > 0: + item = downstream_queue.pop() + if item not in downstream_new_nodes: + downstream_new_nodes.add(item) + i -= 1 + if i < 0: + break + # add used nodes that are not already added and are in desired namespace + downstream_queue.extend( + [ + n + for n in self.uses_edges.get(item, []) + if n in self.uses_edges and n not in downstream_new_nodes and namespace in n.namespace + ] + ) + # add defined nodes that are not already added and are in desired namespace + downstream_queue.extend( + [ + n + for n in self.defines_edges.get(item, []) + if n in self.defines_edges and n not in downstream_new_nodes and namespace in n.namespace + ] + ) - # get callers of node - upstream_new_nodes = new_nodes.copy() - upstream_queue = queue.copy() - i = max_iter - while len(upstream_queue) > 0: - item = upstream_queue.pop() - if item not in upstream_new_nodes: - upstream_new_nodes.add(item) - i -= 1 - if i < 0: - break - # add used nodes that are not already added and are in desired namespace - upstream_queue.extend( - [ - n - for n in self.get_callers(self.uses_edges, item) - if n in self.uses_edges and n not in upstream_new_nodes and namespace in n.namespace - ] - ) - # add defined nodes that are not already added and are in desired namespace - upstream_queue.extend( - [ - n - for n in self.get_callers(self.defines_edges, item) - if n in self.defines_edges and n not in upstream_new_nodes and namespace in n.namespace - ] - ) + upstream_new_nodes = set() + if find_upward: + # get callers of node + upstream_new_nodes = new_nodes.copy() + upstream_queue = queue.copy() + i = max_iter + while len(upstream_queue) > 0: + item = upstream_queue.pop() + if item not in upstream_new_nodes: + upstream_new_nodes.add(item) + i -= 1 + if i < 0: + break + # add used nodes that are not already added and are in desired namespace + upstream_queue.extend( + [ + n + for n in self.get_callers(self.uses_edges, item) + if n in self.uses_edges and n not in upstream_new_nodes and namespace in n.namespace + ] + ) + # add defined nodes that are not already added and are in desired namespace + upstream_queue.extend( + [ + n + for n in self.get_callers(self.defines_edges, item) + if n in self.defines_edges and n not in upstream_new_nodes and namespace in n.namespace + ] + ) return downstream_new_nodes.union(upstream_new_nodes) diff --git a/pyan/main.py b/pyan/main.py index f6b2540..9a5293c 100644 --- a/pyan/main.py +++ b/pyan/main.py @@ -45,6 +45,10 @@ def main(cli_args=None): parser.add_argument("--function", dest="function", help="filter for FUNCTION (generates call subtree)", metavar="FUNCTION", default=None) + parser.add_argument("--filterdown", dest="filterdown", help="filter downstream (FUNCTION will be root in call tree)", action="store", default=True) + + parser.add_argument("--filterup", dest="filterup", help="filter upstream (FUNCTION will be a leaf in call tree)", action="store", default=False) + parser.add_argument("-l", "--log", dest="logname", help="write log to LOG", metavar="LOG") parser.add_argument("-v", "--verbose", action="store_true", default=False, dest="verbose", help="verbose output") @@ -214,7 +218,9 @@ def main(cli_args=None): v = CallGraphVisitor(filenames, logger=logger, root=root) if known_args.function or known_args.namespace: - v.filter_data(function=known_args.function, namespace=known_args.namespace) + filter_down = known_args.filterdown in ["T", "t", "True", "true"] + filter_up = known_args.filterup in ["T", "t", "True", "true"] + v.filter_data(function=known_args.function, namespace=known_args.namespace, filter_down=filter_down, filter_up=filter_up) if not known_args.packages: v.remove_packages()