diff --git a/README.md b/README.md index f6dd602..e5222d2 100644 --- a/README.md +++ b/README.md @@ -22,5 +22,46 @@ The code documentation can be found at: https://hier-config.readthedocs.io/en/la Installation ============ -### Pip -Install from PyPi: `pip install hier-config` +### PIP +Install from PyPi: + +```shell +pip install hier-config +``` + +Quick Start +=========== + +### Step 1: Import Required Classes +```python +from hier_config import WorkflowRemediation, get_hconfig, Platform +from hier_config.utils import read_text_from_file +``` + +### Step 2: Load Configurations +Load the running and intended configurations as strings: + +```python +running_config_text = read_text_from_file("./tests/fixtures/running_config.conf") +generated_config_text = read_text_from_file("./tests/fixtures/generated_config.conf") +``` + +### Step 3: Create HConfig Objects +Specify the device platform (e.g., `Platform.CISCO_IOS`): + +```python +running_config = get_hconfig(Platform.CISCO_IOS, running_config_text) +generated_config = get_hconfig(Platform.CISCO_IOS, generated_config_text) +``` + +### Step 4: Initialize WorkflowRemediation +Compare configurations and generate remediation steps: + +```python +workflow = WorkflowRemediation(running_config, generated_config) + +print("Remediation Configuration:") +print(workflow.remediation_config) +``` + +This guide gets you started with Hier Config in minutes! For more details, visit [Hier Config Documentation Site](https://hier-config.readthedocs.io/en/latest/). diff --git a/docs/custom-workflows.md b/docs/custom-workflows.md index 0b18c9c..f9e9ea7 100644 --- a/docs/custom-workflows.md +++ b/docs/custom-workflows.md @@ -12,14 +12,14 @@ Start by importing the necessary modules and loading the running and intended co ```python from hier_config import WorkflowRemediation, get_hconfig, Platform -from hier_config.utils import load_device_config +from hier_config.utils import read_text_from_file ``` Load the configurations from files: ```python -running_config = load_device_config("./tests/fixtures/running_config_acl.conf") -generated_config = load_device_config("./tests/fixtures/generated_config_acl.conf") +running_config = read_text_from_file("./tests/fixtures/running_config_acl.conf") +generated_config = read_text_from_file("./tests/fixtures/generated_config_acl.conf") ``` These configurations represent the current and desired states of the device. diff --git a/docs/drivers.md b/docs/drivers.md index 2089af9..dfbe329 100644 --- a/docs/drivers.md +++ b/docs/drivers.md @@ -497,11 +497,11 @@ class CustomPlatform(str, Enum): ```python from .custom_platform import CustomPlatform from hier_config import get_hconfig -from hier_config.utils import load_device_config +from hier_config.utils import read_text_from_file # Load running and intended configurations from files -running_config_text = load_device_config("./tests/fixtures/running_config.conf") -generated_config_text = load_device_config("./tests/fixtures/remediation_config.conf") +running_config_text = read_text_from_file("./tests/fixtures/running_config.conf") +generated_config_text = read_text_from_file("./tests/fixtures/remediation_config.conf") # Create HConfig objects for running and intended configurations running_config = get_hconfig(CustomPlatform.CUSTOM_DRIVER, running_config_text) diff --git a/docs/future-config.md b/docs/future-config.md index 731e65b..0ead60f 100644 --- a/docs/future-config.md +++ b/docs/future-config.md @@ -26,11 +26,11 @@ Currently, this algorithm does not account for: ```bash >>> from hier_config import get_hconfig, Platform ->>> from hier_config.utils import load_device_config +>>> from hier_config.utils import read_text_from_file >>> ->>> running_config_text = load_device_config("./tests/fixtures/running_config.conf") ->>> generated_config_text = load_device_config("./tests/fixtures/remediation_config_without_tags.conf") +>>> running_config_text = read_text_from_file("./tests/fixtures/running_config.conf") +>>> generated_config_text = read_text_from_file("./tests/fixtures/remediation_config_without_tags.conf") >>> >>> running_config = get_hconfig(Platform.CISCO_IOS, running_config_text) >>> remediation_config = get_hconfig(Platform.CISCO_IOS, remediation_config_text) diff --git a/docs/getting-started.md b/docs/getting-started.md index 9ced331..9bae278 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -8,7 +8,7 @@ To use `WorkflowRemediation`, you’ll import it along with `get_hconfig` (for g ```python >>> from hier_config import WorkflowRemediation, get_hconfig, Platform ->>> from hier_config.utils import load_device_config +>>> from hier_config.utils import read_text_from_file >>> ``` @@ -20,8 +20,8 @@ Use `get_hconfig` to create HConfig objects for both the running and intended co ```python # Define running and intended configurations as strings ->>> running_config_text = load_device_config("./tests/fixtures/running_config.conf") ->>> generated_config_text = load_device_config("./tests/fixtures/remediation_config.conf") +>>> running_config_text = read_text_from_file("./tests/fixtures/running_config.conf") +>>> generated_config_text = read_text_from_file("./tests/fixtures/remediation_config.conf") >>> # Create HConfig objects for running and intended configurations diff --git a/docs/junos-style-syntax-remediation.md b/docs/junos-style-syntax-remediation.md index 9126930..1601f20 100644 --- a/docs/junos-style-syntax-remediation.md +++ b/docs/junos-style-syntax-remediation.md @@ -26,10 +26,10 @@ set interfaces irb unit 3 family inet description "switch_mgmt_10.0.4.0/24" $ python3 >>> from hier_config import WorkflowRemediation, get_hconfig, Platform ->>> from hier_config.utils import load_device_config +>>> from hier_config.utils import read_text_from_file >>> ->>> running_config_text = load_device_config("./tests/fixtures/running_config_flat_junos.conf") ->>> generated_config_text = load_device_config("./tests/fixtures/generated_config_flat_junos.conf") +>>> running_config_text = read_text_from_file("./tests/fixtures/running_config_flat_junos.conf") +>>> generated_config_text = read_text_from_file("./tests/fixtures/generated_config_flat_junos.conf") # Create HConfig objects for the running and generated configurations using JunOS syntax >>> running_config = get_hconfig(Platform.JUNIPER_JUNOS, running_config_text) >>> generated_config = get_hconfig(Platform.JUNIPER_JUNOS, generated_config_text) @@ -119,10 +119,10 @@ interfaces { $ python3 >>> from hier_config import WorkflowRemediation, get_hconfig, Platform ->>> from hier_config.utils import load_device_config +>>> from hier_config.utils import read_text_from_file >>> ->>> running_config_text = load_device_config("./tests/fixtures/running_config_junos.conf") ->>> generated_config_text = load_device_config("./tests/fixtures/generated_config_junos.conf") +>>> running_config_text = read_text_from_file("./tests/fixtures/running_config_junos.conf") +>>> generated_config_text = read_text_from_file("./tests/fixtures/generated_config_junos.conf") # Create HConfig objects for the running and generated configurations using JunOS syntax >>> running_config = get_hconfig(Platform.JUNIPER_JUNOS, running_config_text) >>> generated_config = get_hconfig(Platform.JUNIPER_JUNOS, generated_config_text) diff --git a/docs/tags.md b/docs/tags.md index 101f48e..493ac50 100644 --- a/docs/tags.md +++ b/docs/tags.md @@ -110,11 +110,11 @@ With the tags loaded, you can create a targeted remediation based on those tags # Import necessary libraries from hier_config import WorkflowRemediation, get_hconfig, Platform -from hier_config.utils import load_device_config, load_hier_config_tags +from hier_config.utils import read_text_from_file, load_hier_config_tags # Load the running and generated configurations from files -running_config = load_device_config("./tests/fixtures/running_config.conf") -generated_config = load_device_config("./tests/fixtures/generated_config.conf") +running_config = read_text_from_file("./tests/fixtures/running_config.conf") +generated_config = read_text_from_file("./tests/fixtures/generated_config.conf") # Load tag rules from a file tags = load_hier_config_tags("./tests/fixtures/tag_rules_ios.yml") diff --git a/docs/unified-diff.md b/docs/unified-diff.md index 75d07a6..1e0001a 100644 --- a/docs/unified-diff.md +++ b/docs/unified-diff.md @@ -10,8 +10,8 @@ Currently, the algorithm does not account for duplicate child entries (e.g., mul >>> from hier_config import get_hconfig, Platform >>> from pprint import pprint >>> ->>> running_config_text = load_device_config("./tests/fixtures/running_config.conf") ->>> generated_config_text = load_device_config("./tests/fixtures/generated_config.conf") +>>> running_config_text = read_text_from_file("./tests/fixtures/running_config.conf") +>>> generated_config_text = read_text_from_file("./tests/fixtures/generated_config.conf") >>> >>> running_config = get_hconfig(Platform.CISCO_IOS, running_config_text) >>> generated_config = get_hconfig(Platform.CISCO_IOS, generated_config_text) diff --git a/docs/utilities.md b/docs/utilities.md new file mode 100644 index 0000000..1515b3d --- /dev/null +++ b/docs/utilities.md @@ -0,0 +1,180 @@ +# Utilities + +## read_text_from_file + +**Description**: +Reads the contents of a file and loads its contents into memory. + +**Arguments**: + + `file_path (str)`: The path to the device configuration file. + +**Returns**: + + `str`: The contents of the file as a string. + +**Example**: +```python +from hier_config.utils import read_text_from_file + +device_config = read_text_from_file("path/to/device_config.txt") +print(device_config) +``` + +## load_hier_config_tags + +**Description**: +Parses a YAML file containing configuration tags and converts them into a format compatible with Hier Config. + +**Arguments**: + + `file_path (str)`: The path to the YAML file containing tag rules. + +**Returns**: + + `List[Dict[str, Any]]`: A list of dictionaries representing tag rules. + +**Example**: +```python +from hier_config.utils import load_hier_config_tags + +tag_rules = load_hier_config_tags("path/to/tag_rules.yml") + +print(tag_rules) +``` + +## Hier Config V2 to V3 Migration Utilities + +Hier Config version 3 introduces breaking changes compared to version 2. These utilities are designed to help you transition seamlessly by enabling the continued use of version 2 configurations while you update your tooling to support the new version. + +### hconfig_v2_os_v3_platform_mapper +**Description**: +Maps a Hier Config v2 OS name to a v3 Platform enumeration. + +**Arguments**: + + `os_name (str)`: The name of the OS as defined in Hier Config v2. + +**Returns**: + + `Platform`: The corresponding Platform enumeration for Hier Config v3. + +**Raises**: + + `ValueError`: If the provided OS name is not supported in v2. + +**Example**: +```python +from hier_config.utils import hconfig_v2_os_v3_platform_mapper + +platform = hconfig_v2_os_v3_platform_mapper("ios") + +print(platform) # Output: +``` + +### hconfig_v3_platform_v2_os_mapper +**Description**: +Maps a Hier Config v3 Platform enumeration to a v2 OS name. + +**Arguments**: + + `platform (Platform)`: A Platform enumeration from Hier Config v3. + +**Returns**: + + `str`: The corresponding OS name for Hier Config v2. + +**Raises**: + + `ValueError`: If the provided Platform is not supported in v3. + +**Example**: +```python +from hier_config.utils import hconfig_v3_platform_v2_os_mapper + +os_name = hconfig_v3_platform_v2_os_mapper(Platform.CISCO_IOS) +print(os_name) # Output: "ios" +``` + +### load_hconfig_v2_options +**Description**: +Loads v2-style configuration options into a v3-compatible driver. + +**Arguments**: + + `v2_options (Dict[str, Any])`: A dictionary of v2-style options. + `platform (Platform)`: A Platform enumeration from Hier Config v3. + +**Returns**: + + `HConfigDriverBase`: Hier Config Platform Driver. + +**Example loading options from a dictionary**: +```python +from hier_config import Platform +from hier_config.utils import load_hconfig_v2_options + +v2_options = { + "negation": "no", + "ordering": [{"lineage": [{"startswith": "ntp"}], "order": 700}], + "per_line_sub": [{"search": "^!.*Generated.*$", "replace": ""}], + "sectional_exiting": [ + {"lineage": [{"startswith": "router bgp"}], "exit_text": "exit"} + ], + "idempotent_commands": [{"lineage": [{"startswith": "interface"}]}], +} +platform = Platform.CISCO_IOS +driver = load_hconfig_v2_options(v2_options, platform) + +print(driver) +``` + +*Output*: +``` +print(driver.rules) +full_text_sub=[] idempotent_commands=[IdempotentCommandsRule(match_rules=(MatchRule(equals=None, startswith='vlan', endswith=None, contains=None, re_search=None), MatchRule(equals=None, startswith='name', endswith=None, contains=None, re_search=None))), IdempotentCommandsRule(match_rules=(MatchRule(equals=None, startswith='interface ', endswith=None, contains=None, re_search=None), MatchRule(equals=None, startswith='description ', endswith=None, contains=None, re_search=None))), IdempotentCommandsRule(match_rules=(MatchRule(equals=None, startswith='interface ', endswith=None, contains=None, re_search=None), MatchRule(equals=None, startswith='ip address ', endswith=None, contains=None, re_search=None))), IdempotentCommandsRule(match_rules=(MatchRule(equals=None, startswith='interface ', endswith=None, contains=None, re_search=None), MatchRule(equals=None, startswith='switchport mode ', endswith=None, contains=None, re_search=None))), IdempotentCommandsRule(match_rules=(MatchRule(equals=None, startswith='interface ', endswith=None, contains=None, re_search=None), MatchRule(equals=None, startswith='authentication host-mode ', endswith=None, contains=None, re_search=None))), IdempotentCommandsRule(match_rules=(MatchRule(equals=None, startswith='interface ', endswith=None, contains=None, re_search=None), MatchRule(equals=None, startswith='authentication event server dead action authorize vlan ', endswith=None, contains=None, re_search=None))), IdempotentCommandsRule(match_rules=(MatchRule(equals=None, startswith='errdisable recovery interval ', endswith=None, contains=None, re_search=None),)), IdempotentCommandsRule(match_rules=(MatchRule(equals=None, startswith=None, endswith=None, contains=None, re_search='^(no )?logging console.*'),)), IdempotentCommandsRule(match_rules=(MatchRule(equals=None, startswith='interface', endswith=None, contains=None, re_search=None),))] idempotent_commands_avoid=[] indent_adjust=[] indentation=2 negation_default_when=[] negate_with=[NegationDefaultWithRule(match_rules=(MatchRule(equals=None, startswith='logging console ', endswith=None, contains=None, re_search=None),), use='logging console debugging'), NegationDefaultWithRule(match_rules=(MatchRule(equals=None, startswith='', endswith=None, contains=None, re_search=None),), use='no')] ordering=[OrderingRule(match_rules=(MatchRule(equals=None, startswith='interface', endswith=None, contains=None, re_search=None), MatchRule(equals=None, startswith='switchport mode ', endswith=None, contains=None, re_search=None)), weight=-10), OrderingRule(match_rules=(MatchRule(equals=None, startswith='no vlan filter', endswith=None, contains=None, re_search=None),), weight=200), OrderingRule(match_rules=(MatchRule(equals=None, startswith='interface', endswith=None, contains=None, re_search=None), MatchRule(equals=None, startswith='no shutdown', endswith=None, contains=None, re_search=None)), weight=200), OrderingRule(match_rules=(MatchRule(equals=None, startswith='aaa group server tacacs+ ', endswith=None, contains=None, re_search=None), MatchRule(equals=None, startswith='no server ', endswith=None, contains=None, re_search=None)), weight=10), OrderingRule(match_rules=(MatchRule(equals=None, startswith='no tacacs-server ', endswith=None, contains=None, re_search=None),), weight=10), OrderingRule(match_rules=(MatchRule(equals=None, startswith='ntp', endswith=None, contains=None, re_search=None),), weight=700)] parent_allows_duplicate_child=[] per_line_sub=[PerLineSubRule(search='^Building configuration.*', replace=''), PerLineSubRule(search='^Current configuration.*', replace=''), PerLineSubRule(search='^! Last configuration change.*', replace=''), PerLineSubRule(search='^! NVRAM config last updated.*', replace=''), PerLineSubRule(search='^ntp clock-period .*', replace=''), PerLineSubRule(search='^version.*', replace=''), PerLineSubRule(search='^ logging event link-status$', replace=''), PerLineSubRule(search='^ logging event subif-link-status$', replace=''), PerLineSubRule(search='^\\s*ipv6 unreachables disable$', replace=''), PerLineSubRule(search='^end$', replace=''), PerLineSubRule(search='^\\s*[#!].*', replace=''), PerLineSubRule(search='^ no ip address', replace=''), PerLineSubRule(search='^ exit-peer-policy', replace=''), PerLineSubRule(search='^ exit-peer-session', replace=''), PerLineSubRule(search='^ exit-address-family', replace=''), PerLineSubRule(search='^crypto key generate rsa general-keys.*$', replace=''), PerLineSubRule(search='^!.*Generated.*$', replace='')] post_load_callbacks=[, , ] sectional_exiting=[SectionalExitingRule(match_rules=(MatchRule(equals=None, startswith='router bgp', endswith=None, contains=None, re_search=None), MatchRule(equals=None, startswith='template peer-policy', endswith=None, contains=None, re_search=None)), exit_text='exit-peer-policy'), SectionalExitingRule(match_rules=(MatchRule(equals=None, startswith='router bgp', endswith=None, contains=None, re_search=None), MatchRule(equals=None, startswith='template peer-session', endswith=None, contains=None, re_search=None)), exit_text='exit-peer-session'), SectionalExitingRule(match_rules=(MatchRule(equals=None, startswith='router bgp', endswith=None, contains=None, re_search=None), MatchRule(equals=None, startswith='address-family', endswith=None, contains=None, re_search=None)), exit_text='exit-address-family'), SectionalExitingRule(match_rules=(MatchRule(equals=None, startswith='router bgp', endswith=None, contains=None, re_search=None),), exit_text='exit')] sectional_overwrite=[] sectional_overwrite_no_negate=[] +``` + +**Example loading options from a file**: +```python +from hier_config import Platform +from hier_config.utils import load_hconfig_v2_options_from_file + +platform = Platform.CISCO_IOS +driver = load_hconfig_v2_options("/path/to/options.yml", platform) +``` + +### load_hconfig_v2_tags +**Description**: +Converts v2-style tags into a tuple of TagRule Pydantic objects compatible with Hier Config v3. + +**Arguments**: + + `v2_tags (List[Dict[str, Any]])`: A list of dictionaries representing v2-style tags. + +**Returns**: + + `Tuple[TagRule, ...]`: A tuple of TagRule Pydantic objects. + +**Example loading tags from a dictionary**: +```python +from hier_config.utils import load_hconfig_v2_tags + +v3_tags = load_hconfig_v2_tags([ + { + "lineage": [{"startswith": ["ip name-server", "ntp"]}], + "add_tags": "ntp" + } +]) + +print(v3_tags) # Output: (TagRule(match_rules=(MatchRule(equals=None, startswith=('ip name-server', 'ntp'), endswith=None, contains=None, re_search=None),), apply_tags=frozenset({'ntp'})),) +``` + +**Example loading tags from a file**: +```python +from hier_config.utils import load_hconfig_v2_tags_from_file + +v3_tags = load_hconfig_v2_tags("path/to/v2_tags.yml") + +print(v3_tags) +``` \ No newline at end of file diff --git a/hier_config/utils.py b/hier_config/utils.py index 3219ca5..0e6a229 100644 --- a/hier_config/utils.py +++ b/hier_config/utils.py @@ -1,13 +1,56 @@ +from collections.abc import Callable from pathlib import Path +from typing import Any, Optional, Union import yaml from pydantic import TypeAdapter -from hier_config.models import TagRule +from hier_config import Platform, get_hconfig_driver +from hier_config.models import ( + FullTextSubRule, + IdempotentCommandsAvoidRule, + IdempotentCommandsRule, + IndentAdjustRule, + MatchRule, + NegationDefaultWhenRule, + OrderingRule, + ParentAllowsDuplicateChildRule, + PerLineSubRule, + SectionalExitingRule, + SectionalOverwriteNoNegateRule, + SectionalOverwriteRule, + TagRule, +) +from hier_config.platforms.driver_base import HConfigDriverBase +HCONFIG_PLATFORM_V2_TO_V3_MAPPING = { + "ios": Platform.CISCO_IOS, + "iosxe": Platform.CISCO_IOS, + "iosxr": Platform.CISCO_XR, + "nxos": Platform.CISCO_NXOS, + "eos": Platform.ARISTA_EOS, + "junos": Platform.JUNIPER_JUNOS, + "vyos": Platform.VYOS, +} -def load_device_config(file_path: str) -> str: - """Reads a device configuration file and loads its contents into memory. + +def _set_match_rule(lineage: dict[str, Any]) -> Optional[MatchRule]: + if startswith := lineage.get("startswith"): + return MatchRule(startswith=startswith) + if endswith := lineage.get("endswith"): + return MatchRule(endswith=endswith) + if contains := lineage.get("contains"): + return MatchRule(contains=contains) + if equals := lineage.get("equals"): + return MatchRule(equals=equals) + if re_search := lineage.get("re_search"): + return MatchRule(re_search=re_search) + + return None + + +def read_text_from_file(file_path: str) -> str: + """Function that loads the contents of a file into memory. Args: file_path (str): The path to the configuration file. @@ -29,5 +72,249 @@ def load_hier_config_tags(tags_file: str) -> tuple[TagRule, ...]: Tuple[TagRule, ...]: A tuple of validated TagRule objects. """ - tags_data = yaml.safe_load(Path(tags_file).read_text(encoding="utf-8")) + tags_data = yaml.safe_load(read_text_from_file(file_path=tags_file)) return TypeAdapter(tuple[TagRule, ...]).validate_python(tags_data) + + +def hconfig_v2_os_v3_platform_mapper(os_name: str) -> Platform: + """Map a Hier Config v2 operating system name to a v3 Platform enumeration. + + Args: + os_name (str): The name of the OS as defined in Hier Config v2. + + Returns: + Platform: The corresponding Platform enumeration for Hier Config v3. + + Example: + >>> hconfig_v2_os_v3_platform_mapper("CISCO_IOS") + + + """ + return HCONFIG_PLATFORM_V2_TO_V3_MAPPING.get(os_name, Platform.GENERIC) + + +def hconfig_v3_platform_v2_os_mapper(platform: Platform) -> str: + """Map a Hier Config v3 Platform enumeration to a v2 operating system name. + + Args: + platform (Platform): A Platform enumeration from Hier Config v3. + + Returns: + str: The corresponding OS name for Hier Config v2. + + Example: + >>> hconfig_v3_platform_v2_os_mapper(Platform.CISCO_IOS) + "ios" + + """ + for os_name, plat in HCONFIG_PLATFORM_V2_TO_V3_MAPPING.items(): + if plat == platform: + return os_name + + return "generic" + + +def load_hconfig_v2_options( + v2_options: Union[dict[str, Any], str], platform: Platform +) -> HConfigDriverBase: + """Load Hier Config v2 options to v3 driver format from either a dictionary or a file. + + Args: + v2_options (Union[dict, str]): Either a dictionary containing v2 options or + a file path to a YAML file containing the v2 options. + platform (Platform): The Hier Config v3 Platform enum for the target platform. + + Returns: + HConfigDriverBase: A v3 driver instance with the migrated rules. + + """ + # Load options from a file if a string is provided + if isinstance(v2_options, str): + v2_options = yaml.safe_load(read_text_from_file(file_path=v2_options)) + + # Ensure v2_options is a dictionary + if not isinstance(v2_options, dict): + msg = "v2_options must be a dictionary or a valid file path." + raise TypeError(msg) + + driver = get_hconfig_driver(platform) + + def process_rules( + key: str, + rule_class: type[Any], + append_to: Callable[[Any], None], + lineage_key: str = "lineage", + ) -> None: + """Helper to process rules.""" + for rule in v2_options.get(key, ()): + match_rules = [ + match_rule + for lineage in rule.get(lineage_key, []) + if (match_rule := _set_match_rule(lineage)) is not None + ] + append_to(rule_class(match_rules=match_rules)) + + # sectional_overwrite + process_rules( + "sectional_overwrite", + SectionalOverwriteRule, + driver.rules.sectional_overwrite.append, + ) + + # sectional_overwrite_no_negate + process_rules( + "sectional_overwrite_no_negate", + SectionalOverwriteNoNegateRule, + driver.rules.sectional_overwrite_no_negate.append, + ) + + # ordering + for rule in v2_options.get("ordering", ()): + lineage_rules = rule.get("lineage") + match_rules = tuple( + match_rule + for lineage in lineage_rules + if (match_rule := _set_match_rule(lineage)) is not None + ) + weight = rule.get("order", 500) - 500 + + driver.rules.ordering.append( + OrderingRule(match_rules=match_rules, weight=weight), + ) + + # indent_adjust + for rule in v2_options.get("indent_adjust", ()): + start_expression = rule.get("start_expression") + end_expression = rule.get("end_expression") + + driver.rules.indent_adjust.append( + IndentAdjustRule( + start_expression=start_expression, end_expression=end_expression + ) + ) + + # parent_allows_duplicate_child + process_rules( + "parent_allows_duplicate_child", + ParentAllowsDuplicateChildRule, + driver.rules.parent_allows_duplicate_child.append, + ) + + # sectional_exiting + for rule in v2_options.get("sectional_exiting", ()): + lineage_rules = rule.get("lineage") + match_rules = tuple( + match_rule + for lineage in lineage_rules + if (match_rule := _set_match_rule(lineage)) is not None + ) + exit_text = rule.get("exit_text", "") + + driver.rules.sectional_exiting.append( + SectionalExitingRule(match_rules=match_rules, exit_text=exit_text), + ) + + # full_text_sub + for rule in v2_options.get("full_text_sub", ()): + driver.rules.full_text_sub.append( + FullTextSubRule( + search=rule.get("search", ""), replace=rule.get("replace", "") + ) + ) + + # per_line_sub + for rule in v2_options.get("per_line_sub", ()): + driver.rules.per_line_sub.append( + PerLineSubRule( + search=rule.get("search", ""), replace=rule.get("replace", "") + ) + ) + + # idempotent_commands_blacklist -> idempotent_commands_avoid + process_rules( + "idempotent_commands_blacklist", + IdempotentCommandsAvoidRule, + driver.rules.idempotent_commands_avoid.append, + ) + + # idempotent_commands + process_rules( + "idempotent_commands", + IdempotentCommandsRule, + driver.rules.idempotent_commands.append, + ) + + # negation_default_when + process_rules( + "negation_default_when", + NegationDefaultWhenRule, + driver.rules.negation_default_when.append, + ) + + return driver + + +def load_hconfig_v2_options_from_file( + options_file: str, platform: Platform +) -> HConfigDriverBase: + """Load Hier Config v2 options file to v3 driver format. + + Args: + options_file (str): The v2 options file. + platform (Platform): The Hier Config v3 Platform enum for the target platform. + + Returns: + HConfigDriverBase: A v3 driver instance with the migrated rules. + + """ + hconfig_options = yaml.safe_load(read_text_from_file(file_path=options_file)) + return load_hconfig_v2_options(v2_options=hconfig_options, platform=platform) + + +def load_hconfig_v2_tags( + v2_tags: Union[list[dict[str, Any]], str], +) -> Union[tuple["TagRule"], tuple["TagRule", ...]]: + """Convert v2-style tags into v3-style TagRule Pydantic objects for Hier Config. + + Args: + v2_tags (Union[list[dict[str, Any]], str]): + Either a list of dictionaries representing v2-style tags or a file path + to a YAML file containing the v2-style tags. + - If a list is provided, each dictionary should contain: + - `lineage`: A list of dictionaries with rules (e.g., `startswith`, `endswith`). + - `add_tags`: A string representing the tag to add. + - If a file path is provided, it will be read and parsed as YAML. + + Returns: + Tuple[TagRule]: A tuple of TagRule Pydantic objects representing v3-style tags. + + """ + # Load tags from a file if a string is provided + if isinstance(v2_tags, str): + v2_tags = yaml.safe_load(read_text_from_file(file_path=v2_tags)) + + # Ensure v2_tags is a list + if not isinstance(v2_tags, list): + msg = "v2_tags must be a list of dictionaries or a valid file path." + raise TypeError(msg) + + v3_tags: list[TagRule] = [] + + for v2_tag in v2_tags: + if "lineage" in v2_tag and "add_tags" in v2_tag: + # Extract the v2 fields + lineage_rules = v2_tag["lineage"] + tags = v2_tag["add_tags"] + + # Convert to MatchRule objects + match_rules = tuple( + match_rule + for lineage in lineage_rules + if (match_rule := _set_match_rule(lineage)) is not None + ) + + # Create the TagRule object + v3_tag = TagRule(match_rules=match_rules, apply_tags=frozenset([tags])) + v3_tags.append(v3_tag) + + return tuple(v3_tags) diff --git a/mkdocs.yml b/mkdocs.yml index fc8a703..5ba601e 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -14,4 +14,5 @@ nav: - Future Config: future-config.md - JunOS Style Syntax Remediation: junos-style-syntax-remediation.md - Unified Diff: unified-diff.md +- Utilities: utilities.md - Working with Tags: tags.md diff --git a/tests/conftest.py b/tests/conftest.py index acf5d8d..c53c308 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ from pathlib import Path +from typing import Any import pytest import yaml @@ -37,6 +38,11 @@ def platform_b() -> Platform: return Platform.CISCO_IOS +@pytest.fixture(scope="module") +def platform_generic() -> Platform: + return Platform.GENERIC + + @pytest.fixture(scope="module") def tag_rules_ios() -> tuple[TagRule, ...]: return TypeAdapter(tuple[TagRule, ...]).validate_python( @@ -69,6 +75,41 @@ def remediation_config_flat_junos() -> str: return _fixture_file_read("remediation_config_flat_junos.conf") +@pytest.fixture(scope="module") +def tags_file_path() -> str: + return "./tests/fixtures/tag_rules_ios.yml" + + +@pytest.fixture(scope="module") +def v2_options() -> dict[str, Any]: + return { + "negation": "no", + "sectional_overwrite": [{"lineage": [{"startswith": "template"}]}], + "sectional_overwrite_no_negate": [{"lineage": [{"startswith": "as-path-set"}]}], + "ordering": [{"lineage": [{"startswith": "ntp"}], "order": 700}], + "indent_adjust": [ + {"start_expression": "^\\s*template", "end_expression": "^\\s*end-template"} + ], + "parent_allows_duplicate_child": [ + {"lineage": [{"startswith": "route-policy"}]} + ], + "sectional_exiting": [ + {"lineage": [{"startswith": "router bgp"}], "exit_text": "exit"} + ], + "full_text_sub": [{"search": "banner motd # replace me #", "replace": ""}], + "per_line_sub": [{"search": "^!.*Generated.*$", "replace": ""}], + "idempotent_commands_blacklist": [ + { + "lineage": [ + {"startswith": "interface"}, + {"re_search": "ip address.*secondary"}, + ] + } + ], + "idempotent_commands": [{"lineage": [{"startswith": "interface"}]}], + } + + def _fixture_file_read(filename: str) -> str: return str( Path(__file__) diff --git a/tests/test_utils.py b/tests/test_utils.py index 91682b7..16eb819 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,13 +1,23 @@ from pathlib import Path +from typing import Any, Union import pytest import yaml from pydantic import ValidationError -from hier_config.models import TagRule -from hier_config.utils import load_device_config, load_hier_config_tags - -TAGS_FILE_PATH = "./tests/fixtures/tag_rules_ios.yml" +from hier_config import Platform +from hier_config.models import ( + MatchRule, + TagRule, +) +from hier_config.utils import ( + hconfig_v2_os_v3_platform_mapper, + hconfig_v3_platform_v2_os_mapper, + load_hconfig_v2_options, + load_hconfig_v2_tags, + load_hier_config_tags, + read_text_from_file, +) @pytest.fixture @@ -18,31 +28,31 @@ def temporary_file_fixture(tmp_path: Path) -> tuple[Path, str]: return file_path, content -def test_load_device_config_success(temporary_file_fixture: tuple[Path, str]) -> None: +def test_read_text_from_file_success(temporary_file_fixture: tuple[Path, str]) -> None: """Test that the function successfully loads a valid configuration file.""" # pylint: disable=redefined-outer-name file_path, expected_content = temporary_file_fixture - result = load_device_config(str(file_path)) + result = read_text_from_file(str(file_path)) assert result == expected_content, "File content should match expected content." -def test_load_device_config_file_not_found() -> None: +def test_read_text_from_file_file_not_found() -> None: """Test that the function raises FileNotFoundError when the file does not exist.""" with pytest.raises(FileNotFoundError): - load_device_config("non_existent_file.conf") + read_text_from_file("non_existent_file.conf") -def test_load_device_config_empty_file(tmp_path: Path) -> None: +def test_read_text_from_file_empty_file(tmp_path: Path) -> None: """Test that the function correctly handles an empty configuration file.""" empty_file = tmp_path / "empty.conf" empty_file.write_text("") - result = load_device_config(str(empty_file)) + result = read_text_from_file(str(empty_file)) assert not result, "Empty file should return an empty string." -def test_load_hier_config_tags_success() -> None: +def test_load_hier_config_tags_success(tags_file_path: str) -> None: """Test that valid tags from the tag_rules_ios.yml file load and validate successfully.""" - result = load_hier_config_tags(TAGS_FILE_PATH) + result = load_hier_config_tags(tags_file_path) assert isinstance(result, tuple), "Result should be a tuple of TagRule objects." assert len(result) == 4, "There should be four TagRule objects." @@ -81,3 +91,254 @@ def test_load_hier_config_tags_empty_file(tmp_path: Path) -> None: with pytest.raises(ValidationError): load_hier_config_tags(str(empty_file)) + + +def test_hconfig_v2_os_v3_platform_mapper() -> None: + # Valid mappings + assert hconfig_v2_os_v3_platform_mapper("ios") == Platform.CISCO_IOS + assert hconfig_v2_os_v3_platform_mapper("nxos") == Platform.CISCO_NXOS + assert hconfig_v2_os_v3_platform_mapper("junos") == Platform.JUNIPER_JUNOS + assert hconfig_v2_os_v3_platform_mapper("invalid") == Platform.GENERIC + + +def test_hconfig_v3_platform_v2_os_mapper() -> None: + # Valid mappings + assert hconfig_v3_platform_v2_os_mapper(Platform.CISCO_IOS) == "ios" + assert hconfig_v3_platform_v2_os_mapper(Platform.CISCO_NXOS) == "nxos" + assert hconfig_v3_platform_v2_os_mapper(Platform.JUNIPER_JUNOS) == "junos" + assert hconfig_v3_platform_v2_os_mapper(Platform.GENERIC) == "generic" + + +def test_load_hconfig_v2_options( + platform_generic: Platform, v2_options: dict[str, Any] +) -> None: + # pylint: disable=redefined-outer-name, unused-argument + platform = platform_generic + + driver = load_hconfig_v2_options(v2_options, platform) + + # Assert sectional overwrite + assert len(driver.rules.sectional_overwrite) == 1 + assert driver.rules.sectional_overwrite[0].match_rules[0].startswith == "template" + + # Assert sectional overwrite no negate + assert len(driver.rules.sectional_overwrite_no_negate) == 1 + assert ( + driver.rules.sectional_overwrite_no_negate[0].match_rules[0].startswith + == "as-path-set" + ) + + # Assert ordering rule + assert len(driver.rules.ordering) == 1 + assert driver.rules.ordering[0].match_rules[0].startswith == "ntp" + assert driver.rules.ordering[0].weight == 200 + + # Assert indent adjust + assert len(driver.rules.indent_adjust) == 1 + assert driver.rules.indent_adjust[0].start_expression == "^\\s*template" + assert driver.rules.indent_adjust[0].end_expression == "^\\s*end-template" + + # Assert parent_allows_duplicate_child + assert len(driver.rules.parent_allows_duplicate_child) == 1 + assert ( + driver.rules.parent_allows_duplicate_child[0].match_rules[0].startswith + == "route-policy" + ) + + # Assert sectional exiting + assert len(driver.rules.sectional_exiting) == 1 + assert driver.rules.sectional_exiting[0].match_rules[0].startswith == "router bgp" + assert driver.rules.sectional_exiting[0].exit_text == "exit" + + # Assert per-line substitution + assert len(driver.rules.per_line_sub) == 1 + assert driver.rules.per_line_sub[0].search == "^!.*Generated.*$" + assert not driver.rules.per_line_sub[0].replace + + # Assert full-text substitution + assert len(driver.rules.full_text_sub) == 1 + assert driver.rules.full_text_sub[0].search == "banner motd # replace me #" + assert not driver.rules.full_text_sub[0].replace + + # Assert idempotent commands avoid (blacklist) + assert len(driver.rules.idempotent_commands_avoid) == 1 + assert ( + driver.rules.idempotent_commands_avoid[0].match_rules[0].startswith + == "interface" + ) + assert ( + driver.rules.idempotent_commands_avoid[0].match_rules[1].re_search + == "ip address.*secondary" + ) + + # Assert idempotent commands + assert len(driver.rules.idempotent_commands) == 1 + assert driver.rules.idempotent_commands[0].match_rules[0].startswith == "interface" + + +def test_load_hconfig_v2_tags_valid_input() -> None: + v2_tags = [ + { + "lineage": [ + {"startswith": ["ip name-server", "no ip name-server", "ntp", "no ntp"]} + ], + "add_tags": "ntp", + }, + { + "lineage": [{"startswith": ["router bgp", "address-family ipv4"]}], + "add_tags": "bgp", + }, + ] + + expected_output = ( + TagRule( + match_rules=( + MatchRule( + startswith=("ip name-server", "no ip name-server", "ntp", "no ntp") + ), + ), + apply_tags=frozenset(["ntp"]), + ), + TagRule( + match_rules=(MatchRule(startswith=("router bgp", "address-family ipv4")),), + apply_tags=frozenset(["bgp"]), + ), + ) + + result = load_hconfig_v2_tags(v2_tags) + assert result == expected_output + + +def test_load_hconfig_v2_tags_empty_input() -> None: + v2_tags: list[dict[str, Any]] = [] + + expected_output = () + + result = load_hconfig_v2_tags(v2_tags) + assert result == expected_output + + +def test_load_hconfig_v2_tags_multiple_lineage_fields() -> None: + v2_tags = [ + { + "lineage": [ + {"startswith": ["ip name-server"]}, + {"endswith": ["version 2"]}, + ], + "add_tags": "ntp", + } + ] + + expected_output = ( + TagRule( + match_rules=( + MatchRule(startswith=("ip name-server",)), + MatchRule(endswith=("version 2",)), + ), + apply_tags=frozenset(["ntp"]), + ), + ) + + result = load_hconfig_v2_tags(v2_tags) + assert result == expected_output + + +def test_load_hconfig_v2_tags_empty_lineage() -> None: + v2_tags: list[dict[str, Union[str, list[str]]]] = [ + { + "lineage": [], + "add_tags": "empty", + } + ] + + expected_output = (TagRule(match_rules=(), apply_tags=frozenset(["empty"])),) + + result = load_hconfig_v2_tags(v2_tags) + assert result == expected_output + + +def test_load_hconfig_v2_options_from_file_valid(tmp_path: Path) -> None: + """Test loading valid v2 options from a YAML file.""" + file_path = tmp_path / "v2_options.yml" + file_content = """ordering: + - lineage: + - startswith: ntp + order: 700 +sectional_overwrite: + - lineage: + - startswith: template +indent_adjust: + - start_expression: "start expression" + end_expression: "end expression" +""" + file_path.write_text(file_content) + + platform = Platform.GENERIC + driver = load_hconfig_v2_options(v2_options=str(file_path), platform=platform) + + assert len(driver.rules.ordering) == 1 + assert driver.rules.ordering[0].match_rules[0].startswith == "ntp" + assert driver.rules.ordering[0].weight == 200 + + assert len(driver.rules.sectional_overwrite) == 1 + assert driver.rules.sectional_overwrite[0].match_rules[0].startswith == "template" + + assert len(driver.rules.indent_adjust) == 1 + assert driver.rules.indent_adjust[0].start_expression == "start expression" + assert driver.rules.indent_adjust[0].end_expression == "end expression" + + +def test_load_hconfig_v2_options_from_file_invalid_yaml(tmp_path: Path) -> None: + """Test loading v2 options from a file with invalid YAML syntax.""" + file_path = tmp_path / "invalid_v2_options.yml" + file_content = """ordering: + - lineage: + - startswith: ntp + order: # Missing value causes a syntax error +""" + file_path.write_text(file_content) + + platform = Platform.GENERIC + with pytest.raises(TypeError): + load_hconfig_v2_options(v2_options=str(file_path), platform=platform) + + +def test_load_hconfig_v2_tags_from_file_valid(tmp_path: Path) -> None: + """Test loading valid v2 tags from a YAML file.""" + file_path = tmp_path / "v2_tags.yml" + file_content = """- lineage: + - startswith: ip name-server + add_tags: dns +- lineage: + - startswith: router bgp + add_tags: bgp +""" + file_path.write_text(file_content) + + result = load_hconfig_v2_tags(v2_tags=str(file_path)) + + assert len(result) == 2 + assert result[0].apply_tags == frozenset(["dns"]) + assert result[1].apply_tags == frozenset(["bgp"]) + + +def test_load_hconfig_v2_tags_from_file_invalid_yaml(tmp_path: Path) -> None: + """Test loading v2 tags from a file with invalid YAML syntax.""" + file_path = tmp_path / "invalid_v2_tags.yml" + file_content = """- lineage: + - startswith: ip name-server + add_tags dns # Missing colon causes a syntax error +""" + file_path.write_text(file_content) + + with pytest.raises(yaml.YAMLError): + load_hconfig_v2_tags(v2_tags=str(file_path)) + + +def test_load_hconfig_v2_tags_from_file_empty_file(tmp_path: Path) -> None: + """Test loading v2 tags from an empty file.""" + file_path = tmp_path / "empty_v2_tags.yml" + file_path.write_text("") + + with pytest.raises(TypeError): + load_hconfig_v2_tags(v2_tags=str(file_path))