Skip to content
61 changes: 34 additions & 27 deletions hier_config/constructors.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .platforms.cisco_nxos.view import HConfigViewCiscoNXOS
from .platforms.cisco_xr.driver import HConfigDriverCiscoIOSXR
from .platforms.cisco_xr.view import HConfigViewCiscoIOSXR
from .platforms.fortinet_fortios.driver import HConfigDriverFortinetFortiOS
from .platforms.generic.driver import HConfigDriverGeneric
from .platforms.hp_comware5.driver import HConfigDriverHPComware5
from .platforms.hp_procurve.driver import HConfigDriverHPProcurve
Expand All @@ -29,29 +30,27 @@
logger = getLogger(__name__)


def get_hconfig_driver(platform: Platform) -> HConfigDriverBase: # noqa: PLR0911
def get_hconfig_driver(platform: Platform) -> HConfigDriverBase:
"""Create base options on an OS level."""
if platform == Platform.ARISTA_EOS:
return HConfigDriverAristaEOS()
if platform == Platform.CISCO_IOS:
return HConfigDriverCiscoIOS()
if platform == Platform.CISCO_NXOS:
return HConfigDriverCiscoNXOS()
if platform == Platform.CISCO_XR:
return HConfigDriverCiscoIOSXR()
if platform == Platform.GENERIC:
return HConfigDriverGeneric()
if platform == Platform.HP_PROCURVE:
return HConfigDriverHPProcurve()
if platform == Platform.HP_COMWARE5:
return HConfigDriverHPComware5()
if platform == Platform.JUNIPER_JUNOS:
return HConfigDriverJuniperJUNOS()
if platform == Platform.VYOS:
return HConfigDriverVYOS()

message = f"Unsupported platform: {platform}" # type: ignore[unreachable]
raise ValueError(message)
platform_drivers: dict[Platform, type[HConfigDriverBase]] = {
Platform.ARISTA_EOS: HConfigDriverAristaEOS,
Platform.CISCO_IOS: HConfigDriverCiscoIOS,
Platform.CISCO_NXOS: HConfigDriverCiscoNXOS,
Platform.CISCO_XR: HConfigDriverCiscoIOSXR,
Platform.FORTINET_FORTIOS: HConfigDriverFortinetFortiOS,
Platform.GENERIC: HConfigDriverGeneric,
Platform.HP_PROCURVE: HConfigDriverHPProcurve,
Platform.HP_COMWARE5: HConfigDriverHPComware5,
Platform.JUNIPER_JUNOS: HConfigDriverJuniperJUNOS,
Platform.VYOS: HConfigDriverVYOS,
}
driver_cls = platform_drivers.get(platform)

if driver_cls is None:
message = f"Unsupported platform: {platform}"
raise ValueError(message)

return driver_cls()


