Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions plugins/module_utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@
"restart",
"delete",
"update",
"merged",
"replaced",
"overridden",
"deleted",
"gathered",
)

INTERFACE_FLOW_RULES_TYPES_MAPPING = {"port_channel": "PORTCHANNEL", "physical": "PHYSICAL", "l3out_sub_interface": "L3_SUBIF", "l3out_svi": "SVI"}
Expand All @@ -168,3 +173,12 @@
ND_REST_KEYS_TO_SANITIZE = ["metadata"]

ND_SETUP_NODE_DEPLOYMENT_TYPE = {"physical": "cimc", "virtual": "vnode"}

USER_ROLES_MAPPING = {
"fabric_admin": "fabric-admin",
"observer": "observer",
"super_admin": "super-admin",
"support_engineer": "support-engineer",
"approver": "approver",
"designer": "designer",
}
74 changes: 23 additions & 51 deletions plugins/module_utils/nd.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from ansible.module_utils.basic import json
from ansible.module_utils.basic import env_fallback
from ansible.module_utils.six import PY3
from ansible.module_utils.six.moves import filterfalse
from ansible.module_utils.six.moves.urllib.parse import urlencode
from ansible.module_utils._text import to_native, to_text
from ansible.module_utils.connection import Connection
Expand Down Expand Up @@ -73,53 +72,27 @@ def cmp(a, b):


def issubset(subset, superset):
"""Recurse through nested dictionary and compare entries"""
"""Recurse through a nested dictionary and check if it is a subset of another."""

# Both objects are the same object
if subset is superset:
return True

# Both objects are identical
if subset == superset:
return True

# Both objects have a different type
if isinstance(subset) is not isinstance(superset):
if type(subset) is not type(superset):
return False

if not isinstance(subset, dict):
if isinstance(subset, list):
return all(item in superset for item in subset)
return subset == superset

for key, value in subset.items():
# Ignore empty values
if value is None:
return True
continue

# Item from subset is missing from superset
if key not in superset:
return False

# Item has different types in subset and superset
if isinstance(superset.get(key)) is not isinstance(value):
return False
superset_value = superset.get(key)

# Compare if item values are subset
if isinstance(value, dict):
if not issubset(superset.get(key), value):
return False
elif isinstance(value, list):
try:
# NOTE: Fails for lists of dicts
if not set(value) <= set(superset.get(key)):
return False
except TypeError:
# Fall back to exact comparison for lists of dicts
diff = list(filterfalse(lambda i: i in value, superset.get(key))) + list(filterfalse(lambda j: j in superset.get(key), value))
if diff:
return False
elif isinstance(value, set):
if not value <= superset.get(key):
return False
else:
if not value == superset.get(key):
return False
if not issubset(value, superset_value):
return False

return True

