diff --git a/hier_config/constructors.py b/hier_config/constructors.py index a7df5c8..f80a716 100644 --- a/hier_config/constructors.py +++ b/hier_config/constructors.py @@ -17,6 +17,7 @@ from .platforms.cisco_nxos.view import HConfigViewCiscoNXOS from .platforms.cisco_xr.driver import HConfigDriverCiscoIOSXR from .platforms.cisco_xr.view import HConfigViewCiscoIOSXR +from .platforms.fortinet_fortios.driver import HConfigDriverFortinetFortiOS from .platforms.generic.driver import HConfigDriverGeneric from .platforms.hp_comware5.driver import HConfigDriverHPComware5 from .platforms.hp_procurve.driver import HConfigDriverHPProcurve @@ -29,29 +30,27 @@ logger = getLogger(__name__) -def get_hconfig_driver(platform: Platform) -> HConfigDriverBase: # noqa: PLR0911 +def get_hconfig_driver(platform: Platform) -> HConfigDriverBase: """Create base options on an OS level.""" - if platform == Platform.ARISTA_EOS: - return HConfigDriverAristaEOS() - if platform == Platform.CISCO_IOS: - return HConfigDriverCiscoIOS() - if platform == Platform.CISCO_NXOS: - return HConfigDriverCiscoNXOS() - if platform == Platform.CISCO_XR: - return HConfigDriverCiscoIOSXR() - if platform == Platform.GENERIC: - return HConfigDriverGeneric() - if platform == Platform.HP_PROCURVE: - return HConfigDriverHPProcurve() - if platform == Platform.HP_COMWARE5: - return HConfigDriverHPComware5() - if platform == Platform.JUNIPER_JUNOS: - return HConfigDriverJuniperJUNOS() - if platform == Platform.VYOS: - return HConfigDriverVYOS() - - message = f"Unsupported platform: {platform}" # type: ignore[unreachable] - raise ValueError(message) + platform_drivers: dict[Platform, type[HConfigDriverBase]] = { + Platform.ARISTA_EOS: HConfigDriverAristaEOS, + Platform.CISCO_IOS: HConfigDriverCiscoIOS, + Platform.CISCO_NXOS: HConfigDriverCiscoNXOS, + Platform.CISCO_XR: HConfigDriverCiscoIOSXR, + Platform.FORTINET_FORTIOS: HConfigDriverFortinetFortiOS, + Platform.GENERIC: HConfigDriverGeneric, + Platform.HP_PROCURVE: HConfigDriverHPProcurve, + Platform.HP_COMWARE5: HConfigDriverHPComware5, + Platform.JUNIPER_JUNOS: HConfigDriverJuniperJUNOS, + Platform.VYOS: HConfigDriverVYOS, + } + driver_cls = platform_drivers.get(platform) + + if driver_cls is None: + message = f"Unsupported platform: {platform}" + raise ValueError(message) + + return driver_cls() def get_hconfig_view(config: HConfig) -> HConfigViewBase: @@ -143,17 +142,25 @@ def get_hconfig_fast_load( current_section: Union[HConfig, HConfigChild] = config most_recent_item: Union[HConfig, HConfigChild] = current_section - for line in lines: - if not (line_lstripped := line.lstrip()): + for original_line in lines: + if not (line_lstripped := original_line.lstrip()): + continue + + # Apply per_line_sub rules before processing + processed_line = original_line + for rule in driver.rules.per_line_sub: + processed_line = sub(rule.search, rule.replace, processed_line) + + if not (line_lstripped := processed_line.lstrip()): continue - indent = len(line) - len(line_lstripped) + indent = len(processed_line) - len(line_lstripped) # Determine parent in hierarchy most_recent_item, current_section = _analyze_indent( most_recent_item, current_section, indent, - " ".join(line.split()), + " ".join(processed_line.split()), ) for child in tuple(config.all_children()): @@ -272,7 +279,7 @@ def _load_from_string_lines(config: HConfig, config_text: str) -> None: # noqa: if not line: continue - # Determine indentation level + # Determine indentation level (after per_line_sub rules are applied) this_indent = len(line) - len(line.lstrip()) + indent_adjust line = line.lstrip() # noqa: PLW2901 diff --git a/hier_config/models.py b/hier_config/models.py index fe1fd22..d3c4c20 100644 --- a/hier_config/models.py +++ b/hier_config/models.py @@ -100,6 +100,7 @@ class Platform(str, Enum): CISCO_IOS = auto() CISCO_NXOS = auto() CISCO_XR = auto() + FORTINET_FORTIOS = auto() GENERIC = auto() # used in cases where the specific platform is unimportant/unknown HP_COMWARE5 = auto() HP_PROCURVE = auto() diff --git a/hier_config/platforms/fortinet_fortios/__init__.py b/hier_config/platforms/fortinet_fortios/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/hier_config/platforms/fortinet_fortios/driver.py b/hier_config/platforms/fortinet_fortios/driver.py new file mode 100644 index 0000000..b77ad9d --- /dev/null +++ b/hier_config/platforms/fortinet_fortios/driver.py @@ -0,0 +1,73 @@ +from collections.abc import Iterable +from typing import Optional + +from hier_config.child import HConfigChild +from hier_config.models import ( + MatchRule, + ParentAllowsDuplicateChildRule, + PerLineSubRule, + SectionalExitingRule, +) +from hier_config.platforms.driver_base import HConfigDriverBase, HConfigDriverRules + + +class HConfigDriverFortinetFortiOS(HConfigDriverBase): + """Driver for Fortinet FortiOS.""" + + @staticmethod + def _instantiate_rules() -> HConfigDriverRules: + return HConfigDriverRules( + sectional_exiting=[ + SectionalExitingRule( + match_rules=(MatchRule(startswith="config "),), exit_text="end" + ), + SectionalExitingRule( + match_rules=( + MatchRule(startswith="config "), + MatchRule(startswith="edit "), + ), + exit_text="next", + ), + ], + parent_allows_duplicate_child=[ + ParentAllowsDuplicateChildRule( + match_rules=(MatchRule(startswith="config"),) + ), + ], + per_line_sub=[ + PerLineSubRule(search="^end$", replace=" end"), + PerLineSubRule(search="^next$", replace=" next"), + ], + ) + + def swap_negation(self, child: HConfigChild) -> HConfigChild: + """Swap negation of a `self.text`.""" + if child.text.startswith(self.negation_prefix): + child.text = f"{self.declaration_prefix}{child.text_without_negation}" + elif child.text.startswith(self.declaration_prefix): + child.text = f"{self.negation_prefix}{child.text.removeprefix(self.declaration_prefix).split()[0]}" + + return child + + def idempotent_for( + self, config: HConfigChild, other_children: Iterable[HConfigChild] + ) -> Optional[HConfigChild]: + """Override idempotent_for to only consider a config idempotent + if the same command exists in the other set. + """ + for other_child in other_children: + if ( + config.text.startswith(self.declaration_prefix) + and other_child.text.startswith(self.declaration_prefix) + and config.text.split()[1] == other_child.text.split()[1] + ): + return other_child + return super().idempotent_for(config, other_children) + + @property + def negation_prefix(self) -> str: + return "unset " + + @property + def declaration_prefix(self) -> str: + return "set " diff --git a/tests/test_driver_cisco_xr.py b/tests/test_driver_cisco_xr.py new file mode 100644 index 0000000..2be2e6d --- /dev/null +++ b/tests/test_driver_cisco_xr.py @@ -0,0 +1,46 @@ +from hier_config import get_hconfig_fast_load +from hier_config.models import Platform + + +def test_duplicate_child() -> None: + platform = Platform.CISCO_XR + running_config = get_hconfig_fast_load( + platform, + ( + "route-policy SET_COMMUNITY_AND_PERMIT", + " if destination in (192.0.2.0/24, 198.51.100.0/24) then", + " set community (65001:100) additive", + " pass", + " else", + " drop", + " endif", + "end-policy", + "", + "route-policy SET_LOCAL_PREF_AND_PASS", + " if destination in (203.0.113.0/24) then", + " set local-preference 200", + " pass", + " else", + " drop", + " endif", + "end-policy", + ), + ) + generated_config = get_hconfig_fast_load( + platform, + ( + "route-policy SET_COMMUNITY_AND_PERMIT", + " if destination in (192.0.2.0/24, 198.51.100.0/24) then", + " set community (65001:100) additive", + " pass", + " else", + " drop", + " endif", + "end-policy", + "", + ), + ) + remediation_config = running_config.config_to_get_to(generated_config) + assert remediation_config.dump_simple(sectional_exiting=True) == ( + "no route-policy SET_LOCAL_PREF_AND_PASS", + ) diff --git a/tests/test_driver_fortinet_fortios.py b/tests/test_driver_fortinet_fortios.py new file mode 100644 index 0000000..d376c95 --- /dev/null +++ b/tests/test_driver_fortinet_fortios.py @@ -0,0 +1,109 @@ +from hier_config import get_hconfig_fast_load +from hier_config.constructors import get_hconfig +from hier_config.models import Platform + + +def test_swap_negation() -> None: + platform = Platform.FORTINET_FORTIOS + running_config = get_hconfig_fast_load( + platform, + ( + "config system interface", + " edit port1", + " set description 'Port 1'", + " set status down", + " next", + "end", + "config system dns", + " set primary 192.0.2.1", + " set secondary 192.0.2.2", + "end", + ), + ) + generated_config = get_hconfig_fast_load( + platform, + ( + "config system interface", + " edit port1", + " set status down", + " next", + "end", + "config system dns", + " set primary 192.0.2.1", + "end", + ), + ) + remediation_config = running_config.config_to_get_to(generated_config) + assert remediation_config.dump_simple(sectional_exiting=True) == ( + "config system interface", + " edit port1", + " unset description", + " next", + " end", + "config system dns", + " unset secondary", + " end", + ) + + +def test_idempotent_for() -> None: + platform = Platform.FORTINET_FORTIOS + running_config = get_hconfig_fast_load( + platform, + ( + "config system interface", + " edit port1", + " set description 'Old Description'", + " set status up", + " next", + "end", + "config system dns", + " set primary 192.0.2.1", + " set secondary 192.0.2.2", + "end", + ), + ) + generated_config = get_hconfig_fast_load( + platform, + ( + "config system interface", + " edit port1", + " set description 'New Description'", + " set status up", + " next", + "end", + "config system dns", + " set primary 192.0.2.1", + " set secondary 192.0.2.3", + "end", + ), + ) + remediation_config = running_config.config_to_get_to(generated_config) + assert remediation_config.dump_simple(sectional_exiting=True) == ( + "config system interface", + " edit port1", + " set description 'New Description'", + " next", + " end", + "config system dns", + " set secondary 192.0.2.3", + " end", + ) + + +def test_future() -> None: + platform = Platform.FORTINET_FORTIOS + running_config = get_hconfig(platform) + remediation_config = get_hconfig_fast_load( + platform, + ( + "config system interface", + " edit port2", + " set description 'Port 2'", + " set status up", + " next", + "end", + ), + ) + future_config = running_config.future(remediation_config) + assert not tuple(remediation_config.unified_diff(future_config))