Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/560.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add the ability to perform range expansions in object files. This feature allows users to define patterns in string fields that will be expanded into multiple objects, facilitating bulk object creation and management. The implementation includes validation to ensure that all expanded lists have the same length, preventing inconsistencies. Documentation has been updated to explain how to use this feature, including examples of valid and invalid configurations.
83 changes: 83 additions & 0 deletions docs/docs/python-sdk/topics/object_file.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,86 @@ Metadata support is planned for future releases. Currently, the Object file does
2. Keep object files organized by model type or purpose.
3. Validate object files before loading them into production environments.
4. Use comments in your YAML files to document complex relationships or dependencies.

## Range Expansion in Object Files

The Infrahub Python SDK supports **range expansion** for string fields in object files. This feature allows you to specify a range pattern (e.g., `[1-5]`) in any string value, and the SDK will automatically expand it into multiple objects during validation and processing.

### How Range Expansion Works

- Any string field containing a pattern like `[1-5]`, `[10-15]`, or `[1,3,5]` will be expanded into multiple objects.
- If multiple fields in the same object use range expansion, **all expanded lists must have the same length**. If not, validation will fail.
- The expansion is performed before validation and processing, so all downstream logic works on the expanded data.

### Examples

#### Single Field Expansion

```yaml
spec:
kind: BuiltinLocation
data:
- name: AMS[1-3]
type: Country
```

This will expand to:

```yaml
- name: AMS1
type: Country
- name: AMS2
type: Country
- name: AMS3
type: Country
```

#### Multiple Field Expansion (Matching Lengths)

```yaml
spec:
kind: BuiltinLocation
data:
- name: AMS[1-3]
description: Datacenter [A-C]
type: Country
```

This will expand to:

```yaml
- name: AMS1
description: Datacenter A
type: Country
- name: AMS2
description: Datacenter B
type: Country
- name: AMS3
description: Datacenter C
type: Country
```

#### Error: Mismatched Range Lengths

If you use ranges of different lengths in multiple fields:

```yaml
spec:
kind: BuiltinLocation
data:
- name: AMS[1-3]
description: "Datacenter [10-15]"
type: Country
```

This will **fail validation** with an error like:

```bash
Range expansion mismatch: fields expanded to different lengths: [3, 6]
```

### Notes

- Range expansion is supported for any string field in the `data` section.
- If no range pattern is present, the field is left unchanged.
- If expansion fails for any field, validation will fail with an error message.
53 changes: 47 additions & 6 deletions infrahub_sdk/spec/object.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import copy
import re
from enum import Enum
from typing import TYPE_CHECKING, Any

Expand All @@ -8,6 +10,7 @@
from ..exceptions import ObjectValidationError, ValidationError
from ..schema import GenericSchemaAPI, RelationshipKind, RelationshipSchema
from ..yaml import InfrahubFile, InfrahubFileKind
from .range_expansion import MATCH_PATTERN, range_expansion

