Skip to content

Commit

Permalink
Merge pull request #134 from aedwardstx/children_class
Browse files Browse the repository at this point in the history
Abstract children list and dict collections into a class
  • Loading branch information
aedwardstx authored Nov 20, 2024
2 parents 5baa554 + 2561d6e commit ff0794a
Show file tree
Hide file tree
Showing 12 changed files with 341 additions and 199 deletions.
134 changes: 56 additions & 78 deletions hier_config/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
from abc import ABC, abstractmethod
from itertools import chain
from logging import getLogger
from typing import TYPE_CHECKING, Optional, Union
from typing import TYPE_CHECKING, Optional, TypeVar, Union

from .children import HConfigChildren
from .exceptions import DuplicateChildError

if TYPE_CHECKING:
Expand All @@ -15,25 +16,25 @@
from .platforms.driver_base import HConfigDriverBase
from .root import HConfig

_HConfigRootOrChildT = TypeVar(
"_HConfigRootOrChildT", bound=Union[HConfig, HConfigChild]
)

logger = getLogger(__name__)


class HConfigBase(ABC): # noqa: PLR0904
__slots__ = ("children", "children_dict")
__slots__ = ("children",)

def __init__(self) -> None:
self.children: list[HConfigChild] = []
self.children_dict: dict[str, HConfigChild] = {}
self.children = HConfigChildren()

def __len__(self) -> int:
return len(tuple(self.all_children()))

def __bool__(self) -> bool:
return True

def __contains__(self, item: str) -> bool:
return item in self.children_dict

@abstractmethod
def __hash__(self) -> int:
pass
Expand Down Expand Up @@ -68,31 +69,27 @@ def add_child(
self,
text: str,
*,
raise_on_duplicate: bool = False,
force_duplicate: bool = False,
return_if_present: bool = False,
check_if_present: bool = True,
) -> HConfigChild:
"""Add a child instance of HConfigChild."""
if not text:
message = "text was empty"
raise ValueError(message)

# if child does not exist
if text not in self:
new_item = self._instantiate_child(text)
self.children.append(new_item)
self.children_dict[text] = new_item
return new_item
# if child does exist and is allowed to be installed as a duplicate
if self._is_duplicate_child_allowed() or force_duplicate:
new_item = self._instantiate_child(text)
self.children.append(new_item)
return new_item

# If the child is already present and the parent does not allow for it
if raise_on_duplicate:
if check_if_present and (child := self.children.get(text)):
if self._is_duplicate_child_allowed():
new_child = self._instantiate_child(text)
self.children.append(new_child, update_mapping=False)
return new_child
if return_if_present:
return child
message = f"Found a duplicate section: {(*self.path(), text)}"
raise DuplicateChildError(message)
return self.children_dict[text]

new_child = self._instantiate_child(text)
self.children.append(new_child)
return new_child