def get_hconfig_view(config: HConfig) -> HConfigViewBase:
Expand Down Expand Up @@ -143,17 +142,25 @@ def get_hconfig_fast_load(
current_section: Union[HConfig, HConfigChild] = config
most_recent_item: Union[HConfig, HConfigChild] = current_section

for line in lines:
if not (line_lstripped := line.lstrip()):
for original_line in lines:
if not (line_lstripped := original_line.lstrip()):
continue

# Apply per_line_sub rules before processing
processed_line = original_line
for rule in driver.rules.per_line_sub:
processed_line = sub(rule.search, rule.replace, processed_line)

if not (line_lstripped := processed_line.lstrip()):
continue
indent = len(line) - len(line_lstripped)
indent = len(processed_line) - len(line_lstripped)

# Determine parent in hierarchy
most_recent_item, current_section = _analyze_indent(
most_recent_item,
current_section,
indent,
" ".join(line.split()),
" ".join(processed_line.split()),
)

for child in tuple(config.all_children()):
Expand Down Expand Up @@ -272,7 +279,7 @@ def _load_from_string_lines(config: HConfig, config_text: str) -> None: # noqa:
if not line:
continue

# Determine indentation level
# Determine indentation level (after per_line_sub rules are applied)
this_indent = len(line) - len(line.lstrip()) + indent_adjust

line = line.lstrip() # noqa: PLW2901
Expand Down
1 change: 1 addition & 0 deletions hier_config/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class Platform(str, Enum):
CISCO_IOS = auto()
CISCO_NXOS = auto()
CISCO_XR = auto()
FORTINET_FORTIOS = auto()
GENERIC = auto() # used in cases where the specific platform is unimportant/unknown
HP_COMWARE5 = auto()
HP_PROCURVE = auto()
Expand Down
Empty file.
73 changes: 73 additions & 0 deletions hier_config/platforms/fortinet_fortios/driver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from collections.abc import Iterable
from typing import Optional

from hier_config.child import HConfigChild
from hier_config.models import (
MatchRule,
ParentAllowsDuplicateChildRule,
PerLineSubRule,
SectionalExitingRule,
)
from hier_config.platforms.driver_base import HConfigDriverBase, HConfigDriverRules


class HConfigDriverFortinetFortiOS(HConfigDriverBase):
"""Driver for Fortinet FortiOS."""

@staticmethod
def _instantiate_rules() -> HConfigDriverRules:
return HConfigDriverRules(
sectional_exiting=[
SectionalExitingRule(
match_rules=(MatchRule(startswith="config "),), exit_text="end"
),
SectionalExitingRule(
match_rules=(
MatchRule(startswith="config "),
MatchRule(startswith="edit "),
),
exit_text="next",
),
],
parent_allows_duplicate_child=[
ParentAllowsDuplicateChildRule(
match_rules=(MatchRule(startswith="config"),)
),
],
per_line_sub=[
PerLineSubRule(search="^end$", replace=" end"),
PerLineSubRule(search="^next$", replace=" next"),
],
)

def swap_negation(self, child: HConfigChild) -> HConfigChild:
"""Swap negation of a `self.text`."""
if child.text.startswith(self.negation_prefix):
child.text = f"{self.declaration_prefix}{child.text_without_negation}"
elif child.text.startswith(self.declaration_prefix):
child.text = f"{self.negation_prefix}{child.text.removeprefix(self.declaration_prefix).split()[0]}"

return child

def idempotent_for(
self, config: HConfigChild, other_children: Iterable[HConfigChild]
) -> Optional[HConfigChild]:
"""Override idempotent_for to only consider a config idempotent
if the same command exists in the other set.
"""
for other_child in other_children:
if (
config.text.startswith(self.declaration_prefix)
and other_child.text.startswith(self.declaration_prefix)
and config.text.split()[1] == other_child.text.split()[1]
):
return other_child
return super().idempotent_for(config, other_children)

@property
def negation_prefix(self) -> str:
return "unset "

@property
def declaration_prefix(self) -> str:
return "set "
46 changes: 46 additions & 0 deletions tests/test_driver_cisco_xr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from hier_config import get_hconfig_fast_load
from hier_config.models import Platform


def test_duplicate_child() -> None:
platform = Platform.CISCO_XR
running_config = get_hconfig_fast_load(
platform,
(
"route-policy SET_COMMUNITY_AND_PERMIT",
" if destination in (192.0.2.0/24, 198.51.100.0/24) then",
" set community (65001:100) additive",
" pass",
" else",
" drop",
" endif",
"end-policy",
"",
"route-policy SET_LOCAL_PREF_AND_PASS",
" if destination in (203.0.113.0/24) then",
" set local-preference 200",
" pass",
" else",
" drop",
" endif",
"end-policy",
),
)
generated_config = get_hconfig_fast_load(
platform,
(
"route-policy SET_COMMUNITY_AND_PERMIT",
" if destination in (192.0.2.0/24, 198.51.100.0/24) then",
" set community (65001:100) additive",
" pass",
" else",
" drop",
" endif",
"end-policy",
"",
),
)
remediation_config = running_config.config_to_get_to(generated_config)
assert remediation_config.dump_simple(sectional_exiting=True) == (
"no route-policy SET_LOCAL_PREF_AND_PASS",
)
109 changes: 109 additions & 0 deletions tests/test_driver_fortinet_fortios.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
from hier_config import get_hconfig_fast_load
from hier_config.constructors import get_hconfig
from hier_config.models import Platform


def test_swap_negation() -> None:
platform = Platform.FORTINET_FORTIOS
running_config = get_hconfig_fast_load(
platform,
(
"config system interface",
" edit port1",
" set description 'Port 1'",
" set status down",
" next",
"end",
"config system dns",
" set primary 192.0.2.1",
" set secondary 192.0.2.2",
"end",
),
)
generated_config = get_hconfig_fast_load(
platform,
(
"config system interface",
" edit port1",
" set status down",
" next",
"end",
"config system dns",
" set primary 192.0.2.1",
"end",
),
)
remediation_config = running_config.config_to_get_to(generated_config)
assert remediation_config.dump_simple(sectional_exiting=True) == (
"config system interface",
" edit port1",
" unset description",
" next",
" end",
"config system dns",
" unset secondary",
" end",
)


def test_idempotent_for() -> None:
platform = Platform.FORTINET_FORTIOS
running_config = get_hconfig_fast_load(
platform,
(
"config system interface",
" edit port1",
" set description 'Old Description'",
" set status up",
" next",
"end",
"config system dns",
" set primary 192.0.2.1",
" set secondary 192.0.2.2",
"end",
),
)
generated_config = get_hconfig_fast_load(
platform,
(
"config system interface",
" edit port1",
" set description 'New Description'",
" set status up",
" next",
"end",
"config system dns",
" set primary 192.0.2.1",
" set secondary 192.0.2.3",
"end",
),
)
remediation_config = running_config.config_to_get_to(generated_config)
assert remediation_config.dump_simple(sectional_exiting=True) == (
"config system interface",
" edit port1",
" set description 'New Description'",
" next",
" end",
"config system dns",
" set secondary 192.0.2.3",
" end",
)


def test_future() -> None:
platform = Platform.FORTINET_FORTIOS
running_config = get_hconfig(platform)
remediation_config = get_hconfig_fast_load(
platform,
(
"config system interface",
" edit port2",
" set description 'Port 2'",
" set status up",
" next",
"end",
),
)
future_config = running_config.future(remediation_config)
assert not tuple(remediation_config.unified_diff(future_config))