if TYPE_CHECKING:
from ..client import InfrahubClient
Expand Down Expand Up @@ -164,14 +167,47 @@ async def get_relationship_info(
return info


def expand_data_with_ranges(data: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Expand any item in self.data with range pattern in any value. Supports multiple fields, requires equal expansion length."""
range_pattern = re.compile(MATCH_PATTERN)
expanded = []
for item in data:
# Find all fields to expand
expand_fields = {}
for key, value in item.items():
if isinstance(value, str) and range_pattern.search(value):
try:
expand_fields[key] = range_expansion(value)
except Exception:
# If expansion fails, treat as no expansion
expand_fields[key] = [value]
if not expand_fields:
expanded.append(item)
continue
# Check all expanded lists have the same length
lengths = [len(v) for v in expand_fields.values()]
if len(set(lengths)) > 1:
raise ValidationError(f"Range expansion mismatch: fields expanded to different lengths: {lengths}")
n = lengths[0]
# Zip expanded values and produce new items
for i in range(n):
new_item = copy.deepcopy(item)
for key, values in expand_fields.items():
new_item[key] = values[i]
expanded.append(new_item)
return expanded


class InfrahubObjectFileData(BaseModel):
kind: str
data: list[dict[str, Any]] = Field(default_factory=list)

async def validate_format(self, client: InfrahubClient, branch: str | None = None) -> list[ObjectValidationError]:
errors: list[ObjectValidationError] = []
schema = await client.schema.get(kind=self.kind, branch=branch)
for idx, item in enumerate(self.data):
expanded_data = expand_data_with_ranges(self.data)
self.data = expanded_data
for idx, item in enumerate(expanded_data):
errors.extend(
await self.validate_object(
client=client,
Expand All @@ -186,7 +222,8 @@ async def validate_format(self, client: InfrahubClient, branch: str | None = Non

async def process(self, client: InfrahubClient, branch: str | None = None) -> None:
schema = await client.schema.get(kind=self.kind, branch=branch)
for idx, item in enumerate(self.data):
expanded_data = expand_data_with_ranges(self.data)
for idx, item in enumerate(expanded_data):
await self.create_node(
client=client,
schema=schema,
Expand Down Expand Up @@ -311,7 +348,8 @@ async def validate_related_nodes(
rel_info.find_matching_relationship(peer_schema=peer_schema)
context.update(rel_info.get_context(value="placeholder"))

for idx, peer_data in enumerate(data["data"]):
expanded_data = expand_data_with_ranges(data=data["data"])
for idx, peer_data in enumerate(expanded_data):
context["list_index"] = idx
errors.extend(
await cls.validate_object(
Expand Down Expand Up @@ -421,22 +459,24 @@ async def create_node(
remaining_rels.append(key)
elif not rel_info.is_reference and not rel_info.is_mandatory:
if rel_info.format == RelationshipDataFormat.ONE_OBJ:
expanded_data = expand_data_with_ranges(data=[value])
nodes = await cls.create_related_nodes(
client=client,
position=position,
rel_info=rel_info,
data=value,
data=expanded_data,
branch=branch,
default_schema_kind=default_schema_kind,
)
clean_data[key] = nodes[0]

else:
expanded_data = expand_data_with_ranges(data=value)
nodes = await cls.create_related_nodes(
client=client,
position=position,
rel_info=rel_info,
data=value,
data=expanded_data,
branch=branch,
default_schema_kind=default_schema_kind,
)
Expand Down Expand Up @@ -525,7 +565,8 @@ async def create_related_nodes(
rel_info.find_matching_relationship(peer_schema=peer_schema)
context.update(rel_info.get_context(value=parent_node.id))

for idx, peer_data in enumerate(data["data"]):
expanded_data = expand_data_with_ranges(data=data["data"])
for idx, peer_data in enumerate(expanded_data):
context["list_index"] = idx
if isinstance(peer_data, dict):
node = await cls.create_node(
Expand Down
118 changes: 118 additions & 0 deletions infrahub_sdk/spec/range_expansion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import itertools
import re

MATCH_PATTERN = r"(\[[\w,-]+\])"


def _escape_brackets(s: str) -> str:
return s.replace("\\[", "__LBRACK__").replace("\\]", "__RBRACK__")


def _unescape_brackets(s: str) -> str:
return s.replace("__LBRACK__", "[").replace("__RBRACK__", "]")
Comment on lines +7 to +12
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Consider collision-resistant escape sequences.

The current placeholder strings __LBRACK__ and __RBRACK__ could collide with user input that already contains these exact strings, leading to incorrect unescaping. While this is an edge case, consider using more collision-resistant placeholders (e.g., with UUID or special characters unlikely to appear in typical input).

🤖 Prompt for AI Agents
In infrahub_sdk/spec/range_expansion.py around lines 7 to 12, the current escape
tokens "__LBRACK__"/"__RBRACK__" can collide with user data; replace them with
module-level, collision-resistant constants created once (e.g., incorporate a
UUID or use a binary/sentinel sequence unlikely in input) so the same unique
tokens are used by both _escape_brackets and _unescape_brackets; update both
functions to reference those constants instead of hard-coded strings and ensure
they remain deterministic for the process (generate the UUID at import time) so
escaping and unescaping remain consistent.



def _char_range_expand(char_range_str: str) -> list[str]:
"""Expands a string of numbers or single-character letters."""
expanded_values: list[str] = []
# Special case: if no dash and no comma, and multiple characters, error if not all alphanumeric
if "," not in char_range_str and "-" not in char_range_str and len(char_range_str) > 1:
if not char_range_str.isalnum():
raise ValueError(f"Invalid non-alphanumeric range: [{char_range_str}]")
return list(char_range_str)

for value in char_range_str.split(","):
if not value:
# Malformed: empty part in comma-separated list
return [f"[{char_range_str}]"]
if "-" in value:
start_char, end_char = value.split("-", 1)
if not start_char or not end_char:
expanded_values.append(f"[{char_range_str}]")
return expanded_values
# Check if it's a numeric range
if start_char.isdigit() and end_char.isdigit():
start_num = int(start_char)
end_num = int(end_char)
step = 1 if start_num <= end_num else -1
expanded_values.extend(str(i) for i in range(start_num, end_num + step, step))
# Check if it's an alphabetical range (single character)
elif len(start_char) == 1 and len(end_char) == 1 and start_char.isalpha() and end_char.isalpha():
start_ord = ord(start_char)
end_ord = ord(end_char)
step = 1 if start_ord <= end_ord else -1
is_upper = start_char.isupper()
for i in range(start_ord, end_ord + step, step):
char = chr(i)
expanded_values.append(char.upper() if is_upper else char)
else:
# Mixed or unsupported range type, append as-is
expanded_values.append(value)
else:
# If the value is a single character or valid alphanumeric string, append
if not value.isalnum():
raise ValueError(f"Invalid non-alphanumeric value: [{value}]")
expanded_values.append(value)
return expanded_values
Comment on lines +15 to +56
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Inconsistent error handling for malformed input.

The function handles malformed input inconsistently:

  • Lines 20-21 and 53-54 raise ValueError for non-alphanumeric values
  • Lines 25-27 and 30-32 return the malformed pattern as-is (wrapped in brackets)

This inconsistency could lead to unpredictable behavior. Consider standardizing the approach—either consistently raise exceptions for all malformed input, or consistently return malformed patterns as-is with clear documentation of this behavior.

Example of inconsistency:

# Raises ValueError
_char_range_expand("a@b")  

# Returns ["[a--b]"] without raising
_char_range_expand("a--b")  
🤖 Prompt for AI Agents
In infrahub_sdk/spec/range_expansion.py around lines 15 to 56, error handling is
inconsistent: some malformed inputs raise ValueError while other malformed
patterns are returned wrapped in brackets; standardize by raising ValueError for
all malformed cases. Specifically, change the branches that currently
return/[append] bracketed patterns (empty parts in comma list, missing start or
end in a hyphen range, and mixed/unsupported range types) to raise ValueError
with a clear message including the offending input, and keep the existing
ValueError behavior for non-alphanumeric single values so all malformed inputs
uniformly raise exceptions.



def _extract_constants(pattern: str, re_compiled: re.Pattern) -> tuple[list[int], list[list[str]]]:
cartesian_list = []
interface_constant = [0]
for match in re_compiled.finditer(pattern):
interface_constant.append(match.start())
interface_constant.append(match.end())
cartesian_list.append(_char_range_expand(match.group()[1:-1]))
return interface_constant, cartesian_list


def _expand_interfaces(pattern: str, interface_constant: list[int], cartesian_list: list[list[str]]) -> list[str]:
def _pairwise(lst: list[int]) -> list[tuple[int, int]]:
it = iter(lst)
return list(zip(it, it))

if interface_constant[-1] < len(pattern):
interface_constant.append(len(pattern))
interface_constant_out = _pairwise(interface_constant)
expanded_interfaces = []
for element in itertools.product(*cartesian_list):
current_interface = ""
for count, item in enumerate(interface_constant_out):
current_interface += pattern[item[0] : item[1]]
if count < len(element):
current_interface += element[count]
expanded_interfaces.append(_unescape_brackets(current_interface))
return expanded_interfaces


def range_expansion(interface_pattern: str) -> list[str]:
"""Expand string pattern into a list of strings, supporting both
number and single-character alphabet ranges. Heavily inspired by
Netutils interface_range_expansion but adapted to support letters.

Args:
interface_pattern: The string pattern that will be parsed to create the list of interfaces.

Returns:
Contains the expanded list of interfaces.

Examples:
>>> from infrahub_sdk.spec.range_expansion import range_expansion
>>> range_expansion("Device [A-C]")
['Device A', 'Device B', 'Device C']
>>> range_expansion("FastEthernet[1-2]/0/[10-15]")
['FastEthernet1/0/10', 'FastEthernet1/0/11', 'FastEthernet1/0/12',
'FastEthernet1/0/13', 'FastEthernet1/0/14', 'FastEthernet1/0/15',
'FastEthernet2/0/10', 'FastEthernet2/0/11', 'FastEthernet2/0/12',
'FastEthernet2/0/13', 'FastEthernet2/0/14', 'FastEthernet2/0/15']
>>> range_expansion("GigabitEthernet[a-c]/0/1")
['GigabitEtherneta/0/1', 'GigabitEthernetb/0/1', 'GigabitEthernetc/0/1']
>>> range_expansion("Eth[a,c,e]/0/1")
['Etha/0/1', 'Ethc/0/1', 'Ethe/0/1']
"""
pattern_escaped = _escape_brackets(interface_pattern)
re_compiled = re.compile(MATCH_PATTERN)
if not re_compiled.search(pattern_escaped):
return [_unescape_brackets(pattern_escaped)]
interface_constant, cartesian_list = _extract_constants(pattern_escaped, re_compiled)
return _expand_interfaces(pattern_escaped, interface_constant, cartesian_list)
Loading
Loading