Expand Down Expand Up @@ -252,7 +225,7 @@ def request(
if file is not None:
info = conn.send_file_request(method, uri, file, data, None, file_key, file_ext)
else:
if data:
if data is not None:
info = conn.send_request(method, uri, json.dumps(data))
else:
info = conn.send_request(method, uri)
Expand Down Expand Up @@ -310,6 +283,8 @@ def request(
self.fail_json(msg="ND Error: {0}".format(self.error.get("message")), data=data, info=info)
self.error = payload
if "code" in payload:
if self.status == 404 and ignore_not_found_error:
return {}
self.fail_json(msg="ND Error {code}: {message}".format(**payload), data=data, info=info, payload=payload)
elif "messages" in payload and len(payload.get("messages")) > 0:
self.fail_json(msg="ND Error {code} ({severity}): {message}".format(**payload["messages"][0]), data=data, info=info, payload=payload)
Expand Down Expand Up @@ -506,30 +481,27 @@ def get_diff(self, unwanted=None):
if not self.existing and self.sent:
return True

existing = self.existing
sent = self.sent
existing = deepcopy(self.existing)
sent = deepcopy(self.sent)

for key in unwanted:
if isinstance(key, str):
if key in existing:
try:
del existing[key]
except KeyError:
pass
try:
del sent[key]
except KeyError:
pass
del existing[key]
if key in sent:
del sent[key]
elif isinstance(key, list):
key_path, last = key[:-1], key[-1]
try:
existing_parent = reduce(dict.get, key_path, existing)
del existing_parent[last]
if existing_parent is not None:
del existing_parent[last]
except KeyError:
pass
try:
sent_parent = reduce(dict.get, key_path, sent)
del sent_parent[last]
if sent_parent is not None:
del sent_parent[last]
except KeyError:
pass
return not issubset(sent, existing)
Expand Down
216 changes: 216 additions & 0 deletions plugins/module_utils/nd_config_collection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
# -*- coding: utf-8 -*-

# Copyright: (c) 2025, Gaspard Micol (@gmicol) <[email protected]>

# GNU General Public License v3.0+ (see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import absolute_import, division, print_function

__metaclass__ = type

from copy import deepcopy


# TODO: move it to utils.py
def sanitize_dict(dict_to_sanitize, keys=None, values=None, recursive=True, remove_none_values=True):
if keys is None:
keys = []
if values is None:
values = []

result = deepcopy(dict_to_sanitize)
for k, v in dict_to_sanitize.items():
if k in keys:
del result[k]
elif v in values or (v is None and remove_none_values):
del result[k]
elif isinstance(v, dict) and recursive:
result[k] = sanitize_dict(v, keys, values)
elif isinstance(v, list) and recursive:
for index, item in enumerate(v):
if isinstance(item, dict):
result[k][index] = sanitize_dict(item, keys, values)
return result


# Custom NDConfigCollection Exceptions
class NDConfigCollectionError(Exception):
"""Base exception for NDConfigCollection errors."""
pass


class NDConfigNotFoundError(NDConfigCollectionError, KeyError):
"""Raised when a configuration is not found by its identifier."""
pass


class NDIdentifierMismatchError(NDConfigCollectionError, ValueError):
"""Raised when an identifier in a config does not match the expected key."""
pass


class InvalidNDConfigError(NDConfigCollectionError, TypeError):
"""Raised when a provided config is not a dictionary or is missing the identifier key."""
pass


# TODO: Add a get_diff_config function
# TODO: Handle multiple identifiers
# TODO: Add descriptions
# TODO: Maybe leverage MutableMapping, MutableSequence from collections.abc
# NOTE: New data structure for ND Network Resource Module
class NDConfigCollection:
def __init__(self, identifier_key, data=None):
if not isinstance(identifier_key, str):
raise TypeError("identifier_key must be a string.")
self.identifier_key = identifier_key
self.config_collection = {}

if data is not None:
if isinstance(data, list):
self.list_view = data
elif isinstance(data, dict):
self.config_collection = data
else:
raise TypeError("data must be a list of dicts or dict of configs.")

@property
def list_view(self):
return [v.copy() for v in self.config_collection.values()]

@list_view.setter
def list_view(self, new_list):
if not isinstance(new_list, list):
raise TypeError("list_view must be set to a list.")

new_dict = {}
for item in new_list:
if not isinstance(item, dict):
raise TypeError("All items in list_view must be dicts.")
if self.identifier_key not in item:
raise InvalidNDConfigError("Missing '{0}' in item: {1}".format(self.identifier_key, item))

key = item[self.identifier_key]
new_dict[key] = item.copy()
self.config_collection = new_dict

# Basic Operations
def replace(self, config):
if not isinstance(config, dict):
raise InvalidNDConfigError("Config must be a dict.")
if self.identifier_key not in config:
raise InvalidNDConfigError("Missing '{0}' in config: {1}".format(self.identifier_key, config))

key = config[self.identifier_key]
self.config_collection[key] = config.copy()

def merge(self, config):
if not isinstance(config, dict):
raise InvalidNDConfigError("Config must be a dict.")
if self.identifier_key not in config:
raise InvalidNDConfigError("Missing '{0}' in config: {1}".format(self.identifier_key, config))

key = config[self.identifier_key]
if key in self.config_collection:
self.config_collection[key].update(config.copy())
else:
self.config_collection[key] = config.copy()

def remove(self, identifier):
if identifier not in self.config_collection:
raise NDConfigNotFoundError("Configuration with identifier '{0}' not found.".format(identifier))
del self.config_collection[identifier]

def get(self, identifier):
config = self.config_collection.get(identifier)
if config is None:
raise NDConfigNotFoundError("Configuration with identifier '{0}' not found.".format(identifier))
return config.copy()

# Magic Methods
def __len__(self):
return len(self.config_collection)

def __contains__(self, identifier):
return identifier in self.config_collection

def __iter__(self):
for config in self.config_collection.values():
yield config.copy()

def __getitem__(self, identifier):
return self.get(identifier)

def __setitem__(self, identifier, config):
if not isinstance(config, dict):
raise InvalidNDConfigError("Config must be a dict when setting via __setitem__.")
if self.identifier_key not in config:
raise InvalidNDConfigError("Config must contain '{0}' when setting via __setitem__.".format(self.identifier_key))
if config[self.identifier_key] != identifier:
raise NDIdentifierMismatchError(
"Identifier '{0}' in key does not match '{1}' value '{2}' in config.".format(identifier, self.identifier_key, config[self.identifier_key])
)
self.replace(config)

def __delitem__(self, identifier):
self.remove(identifier)

def __eq__(self, other):
if not isinstance(other, NDConfigCollection):
# TODO: Make it works for list and dict as well. For now just raise an error
raise InvalidNDConfigError("Can only do __eq__ with another NDConfigCollection instance.")

if self.identifier_key != other.identifier_key:
return False

return self.config_collection == other.config_collection

def __ne__(self, other):
return not self.__eq__(other)

def __repr__(self):
return "NDConfigCollection(identifier_key='{0}', count={1})".format(self.identifier_key, len(self))

# Standard Dictionary-like Views
def keys(self):
return self.config_collection.keys()

def values(self):
for v in self.config_collection.values():
yield v.copy()

def items(self):
for k, v in self.config_collection.items():
yield k, v.copy()

# Utility/Convenience Functions
def clear(self):
self.config_collection.clear()

def find_by_attribute(self, attribute_name, attribute_value):
matching_configs = []
for config in self.values():
if config.get(attribute_name) == attribute_value:
matching_configs.append(config.copy())
return matching_configs

def copy(self):
return NDConfigCollection(self.identifier_key, data=deepcopy(self.config_collection))

def sanitize(self, keys_to_remove=None, values_to_remove=None, recursive=True, remove_none_values=True):
sanitized_config_collection = sanitize_dict(self.config_collection, keys_to_remove, values_to_remove, recursive, remove_none_values)
return NDConfigCollection(self.identifier_key, data=sanitized_config_collection)

def get_diff_identifiers(self, other_collection):
if not isinstance(other_collection, NDConfigCollection):
raise InvalidNDConfigError("Can only do get_removed_identifiers with another NDConfigCollection instance.")

if self.identifier_key != other_collection.identifier_key:
raise NDIdentifierMismatchError(
"Cannot do get_removed_identifiers with another NDConfigCollection with different identifier_key. "
"Expected '{0}', got '{1}'.".format(self.identifier_key, other_collection.identifier_key)
)
current_identifiers = set(self.config_collection.keys())
other_identifiers = set(other_collection.config_collection.keys())

return list(current_identifiers - other_identifiers)
Loading
Loading