fix(backend): create node with many rels faster #6883

Merged · 2 commits · Aug 8, 2025
4 changes: 2 additions & 2 deletions backend/infrahub/core/manager.py
@@ -400,15 +400,15 @@ async def query_peers(

results = []
for peer in peers_info:
result = await Relationship(schema=schema, branch=branch, at=at, node_id=peer.source_id).load(
result = Relationship(schema=schema, branch=branch, at=at, node_id=peer.source_id).load(
db=db,
id=peer.rel_node_id,
db_id=peer.rel_node_db_id,
updated_at=peer.updated_at,
data=peer,
)
if fetch_peers:
await result.set_peer(value=peer_nodes[peer.peer_id])
result.set_peer(value=peer_nodes[peer.peer_id])
results.append(result)

return results
13 changes: 13 additions & 0 deletions backend/infrahub/core/query/node.py
@@ -157,7 +157,20 @@ async def query_init(self, db: InfrahubDatabase, **kwargs) -> None: # noqa: ARG
relationships: list[RelationshipCreateData] = []
for rel_name in self.node._relationships:
rel_manager: RelationshipManager = getattr(self.node, rel_name)
# Fetch all relationship peers through a single database call for performance.
peers = await rel_manager.get_peers(db=db, branch_agnostic=self.branch_agnostic)

for rel in rel_manager._relationships:
try:
rel.set_peer(value=peers[rel.get_peer_id()])
except KeyError:
pass
except ValueError:
# The relationship has not been initialized yet, meaning the peer does not exist in the db yet,
# typically because it will be allocated from a resource pool. In that case, the peer
# will be fetched later using `rel.resolve`.
pass

rel_create_data = await rel.get_create_data(db=db, at=at)
if rel_create_data.peer_branch_level > deepest_branch_level or (
deepest_branch_name == GLOBAL_BRANCH_NAME and rel_create_data.peer_branch == registry.default_branch
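This hunk carries the speed-up named in the PR title: instead of letting each relationship look up its own peer while the create query is being built, `rel_manager.get_peers()` resolves every peer in one database call and the now-synchronous `set_peer()` assigns them in memory; peers that are not in the database yet (for example ones to be allocated from a resource pool) fall through to the later `rel.resolve()` path. A minimal, self-contained sketch of the batching pattern, with hypothetical names rather than Infrahub code:

from __future__ import annotations

import asyncio


class FakeDB:
    """Stand-in for the graph database; each method call counts as one round-trip."""

    def __init__(self) -> None:
        self.round_trips = 0

    async def fetch_one(self, peer_id: str) -> dict:
        self.round_trips += 1
        return {"id": peer_id}

    async def fetch_many(self, peer_ids: list[str]) -> dict[str, dict]:
        self.round_trips += 1  # one query no matter how many peers are requested
        return {peer_id: {"id": peer_id} for peer_id in peer_ids}


async def create_node_naive(db: FakeDB, peer_ids: list[str]) -> list[dict]:
    # Before: each relationship resolves its own peer -> N round-trips.
    return [await db.fetch_one(peer_id) for peer_id in peer_ids]


async def create_node_batched(db: FakeDB, peer_ids: list[str]) -> list[dict]:
    # After: resolve all peers once, then assign them in memory.
    peers = await db.fetch_many(peer_ids)
    return [peers[peer_id] for peer_id in peer_ids]


async def main() -> None:
    peer_ids = [f"peer-{i}" for i in range(500)]
    naive_db, batched_db = FakeDB(), FakeDB()
    await create_node_naive(naive_db, peer_ids)
    await create_node_batched(batched_db, peer_ids)
    print(naive_db.round_trips, "round-trips vs", batched_db.round_trips)  # 500 vs 1


asyncio.run(main())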
26 changes: 13 additions & 13 deletions backend/infrahub/core/relationship/model.py
@@ -166,11 +166,11 @@ def get_branch_based_on_support_type(self) -> Branch:
return registry.get_global_branch()
return self.branch

async def _process_data(self, data: dict | RelationshipPeerData | str) -> None:
def _process_data(self, data: dict | RelationshipPeerData | str) -> None:
self.data = data

if isinstance(data, RelationshipPeerData):
await self.set_peer(value=str(data.peer_id))
self.set_peer(value=str(data.peer_id))

if not self.id and data.rel_node_id:
self.id = data.rel_node_id
@@ -187,7 +187,7 @@ async def _process_data(self, data: dict | RelationshipPeerData | str) -> None:
elif isinstance(data, dict):
for key, value in data.items():
if key in ["peer", "id"]:
await self.set_peer(value=data.get(key, None))
self.set_peer(value=data.get(key, None))
elif key == "hfid" and self.peer_id is None:
self.peer_hfid = value
elif key.startswith(PREFIX_PROPERTY) and key.replace(PREFIX_PROPERTY, "") in self._flag_properties:
@@ -198,19 +198,19 @@
self.from_pool = value

else:
await self.set_peer(value=data)
self.set_peer(value=data)

async def new(
self,
db: InfrahubDatabase, # noqa: ARG002
data: dict | RelationshipPeerData | Any = None,
**kwargs: Any, # noqa: ARG002
) -> Relationship:
await self._process_data(data=data)
self._process_data(data=data)

return self

async def load(
def load(
self,
db: InfrahubDatabase, # noqa: ARG002
id: UUID | None = None,
@@ -223,7 +223,7 @@ async def load(
self.id = id or self.id
self.db_id = db_id or self.db_id

await self._process_data(data=data)
self._process_data(data=data)

if updated_at and hash(self) != hash_before:
self.updated_at = Timestamp(updated_at)
@@ -252,7 +252,7 @@ async def get_node(self, db: InfrahubDatabase) -> Node:
self._node_id = self._node.id
return node

async def set_peer(self, value: str | Node) -> None:
def set_peer(self, value: str | Node) -> None:
if isinstance(value, str):
self.peer_id = value
else:
@@ -433,7 +433,7 @@ async def resolve(self, db: InfrahubDatabase, at: Timestamp | None = None) -> None:
db=db, id=self.peer_id, branch=self.branch, kind=self.schema.peer, fields={"display_label": None}
)
if peer:
await self.set_peer(value=peer)
self.set_peer(value=peer)

if not self.peer_id and self.peer_hfid:
peer_schema = db.schema.get(name=self.schema.peer, branch=self.branch)
@@ -450,7 +450,7 @@ async def resolve(self, db: InfrahubDatabase, at: Timestamp | None = None) -> None:
fields={"display_label": None},
raise_on_error=True,
)
await self.set_peer(value=peer)
self.set_peer(value=peer)

if not self.peer_id and self.from_pool and "id" in self.from_pool:
pool_id = str(self.from_pool.get("id"))
@@ -473,7 +473,7 @@
data_from_pool["identifier"] = f"hfid={hfid_str} rel={self.name}"

assigned_peer: Node = await pool.get_resource(db=db, branch=self.branch, at=at, **data_from_pool) # type: ignore[attr-defined]
await self.set_peer(value=assigned_peer)
self.set_peer(value=assigned_peer)
self.set_source(value=pool.id)

async def save(self, db: InfrahubDatabase, at: Timestamp | None = None) -> Self:
@@ -962,7 +962,7 @@ async def _fetch_relationships(

for peer_id in details.peer_ids_present_database_only:
self._relationships.append(
await Relationship(
Relationship(
schema=self.schema,
branch=self.branch,
at=at or self.at,
@@ -1050,7 +1050,7 @@ async def update(self, data: list[str | Node] | dict[str, Any] | str | Node | None
if isinstance(item, dict) and item.get("id", None) in previous_relationships:
rel = previous_relationships[item["id"]]
hash_before = hash(rel)
await rel.load(data=item, db=db)
rel.load(data=item, db=db)
if hash(rel) != hash_before:
changed = True
self._relationships.append(rel)
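The other half of the change is mechanical: `_process_data()`, `load()`, and `set_peer()` never await anything, so they are converted from coroutines into plain methods and every call site drops its `await` (the unused `db` parameter is kept for signature compatibility, hence the `# noqa: ARG002` markers above). A hedged toy sketch of the idea, with hypothetical names:

from __future__ import annotations


class Rel:
    """Toy stand-in: assigning a peer is pure in-memory bookkeeping, no I/O."""

    def __init__(self) -> None:
        self.peer_id: str | None = None

    # Before: `async def set_peer(...)` even though nothing inside was awaited.
    # After: a plain method, callable without an event loop or an `await`.
    def set_peer(self, value: str) -> None:
        self.peer_id = value


rel = Rel()
rel.set_peer(value="tag-blue")  # no `await` needed at any call site
assert rel.peer_id == "tag-blue"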
2 changes: 1 addition & 1 deletion backend/infrahub/graphql/mutations/relationship.py
@@ -233,7 +233,7 @@ async def mutate(
# we should use RelationshipDataDeleteQuery to delete the relationship
# it would be more query efficient
rel = Relationship(schema=rel_schema, branch=graphql_context.branch, node=source)
await rel.load(db=db, data=existing_peers[node_data.get("id")])
rel.load(db=db, data=existing_peers[node_data.get("id")])
if group_event_type != GroupUpdateType.NONE:
peers.append(EventNode(id=rel.get_peer_id(), kind=nodes[rel.get_peer_id()].get_kind()))
node_changelog.delete_relationship(relationship=rel)
19 changes: 9 additions & 10 deletions backend/tests/unit/core/migrations/graph/test_003.py
@@ -2,22 +2,21 @@

from infrahub.core.migrations.graph.m003_relationship_parent_optional import Migration003, Migration003Query01
from infrahub.core.node import Node
from infrahub.core.schema import SchemaRoot, internal_schema
from infrahub.core.schema.schema_branch import SchemaBranch
from infrahub.core.utils import count_relationships
from infrahub.database import InfrahubDatabase


@pytest.fixture
async def migration_003_data(db: InfrahubDatabase, reset_registry, default_branch, delete_all_nodes_in_db):
# # load the internal schema from
schema = SchemaRoot(**internal_schema)
schema_branch = SchemaBranch(cache={}, name="default_branch")
schema_branch.load_schema(schema=schema)
schema_branch.process()

node_schema = schema_branch.get(name="SchemaNode")
rel_schema = schema_branch.get(name="SchemaRelationship")
async def migration_003_data(
db: InfrahubDatabase,
reset_registry,
default_branch,
delete_all_nodes_in_db,
register_core_models_schema: SchemaBranch,
):
Comment on lines 10 to +17
💡 Verification agent

🧩 Analysis chain

Confirm pytest async support and fixture availability.

Using async tests and an async fixture with @pytest.fixture requires pytest-asyncio or equivalent auto mode. Also verify that register_core_models_schema is defined as a fixture.

Run:


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "Looking for pytest-asyncio or asyncio_mode=auto configuration..."
fd -a pytest.ini tox.ini pyproject.toml conftest.py 2>/dev/null | xargs -I{} rg -n 'pytest-asyncio|asyncio_mode\s*=\s*auto|pytestmark\s*=\s*pytest\.mark\.asyncio' {} || true

echo "Searching for the register_core_models_schema fixture definition..."
rg -n 'def\s+register_core_models_schema\(' -A 3 -B 3 || true

Length of output: 1469


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "Displaying beginning of the test file to check async tests and markers..."
sed -n '1,50p' backend/tests/unit/core/migrations/graph/test_003.py

Length of output: 1974


Ensure pytest-asyncio is enabled for async fixtures/tests

I verified that:

  • There is no asyncio_mode or pytest.mark.asyncio entry in your pytest config (pytest.ini/pyproject.toml/conftest.py).
  • The async fixture migration_003_data and async tests in backend/tests/unit/core/migrations/graph/test_003.py will not run without the pytest-asyncio plugin or equivalent.
  • The register_core_models_schema fixture is correctly defined as an async fixture in backend/tests/conftest.py.

Please update your test setup to support asyncio, for example:

  • Add pytest-asyncio to your dev dependencies.

  • In pytest.ini or pyproject.toml, enable asyncio mode:

    [tool.pytest.ini_options]
    asyncio_mode = "auto"
    

    or

    [pytest]
    asyncio_mode = auto
    
  • (Alternatively) add at the top of your async test module:

    import pytest
    pytestmark = pytest.mark.asyncio

This will ensure async def fixtures and tests execute correctly.

🤖 Prompt for AI Agents
In backend/tests/unit/core/migrations/graph/test_003.py around lines 10 to 17,
the async fixture migration_003_data and async tests require pytest-asyncio to
run properly. To fix this, ensure pytest-asyncio is installed as a dev
dependency and enable asyncio support by adding asyncio_mode = "auto" in your
pytest configuration file (pytest.ini or pyproject.toml). Alternatively, add
pytestmark = pytest.mark.asyncio at the top of this test module to mark all
tests as asyncio-enabled.

node_schema = register_core_models_schema.get(name="SchemaNode")
rel_schema = register_core_models_schema.get(name="SchemaRelationship")
Comment on lines +11 to +19
🛠️ Refactor suggestion

Add return type and docstring; annotate untyped fixture params.

Per guidelines, add -> None and annotate reset_registry/default_branch/delete_all_nodes_in_db. Include a brief Google-style docstring.

Apply this diff:

-async def migration_003_data(
-    db: InfrahubDatabase,
-    reset_registry,
-    default_branch,
-    delete_all_nodes_in_db,
-    register_core_models_schema: SchemaBranch,
-):
+async def migration_003_data(
+    db: InfrahubDatabase,
+    reset_registry: Any,
+    default_branch: Any,
+    delete_all_nodes_in_db: Any,
+    register_core_models_schema: SchemaBranch,
+) -> None:
+    """Prepare data for Migration003.
+
+    Creates a Node with two Parent relationships (one optional, one required)
+    using the shared core models schema.
+
+    Args:
+      db: Database handle.
+      reset_registry: Ensures a clean registry.
+      default_branch: Ensures the default branch exists.
+      delete_all_nodes_in_db: Cleans DB prior to running the test.
+      register_core_models_schema: Registered schema branch containing core models.
+
+    Returns:
+      None
+    """
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async def migration_003_data(
    db: InfrahubDatabase,
    reset_registry,
    default_branch,
    delete_all_nodes_in_db,
    register_core_models_schema: SchemaBranch,
):
    node_schema = register_core_models_schema.get(name="SchemaNode")
    rel_schema = register_core_models_schema.get(name="SchemaRelationship")
async def migration_003_data(
    db: InfrahubDatabase,
    reset_registry: Any,
    default_branch: Any,
    delete_all_nodes_in_db: Any,
    register_core_models_schema: SchemaBranch,
) -> None:
    """Prepare data for Migration003.

    Creates a Node with two Parent relationships (one optional, one required)
    using the shared core models schema.

    Args:
      db: Database handle.
      reset_registry: Ensures a clean registry.
      default_branch: Ensures the default branch exists.
      delete_all_nodes_in_db: Cleans DB prior to running the test.
      register_core_models_schema: Registered schema branch containing core models.

    Returns:
      None
    """
    node_schema = register_core_models_schema.get(name="SchemaNode")
    rel_schema = register_core_models_schema.get(name="SchemaRelationship")
🤖 Prompt for AI Agents
In backend/tests/unit/core/migrations/graph/test_003.py around lines 11 to 19,
the async function migration_003_data lacks a return type annotation and a
docstring, and some fixture parameters are untyped. Add a return type annotation
-> None to the function signature, annotate the types of reset_registry,
default_branch, and delete_all_nodes_in_db parameters appropriately, and include
a brief Google-style docstring describing the function's purpose and parameters.


node1 = await Node.init(db=db, schema=node_schema)
await node1.new(db=db, name="Node", namespace="Test")
22 changes: 7 additions & 15 deletions backend/tests/unit/core/migrations/graph/test_012.py
@@ -10,8 +10,7 @@
Migration012RenameTypeAttributeSchema,
)
from infrahub.core.node import Node
from infrahub.core.schema import AttributeSchema, NodeSchema, RelationshipSchema, SchemaRoot, internal_schema
from infrahub.core.schema.schema_branch import SchemaBranch
from infrahub.core.schema import AttributeSchema, NodeSchema, RelationshipSchema
from infrahub.core.utils import count_nodes, count_relationships
from infrahub.database import InfrahubDatabase

@@ -134,13 +133,9 @@


@pytest.fixture
async def migration_012_data(db: InfrahubDatabase, reset_registry, default_branch, delete_all_nodes_in_db):
# # load the internal schema from
schema = SchemaRoot(**internal_schema)
schema_branch = SchemaBranch(cache={}, name="default_branch")
schema_branch.load_schema(schema=schema)
schema_branch.process()

async def migration_012_data(
db: InfrahubDatabase, reset_registry, default_branch, delete_all_nodes_in_db, register_core_models_schema
):
user1 = await Node.init(db=db, schema=ACCOUNT_SCHEMA)
await user1.new(db=db, name="User1", type="User")
await user1.save(db=db)
@@ -163,12 +158,9 @@ async def migration_012_data(db: InfrahubDatabase, reset_registry, default_branch


@pytest.fixture
async def migration_012_schema(db: InfrahubDatabase, reset_registry, default_branch, delete_all_nodes_in_db):
schema = SchemaRoot(**internal_schema)
schema_branch = SchemaBranch(cache={}, name="default_branch")
schema_branch.load_schema(schema=schema)
schema_branch.process()

async def migration_012_schema(
db: InfrahubDatabase, reset_registry, default_branch, delete_all_nodes_in_db, register_core_models_schema
):
node1 = await Node.init(db=db, schema=NODE_SCHEMA)
await node1.new(
db=db, name="Account", namespace="Core", inherit_from=[InfrahubKind.LINEAGEOWNER, InfrahubKind.LINEAGESOURCE]
22 changes: 7 additions & 15 deletions backend/tests/unit/core/migrations/graph/test_013.py
@@ -11,8 +11,7 @@
Migration013DeleteUsernamePasswordGenericSchema,
)
from infrahub.core.node import Node
from infrahub.core.schema import AttributeSchema, NodeSchema, RelationshipSchema, SchemaRoot, internal_schema
from infrahub.core.schema.schema_branch import SchemaBranch
from infrahub.core.schema import AttributeSchema, NodeSchema, RelationshipSchema
from infrahub.core.utils import count_nodes, count_relationships
from infrahub.database import InfrahubDatabase

@@ -141,13 +140,9 @@


@pytest.fixture
async def migration_013_data(db: InfrahubDatabase, reset_registry, default_branch, delete_all_nodes_in_db):
# load the internal schema from
schema = SchemaRoot(**internal_schema)
schema_branch = SchemaBranch(cache={}, name="default_branch")
schema_branch.load_schema(schema=schema)
schema_branch.process()

async def migration_013_data(
db: InfrahubDatabase, reset_registry, default_branch, delete_all_nodes_in_db, register_core_models_schema
):
repo1 = await Node.init(db=db, schema=GIT_SCHEMA)
await repo1.new(db=db, name="repo1 initial", username="user1 initial", password="pwd1 initial")
await repo1.save(db=db)
@@ -168,12 +163,9 @@ async def migration_013_data(db: InfrahubDatabase, reset_registry, default_branch


@pytest.fixture
async def migration_013_schema(db: InfrahubDatabase, reset_registry, default_branch, delete_all_nodes_in_db):
schema = SchemaRoot(**internal_schema)
schema_branch = SchemaBranch(cache={}, name="default_branch")
schema_branch.load_schema(schema=schema)
schema_branch.process()

async def migration_013_schema(
db: InfrahubDatabase, reset_registry, default_branch, delete_all_nodes_in_db, register_core_models_schema
):
node1 = await Node.init(db=db, schema=NODE_SCHEMA)
await node1.new(db=db, name="GenericRepository", namespace="Core")
await node1.save(db=db)
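All three migration-test fixtures previously built a throwaway SchemaBranch inline; they now reuse the shared `register_core_models_schema` fixture, which the review above confirms is defined (as an async fixture) in backend/tests/conftest.py. For orientation only, a hedged sketch of what such a fixture could look like, reconstructed from the inline code these fixtures used to duplicate; the real fixture may differ in name, scope, and what it registers:

import pytest

from infrahub.core.schema import SchemaRoot, internal_schema
from infrahub.core.schema.schema_branch import SchemaBranch


@pytest.fixture
def register_core_models_schema_sketch() -> SchemaBranch:
    # The same steps migration_003/012/013 fixtures previously ran inline.
    schema = SchemaRoot(**internal_schema)
    schema_branch = SchemaBranch(cache={}, name="default_branch")
    schema_branch.load_schema(schema=schema)
    schema_branch.process()
    return schema_branch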
2 changes: 1 addition & 1 deletion backend/tests/unit/core/test_node.py
@@ -638,7 +638,7 @@ async def test_node_create_with_single_relationship(db: InfrahubDatabase, default_branch
assert c1.nbr_seats.value == 4
assert c1.is_electric.value is True
c1_owner = await c1.owner.get_peer(db=db)
assert c1_owner == p1
assert c1_owner.id == p1.id

paths = await get_paths_between_nodes(
db=db, source_id=c1.db_id, destination_id=p1.db_id, max_length=2, relationships=["IS_RELATED"]
18 changes: 9 additions & 9 deletions backend/tests/unit/core/test_relationship.py
@@ -101,7 +101,7 @@ async def test_relationship_load_existing(

assert peers[0].properties["is_protected"].value is True

await rel.load(db=db, data=peers[0])
rel.load(db=db, data=peers[0])

assert rel.id == peers[0].rel_node_id
assert rel.db_id == peers[0].rel_node_db_id
@@ -115,7 +115,7 @@ async def test_relationship_peer(db: InfrahubDatabase, tag_blue_main: Node, pers
rel_schema = person_schema.get_relationship("tags")

rel = Relationship(schema=rel_schema, branch=branch, node=person_jack_main)
await rel.set_peer(value=tag_blue_main)
rel.set_peer(value=tag_blue_main)

assert rel.schema == rel_schema
assert rel.name == rel_schema.name
@@ -131,7 +131,7 @@ async def test_relationship_save(db: InfrahubDatabase, tag_blue_main: Node, pers
rel_schema = person_schema.get_relationship("tags")

rel = Relationship(schema=rel_schema, branch=branch, node=person_jack_main)
await rel.set_peer(value=tag_blue_main)
rel.set_peer(value=tag_blue_main)
await rel.save(db=db)

p11 = await NodeManager.get_one(id=person_jack_main.id, db=db, branch=branch)
@@ -147,28 +147,28 @@ async def test_relationship_hash(
rel_schema = person_schema.get_relationship("tags")

rel = Relationship(schema=rel_schema, branch=branch, node=person_jack_main)
await rel.set_peer(value=tag_blue_main)
rel.set_peer(value=tag_blue_main)
await rel.save(db=db)
hash1 = hash(rel)

# Update flag property back and forth and check that hash is the same
await rel.load(db=db, data={"_relation__is_protected": True})
rel.load(db=db, data={"_relation__is_protected": True})
hash2 = hash(rel)

await rel.load(db=db, data={"_relation__is_protected": False})
rel.load(db=db, data={"_relation__is_protected": False})
hash3 = hash(rel)

assert hash1 == hash3
assert hash1 != hash2

# Update node property back and forth and check that hash is the same as well
await rel.load(db=db, data={"_relation__owner": first_account})
rel.load(db=db, data={"_relation__owner": first_account})
hash4 = hash(rel)

await rel.load(db=db, data={"_relation__owner": None})
rel.load(db=db, data={"_relation__owner": None})
hash5 = hash(rel)

await rel.load(db=db, data={"_relation__owner": first_account})
rel.load(db=db, data={"_relation__owner": first_account})
hash6 = hash(rel)

assert hash4 == hash6
4 changes: 2 additions & 2 deletions backend/tests/unit/core/test_relationship_query.py
@@ -317,7 +317,7 @@ async def test_query_RelationshipDeleteQuery(
)

rel = Relationship(schema=rel_schema, branch=branch, node=person_jack_tags_main)
await rel.load(db=db, data=rel_data)
rel.load(db=db, data=rel_data)

query = await RelationshipDeleteQuery.init(
db=db,
@@ -386,7 +386,7 @@ def get_active_path_and_rel(all_paths, previous_rel: str):
)

rel = Relationship(schema=rel_schema, branch=branch, node=person_jack_tags_main)
await rel.load(db=db, data=rel_data)
rel.load(db=db, data=rel_data)

query = await RelationshipDeleteQuery.init(
db=db,