def path(self) -> Iterator[str]: # noqa: PLR6301
yield from ()
Expand All @@ -110,19 +107,6 @@ def add_deep_copy_of(

return new_child

def delete_child_by_text(self, text: str) -> None:
"""Delete all children with the provided text."""
if text in self.children_dict:
self.children[:] = [c for c in self.children if c.text != text]
self.rebuild_children_dict()

def delete_child(self, child: HConfigChild) -> None:
"""Delete a child from self.children and self.children_dict."""
old_len = len(self.children)
self.children = [c for c in self.children if c is not child]
if old_len != len(self.children):
self.rebuild_children_dict()

def all_children_sorted(self) -> Iterator[HConfigChild]:
"""Recursively find and yield all children sorted at each hierarchy."""
for child in sorted(self.children):
Expand Down Expand Up @@ -197,7 +181,7 @@ def get_children(
isinstance(equals, str)
and startswith is endswith is contains is re_search is None
):
if child := self.children_dict.get(equals):
if child := self.children.get(equals):
yield child
children_slice = slice(self.children.index(child) + 1, None)
else:
Expand All @@ -208,13 +192,13 @@ def get_children(
and equals is endswith is contains is re_search is None
):
duplicates_allowed = None
for child_text, child in self.children_dict.items():
if child_text.startswith(startswith):
for index, child in enumerate(self.children):
if child.text.startswith(startswith):
yield child
if duplicates_allowed is None:
duplicates_allowed = self._is_duplicate_child_allowed()
if duplicates_allowed:
children_slice = slice(self.children.index(child) + 1, None)
children_slice = slice(index + 1, None)
break
else:
return
Expand Down Expand Up @@ -247,17 +231,6 @@ def add_shallow_copy_of(

return new_child

def rebuild_children_dict(self) -> None:
"""Rebuild self.children_dict."""
self.children_dict = {}
for child in self.children:
self.children_dict.setdefault(child.text, child)

def delete_all_children(self) -> None:
"""Delete all children."""
self.children.clear()
self.rebuild_children_dict()

def unified_diff(self, target: Union[HConfig, HConfigChild]) -> Iterator[str]:
"""In its current state, this algorithm does not consider duplicate child differences.
e.g. two instances `endif` in an IOS-XR route-policy. It also does not respect the
Expand All @@ -269,7 +242,7 @@ def unified_diff(self, target: Union[HConfig, HConfigChild]) -> Iterator[str]:
# if a self child is missing from the target "- self_child.text"
for self_child in self.children:
self_iter = iter((f"{self_child.indentation}{self_child.text}",))
if target_child := target.children_dict.get(self_child.text, None):
if target_child := target.children.get(self_child.text, None):
found = self_child.unified_diff(target_child)
if peek := next(found, None):
yield from chain(self_iter, (peek,), found)
Expand All @@ -281,7 +254,7 @@ def unified_diff(self, target: Union[HConfig, HConfigChild]) -> Iterator[str]:
)
# if a target child is missing from self "+ target_child.text"
for target_child in target.children:
if target_child.text not in self.children_dict:
if target_child.text not in self.children:
yield f"{target_child.indentation}+ {target_child.text}"
yield from (
f"{c.indentation}+ {c.text}"
Expand Down Expand Up @@ -371,8 +344,8 @@ def _is_duplicate_child_allowed(self) -> bool:
def _with_tags(
self,
tags: frozenset[str],
new_instance: Union[HConfig, HConfigChild],
) -> Union[HConfig, HConfigChild]:
new_instance: _HConfigRootOrChildT,
) -> _HConfigRootOrChildT:
"""Adds children recursively that have a subset of tags."""
for child in self.children:
if tags.issubset(child.tags):
Expand All @@ -383,9 +356,9 @@ def _with_tags(

def _config_to_get_to(
self,
target: Union[HConfig, HConfigChild],
delta: Union[HConfig, HConfigChild],
) -> Union[HConfig, HConfigChild]:
target: _HConfigRootOrChildT,
delta: _HConfigRootOrChildT,
) -> _HConfigRootOrChildT:
"""Figures out what commands need to be executed to transition from self to target.
self is the source data structure(i.e. the running_config),
target is the destination(i.e. generated_config).
Expand All @@ -405,12 +378,12 @@ def _strip_acl_sequence_number(hier_child: HConfigChild) -> str:

def _difference(
self,
target: Union[HConfig, HConfigChild],
delta: Union[HConfig, HConfigChild],
target: _HConfigRootOrChildT,
delta: _HConfigRootOrChildT,
target_acl_children: Optional[dict[str, HConfigChild]] = None,
*,
in_acl: bool = False,
) -> Union[HConfig, HConfigChild]:
) -> _HConfigRootOrChildT:
acl_sw_matches = tuple(f"ip{x} access-list " for x in ("", "v4", "v6"))

for self_child in self.children:
Expand Down Expand Up @@ -460,7 +433,7 @@ def _config_to_get_to_left(
# Also, find out if another command in self.children will overwrite
# i.e. be idempotent
for self_child in self.children:
if self_child.text in target:
if self_child.text in target.children:
continue
if self_child.is_idempotent_command(target.children):
continue
Expand All @@ -476,25 +449,30 @@ def _config_to_get_to_right(
target: Union[HConfig, HConfigChild],
delta: Union[HConfig, HConfigChild],
) -> None:
# find what would need to be added to source_config to get to self
# Find what would need to be added to source_config to get to self
for target_child in target.children:
# if the child exist, recurse into its children
if self_child := self.get_child(equals=target_child.text):
# This creates a new HConfigChild object just in case there are some delta children
# Not very efficient, think of a way to not do this
subtree = delta.add_child(target_child.text)
self_child._config_to_get_to(target_child, subtree) # noqa: SLF001
if not subtree.children:
subtree.delete()
# If the child exist, recurse into its children
if self_child := self.children.get(target_child.text):
# Do we need to rewrite the child and its children as well?
elif self_child.use_sectional_overwrite():
target_child.overwrite_with(self_child, delta)
elif self_child.use_sectional_overwrite_without_negation():
target_child.overwrite_with(self_child, delta, negate=False)
# the child is absent, add it
if self_child.use_sectional_overwrite():
self_child.overwrite_with(target_child, delta)
continue
if self_child.use_sectional_overwrite_without_negation():
self_child.overwrite_with(target_child, delta, negate=False)
continue
# This creates a new HConfigChild object just in case there are some delta children.
# This is not very efficient, think of a way to not do this.
subtree = self._instantiate_child(target_child.text)
self_child._config_to_get_to(target_child, subtree) # noqa: SLF001
if subtree.children:
delta.children.append(subtree)
# The child is absent, add it.
else:
# If the target_child is already in the delta, that means it was negated in the target config
if target_child.text in delta.children:
continue
new_item = delta.add_deep_copy_of(target_child)
# mark the new item and all of its children as new_in_config
# Mark the new item and all of its children as new_in_config.
new_item.new_in_config = True
for child in new_item.all_children():
child.new_in_config = True
Expand Down
46 changes: 20 additions & 26 deletions hier_config/child.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,12 @@ def __eq__(self, other: object) -> bool:
if not isinstance(other, HConfigChild):
return NotImplemented

if (
self.text != other.text
or self.tags != other.tags
or self.comments != other.comments
):
return False

if len(self.children) != len(other.children):
# We are intentionally not including the
# comments, facts, instances, new_in_config, order_weight attributes.
if self.text != other.text or self.tags != other.tags:
return False

return all(
self_child == other_child
for self_child, other_child in zip(
sorted(self.children),
sorted(other.children),
)
)
return self.children == other.children

def __ne__(self, other: object) -> bool:
return not self.__eq__(other)
Expand All @@ -108,7 +97,7 @@ def text(self, value: str) -> None:
is instantiated to rebuild the children dictionary.
"""
self._text = value.strip()
self.parent.rebuild_children_dict()
self.parent.children.rebuild_mapping()

@property
def text_without_negation(self) -> str:
Expand Down Expand Up @@ -169,7 +158,6 @@ def move(self, new_parent: Union[HConfig, HConfigChild]) -> None:
:param new_parent: HConfigChild object -> type list
"""
new_parent.children.append(self)
new_parent.rebuild_children_dict()
self.delete()

def lineage(self) -> Iterator[HConfigChild]:
Expand Down Expand Up @@ -217,7 +205,7 @@ def indentation(self) -> str:

def delete(self) -> None:
"""Delete the current object from its parent."""
self.parent.delete_child(self)
self.parent.children.delete(self)

def tags_add(self, tag: Union[str, Iterable[str]]) -> None:
"""Add a tag to self._tags on all leaf nodes."""
Expand Down Expand Up @@ -314,20 +302,26 @@ def use_sectional_overwrite(self) -> bool:

def overwrite_with(
self,
other: HConfigChild,
target: HConfigChild,
delta: Union[HConfig, HConfigChild],
*,
negate: bool = True,
) -> None:
"""Deletes delta.child[self.text], adds a deep copy of self to delta."""
if other.children != self.children:
"""Deletes delta.child[self.text], adds a deep copy of target to delta."""
if self.children != target.children:
if negate:
delta.delete_child_by_text(self.text)
deleted = delta.add_child(self.text).negate()
deleted.comments.add("dropping section")
if negated := delta.children.get(self.text):
negated.negate()
else:
negated = delta.add_child(
self.text, check_if_present=False
).negate()

negated.comments.add("dropping section")
else:
delta.children.delete(self.text)
if self.children:
delta.delete_child_by_text(self.text)
new_item = delta.add_deep_copy_of(self)
new_item = delta.add_deep_copy_of(target)
new_item.comments.add("re-create section")

def line_inclusion_test(
Expand Down
Loading

0 comments on commit ff0794a

Please sign in to comment.