Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/ansys/dpf/post/harmonic_mechanical_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,10 @@ def _get_result_workflow(
averaging_config: AveragingConfig = AveragingConfig(),
rescoping: Optional[_Rescoping] = None,
shell_layer: Optional[shell_layers] = None,
) -> (dpf.Workflow, Union[str, list[str], None], str):
) -> Tuple[dpf.Workflow, Union[str, list[str], None], str]:
"""Generate (without evaluating) the Workflow to extract results."""
result_workflow_inputs = _create_result_workflow_inputs(
available_results=self.results,
base_name=base_name,
category=category,
components=components,
Expand Down
5 changes: 3 additions & 2 deletions src/ansys/dpf/post/modal_mechanical_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
-------------------------
"""
from typing import List, Optional, Union
from typing import List, Optional, Tuple, Union

from ansys.dpf.core import shell_layers

Expand Down Expand Up @@ -70,9 +70,10 @@ def _get_result_workflow(
averaging_config: AveragingConfig = AveragingConfig(),
rescoping: Optional[_Rescoping] = None,
shell_layer: Optional[shell_layers] = None,
) -> (dpf.Workflow, Union[str, list[str], None], str):
) -> Tuple[dpf.Workflow, Union[str, list[str], None], str]:
"""Generate (without evaluating) the Workflow to extract results."""
result_workflow_inputs = _create_result_workflow_inputs(
available_results=self.results,
base_name=base_name,
category=category,
components=components,
Expand Down
11 changes: 7 additions & 4 deletions src/ansys/dpf/post/result_workflows/_build_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from typing import Callable, List, Optional, Union

from ansys.dpf.core import Operator, Workflow, shell_layers
from ansys.dpf.core.available_result import _result_properties
from ansys.dpf.core.available_result import AvailableResult
from ansys.dpf.core.common import locations

from ansys.dpf.post.result_workflows._component_helper import (
Expand All @@ -45,6 +45,7 @@
from ansys.dpf.post.result_workflows._utils import (
AveragingConfig,
_CreateOperatorCallable,
_get_native_location,
_Rescoping,
)
from ansys.dpf.post.selection import Selection, _WfNames
Expand Down Expand Up @@ -120,6 +121,7 @@ class _CreateWorkflowInputs:


def _requires_manual_averaging(
available_results: list[AvailableResult],
base_name: str,
location: str,
category: ResultCategory,
Expand All @@ -128,8 +130,7 @@ def _requires_manual_averaging(
create_operator_callable: Callable[[str], Operator],
average_per_body: bool,
):
res = _result_properties[base_name] if base_name in _result_properties else None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could also have a function here which redirects from B_N and whatnot to SMISC as you did, but does not handle getting the native location.
This way the handling of the native location remains the same, and you have a clear function which checks for availability of results and throws a precise error when it fails.

native_location = res["location"] if res is not None else None
native_location = _get_native_location(available_results, base_name)

if average_per_body and (
native_location == locations.elemental
Expand All @@ -138,7 +139,7 @@ def _requires_manual_averaging(
return True
if category == ResultCategory.equivalent and base_name[0] == "E": # strain eqv
return True
if res is not None:
if native_location is not None:
is_model_cyclic = create_operator_callable("is_cyclic").eval()
is_model_cyclic = is_model_cyclic in ["single_stage", "multi_stage"]
if has_external_layer and is_model_cyclic and location != native_location:
Expand Down Expand Up @@ -265,6 +266,7 @@ def _create_result_workflows(


def _create_result_workflow_inputs(
available_results: list[AvailableResult],
base_name: str,
category: ResultCategory,
components: Union[str, List[str], int, List[int], None],
Expand All @@ -284,6 +286,7 @@ def _create_result_workflow_inputs(
)

force_elemental_nodal = _requires_manual_averaging(
available_results=available_results,
base_name=base_name,
location=location,
category=category,
Expand Down
26 changes: 26 additions & 0 deletions src/ansys/dpf/post/result_workflows/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from typing import Optional, Protocol

from ansys.dpf.core import Operator, Workflow
from ansys.dpf.core.available_result import AvailableResult

from ansys.dpf.post.selection import _WfNames

Expand Down Expand Up @@ -122,3 +123,28 @@ def _append_workflow(new_wf: Optional[Workflow], last_wf: Workflow):
output_input_names={_WfNames.output_data: _WfNames.input_data},
)
return new_wf


def _get_native_location(
available_results: list[AvailableResult], base_name: str
) -> str:
"""Get the native location of a result from its base name."""
res = next((r for r in available_results if r.operator_name == base_name), None)

# special case for beam results, which are extracted from SMISC
if res is None and base_name in [
"B_N",
"B_M1",
"B_M2",
"B_MT",
"B_SN",
"B_EL",
"B_T1",
"B_T2",
]:
res = next((r for r in available_results if r.operator_name == "SMISC"), None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FedericoNegri so all SMISC results have the same native location then?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that's my understanding


if res is not None:
return res.native_location

raise ValueError(f"Result with base name '{base_name}' not found.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
raise ValueError(f"Result with base name '{base_name}' not found.")
raise ValueError(f"Unknown native location for result with base name '{base_name}'.")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either we make this check explicit here, or we move the check for existing result with base name somewhere above.
This function checks for the native location, not whether the result exists.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function checks for the native location, not whether the result exists.

I disagree, the function throws if it can't find the result, which is a prerequisite for querying its location.
Unknown native location would be an appropriate error message for a native location set to None.

we move the check for existing result with base name somewhere above

I've been thinking about this and considered having a function

def _get_available_result(
    available_results: list[AvailableResult], base_name: str
) -> AvailableResult:
    res = next((r for r in available_results if r.operator_name == base_name), None)

    # special case for beam results, which are extracted from SMISC
    if res is None and base_name in [
        "B_N",
        "B_M1",
        "B_M2",
        "B_MT",
        "B_SN",
        "B_EL",
        "B_T1",
        "B_T2",
    ]:
        res = next((r for r in available_results if r.operator_name == "SMISC"), None)

    if res is None:
        raise ValueError(f"Result with base name '{base_name}' is not available.")

    return res

but I think it would just be misleading. You come in with B_N (axial force) and it returns you an available result with physical name 'Elemental Summable Miscellaneous Data', no unit etc.

The proper fix is to have those beam results be listed as available results (that was discussed internally).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree on the proper fix.
Let's call this a quick fix then.

11 changes: 3 additions & 8 deletions src/ansys/dpf/post/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
from ansys.dpf.post.meshes import Meshes
from ansys.dpf.post.result_workflows._build_workflow import _requires_manual_averaging
from ansys.dpf.post.result_workflows._component_helper import ResultCategory
from ansys.dpf.post.result_workflows._utils import _Rescoping
from ansys.dpf.post.result_workflows._utils import _get_native_location, _Rescoping
from ansys.dpf.post.selection import Selection


Expand Down Expand Up @@ -636,6 +636,7 @@ def _build_selection(
has_skin = len(skin) > 0

requires_manual_averaging = _requires_manual_averaging(
available_results=self.results,
base_name=base_name,
location=location,
category=category,
Expand All @@ -658,13 +659,7 @@ def _build_selection(
if requires_manual_averaging and location != locations.elemental_nodal:
location = locations.elemental_nodal

available_results = self._model.metadata.result_info.available_results
result_info = next(
(r for r in available_results if r.operator_name == base_name), None
)
result_native_location = None
if result_info is not None:
result_native_location = result_info.native_location
result_native_location = _get_native_location(self.results, base_name)

# Create the SpatialSelection

Expand Down
3 changes: 2 additions & 1 deletion src/ansys/dpf/post/static_mechanical_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ def _get_result_workflow(
averaging_config: AveragingConfig = AveragingConfig(),
rescoping: Optional[_Rescoping] = None,
shell_layer: Optional[shell_layers] = None,
) -> (core.Workflow, Union[str, list[str], None], str):
) -> Tuple[core.Workflow, Union[str, list[str], None], str]:
"""Generate (without evaluating) the Workflow to extract results."""
result_workflow_inputs = _create_result_workflow_inputs(
available_results=self.results,
base_name=base_name,
category=category,
components=components,
Expand Down
3 changes: 2 additions & 1 deletion src/ansys/dpf/post/transient_mechanical_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,10 @@ def _get_result_workflow(
averaging_config: AveragingConfig = AveragingConfig(),
rescoping: Optional[_Rescoping] = None,
shell_layer: Optional[shell_layers] = None,
) -> (dpf.Workflow, Union[str, list[str], None], str):
) -> Tuple[dpf.Workflow, Union[str, list[str], None], str]:
"""Generate (without evaluating) the Workflow to extract results."""
result_workflow_inputs = _create_result_workflow_inputs(
available_results=self.results,
base_name=base_name,
category=category,
components=components,
Expand Down
34 changes: 4 additions & 30 deletions tests/test_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2203,36 +2203,10 @@ def test_kinetic_energy(self, transient_simulation):
assert np.allclose(field.data, field_ref.data)

def test_structural_temperature(self, transient_simulation):
result = transient_simulation.structural_temperature(set_ids=[2])
assert len(result._fc) == 1
assert result._fc.get_time_scoping().ids == [2]
field = result._fc[0]
op = transient_simulation._model.operator("BFE")
field_ref = op.eval()[0]
assert field.component_count == 1
assert np.allclose(field.data, field_ref.data)

def test_structural_temperature_nodal(self, transient_simulation):
result = transient_simulation.structural_temperature_nodal(set_ids=[2])
assert len(result._fc) == 1
assert result._fc.get_time_scoping().ids == [2]
field = result._fc[0]
op = transient_simulation._model.operator("BFE")
op.connect(9, post.locations.nodal)
field_ref = op.eval()[0]
assert field.component_count == 1
assert np.allclose(field.data, field_ref.data)

def test_structural_temperature_elemental(self, transient_simulation):
result = transient_simulation.structural_temperature_elemental(set_ids=[2])
assert len(result._fc) == 1
assert result._fc.get_time_scoping().ids == [2]
field = result._fc[0]
op = transient_simulation._model.operator("BFE")
op.connect(9, post.locations.elemental)
field_ref = op.eval()[0]
assert field.component_count == 1
assert np.allclose(field.data, field_ref.data)
# the model does not contain structural temperature results
with pytest.raises(ValueError) as excinfo:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FedericoNegri so this test used to silently fail at finding the native location, and returned something wrong?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case the test was comparing trivial arrays, there's an internal bug report for that operator. But in general the logic before was resorting to some defaults in case of missing location. That resulted in hidden errors for results that weren't registered in the _result_properties dict.

_ = transient_simulation.structural_temperature(set_ids=[1])
assert "not found" in str(excinfo.value)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert "not found" in str(excinfo.value)
assert "Unknown native location" in str(excinfo.value)


# @pytest.mark.skipif(
# not SERVERS_VERSION_GREATER_THAN_OR_EQUAL_TO_5_0,
Expand Down