Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions src/dodal/common/maths.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
from collections.abc import Iterable
from dataclasses import dataclass
from typing import Self

import numpy as np
from ophyd_async.core import (
Array1D,
)


def step_to_num(start: float, stop: float, step: float) -> tuple[float, float, int]:
Expand Down Expand Up @@ -146,3 +153,100 @@ def rotate_clockwise(theta: float, x: float, y: float) -> tuple[float, float]:

def rotate_counter_clockwise(theta: float, x: float, y: float) -> tuple[float, float]:
return rotate_clockwise(-theta, x, y)


MotorOffsetAndPhase = Array1D[np.float32]


@dataclass
class AngleWithPhase:
"""Represents a point in an absolute rotational space which is defined by a phase where 0<=phase<360
and an offset from an origin where the absolute coordinate is the sum of the phase and the offset.

Attributes:
offset: The offset of 0 phase from some other unwrapped rotational coordinate space
phase: The phase in degrees relative to this offset.
"""

offset: float
phase: float = 0.0

def __post_init__(self) -> None:
correction = 360 * (self.phase // 360)
self.offset += correction
self.phase -= correction

@classmethod
def from_iterable(cls, values: Iterable[float]) -> Self:
"""Construct a normalised representation of the offset and phase, such that
0 <= phase < 360.

Args:
values (Iterable[float]): the offset and phase as a list or other iterable
"""
offset, phase = values
return cls(offset, phase)

@classmethod
def wrap(cls, unwrapped: float) -> "AngleWithPhase":
"""Construct a representation such that offset = n * 360 and 0 <= phase < 360.

Args:
unwrapped (float): The unwrapped angle in degrees
"""
offset = AngleWithPhase.offset_from_unwrapped(unwrapped)
return cls(offset, unwrapped - offset)

def rebase_to(self, other: Self) -> "AngleWithPhase":
"""Return this angle with the offset adjusted such that the phases can be compared."""
correction = other.offset - self.offset
if correction % 360:
return AngleWithPhase.from_iterable(
[self.offset + correction, self.phase - correction]
)
else:
return self

def unwrap(self) -> float:
"""Generate the unwrapped representation of this angle."""
return self.offset + self.phase

def phase_distance(self, phase: float) -> float:
"""Determine the shortest distance between this angle and the specified phase.

Args:
phase (float): The phase angle to compare to
"""
phase = phase % 360
max_theta = max(self.phase, phase)
min_theta = min(self.phase, phase)
return min(max_theta - min_theta, min_theta + 360 - max_theta)

@classmethod
def offset_from_unwrapped(cls, unwrapped_deg: float) -> float:
"""Obtain the offset from the corresponding wrapped angle in degrees."""
return round(unwrapped_deg // 360) * 360

def nearest_with_phase(self, phase_deg: float) -> "AngleWithPhase":
"""Return the nearest angle to this one with the specified phase."""
phase_deg = phase_deg % 360
if phase_deg > self.phase:
return (
AngleWithPhase(self.offset, phase_deg)
if phase_deg - self.phase <= 180
else AngleWithPhase(self.offset - 360, phase_deg)
)
else:
return (
AngleWithPhase(self.offset, phase_deg)
if self.phase - phase_deg <= 180
else AngleWithPhase(self.offset + 360, phase_deg)
)


def reflect_phase(phase) -> float:
"""Convert the phase angle as if the corresponding unwrapped angle were to
be reflected about the origin and then re-wrapped, this corresponds to
converting a clockwise angle to an anti-clockwise angle and vice-versa.
"""
return (360 - phase) % 360
4 changes: 2 additions & 2 deletions src/dodal/devices/aithre_lasershaping/goniometer.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from ophyd_async.epics.motor import Motor

from dodal.devices.motors import XYZOmegaStage, create_axis_perp_to_rotation
from dodal.devices.motors import XYZWrappedOmegaStage, create_axis_perp_to_rotation


class Goniometer(XYZOmegaStage):
class Goniometer(XYZWrappedOmegaStage):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
class Goniometer(XYZWrappedOmegaStage):
class Goniometer(XYZOmegaStage):
def(
self,
prefix: str,
name: str = ""
...
):
super().__init__(prefix=prefix, name=name, ...)
with self.add_children_as_readables():
self.wrapped_omega = WrappedAxis(self.omega)
...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment below re Smargon

"""The Aithre lab goniometer and the XYZ stage it sits on.

`x`, `y` and `z` control the axes of the positioner at the base, while `sampy` and
Expand Down
19 changes: 19 additions & 0 deletions src/dodal/devices/motors.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from ophyd_async.epics.motor import Motor

from dodal.common.maths import rotate_clockwise, rotate_counter_clockwise
from dodal.devices.wrapped_axis import WrappedAxis

_X = "X"
_Y = "Y"
Expand Down Expand Up @@ -114,9 +115,27 @@ def __init__(
) -> None:
with self.add_children_as_readables():
self.omega = Motor(prefix + omega_infix)

super().__init__(prefix, name, x_infix, y_infix, z_infix)


class XYZWrappedOmegaStage(XYZOmegaStage):
"""Four-axis stage with x, y, z linear axes and an omega axis with unrestricted rotation."""

def __init__(
self,
prefix: str = "",
name: str = "",
x_infix: str = _X,
y_infix: str = _Y,
z_infix: str = _Z,
omega_infix: str = _OMEGA,
):
super().__init__(prefix, name, x_infix, y_infix, z_infix, omega_infix)
with self.add_children_as_readables():
self.wrapped_omega = WrappedAxis(self.omega)


class XYZAzimuthStage(XYZStage):
"""Four-axis stage with a standard xyz stage and one axis of rotation: azimuth."""

Expand Down
21 changes: 18 additions & 3 deletions src/dodal/devices/smargon.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
from ophyd_async.epics.core import epics_signal_r, epics_signal_rw
from ophyd_async.epics.motor import Motor

from dodal.devices.motors import XYZOmegaStage
from dodal.common.maths import AngleWithPhase
from dodal.devices.motors import XYZWrappedOmegaStage
from dodal.devices.util.epics_util import SetWhenEnabled


Expand Down Expand Up @@ -78,7 +79,7 @@ class CombinedMove(TypedDict, total=False):
chi: float | None


class Smargon(XYZOmegaStage, Movable):
class Smargon(XYZWrappedOmegaStage, Movable):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only device that uses the wrapped omega axis. Rather than creating XYZWrappedOmegaStage, you should change it so Smargon still uses XYZOmegaStage and then just wrap the omega axis directly here.

class Smargon(XYZWrappedOmegaStage, Movable[float]):

     def __init__(self, prefix: str, name: str = ""):
         super().__init__(prefix=prefix, name=name)
         with self.add_children_as_readables():
               self.wrapped_omega = WrappedAxis(self.omega)
               # Rest of smargon logic
               ...

Copy link
Copy Markdown
Contributor Author

@rtuck99 rtuck99 Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Goniometer also inherits from XYZWrappedOmegaStage.

Goniometer is used in the same pin tip centring plan that is used with the Smargon, and which following DiamondLightSource/mx-bluesky#1640 does use the wrapped axis, so we need a common abstraction.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm. My preference would be to still use XYZOmegaStage, and then have your plan include a parameter called wrapped_axis or wrapped_omega. So it looks like this:

@pydantic.dataclasses.dataclass(config={"arbitrary_types_allowed": True})
class PinTipCentringComposite:
    """All devices which are directly or indirectly required by this plan"""

    oav: OAV
    gonio: XYZOmegaStage
    gonio_wrapped_axis: WrappedAxis
    pin_tip_detection: PinTipDetection

The reason being is this makes it way more generic. Any axis can now be wrapped. This means we don't need to create a new subclass for every possible stage combination which needs an axis wrapped.

However, looking at it more I'm not sure this would be supported by BlueAPI as it is a child device and from what I can tell, composites are extracted from being a unique type (correct me if wrong?).

"""Real motors added to allow stops following pin load (e.g. real_x1.stop() )
X1 and X2 real motors provide compound chi motion as well as the compound X travel,
increasing the gap between x1 and x2 changes chi, moving together changes virtual x.
Expand All @@ -104,6 +105,15 @@ def __init__(self, prefix: str, name: str = ""):

super().__init__(prefix, name)

async def _get_target_value(self, motor_name: str, value: float) -> float:
if motor_name == "omega":
current_angle = AngleWithPhase.from_iterable(
await self.wrapped_omega.offset_and_phase.get_value()
)
return current_angle.nearest_with_phase(value).unwrap()
else:
return value

@AsyncStatus.wrap
async def set(self, value: CombinedMove):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should: I'm not sure I like that the co-ordinate system of this set is different to that of omega.set. I think it is possible to get the underlying motor object to always be moving based on phase.

Copy link
Copy Markdown
Contributor Author

@rtuck99 rtuck99 Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think making the underlying motor object represent phase is a bad idea for reasons I will state elsewhere in this PR, however I agree that having the omega coordinate as the only wrapped value in CombinedMove is somewhat ugly. The way I see, it the options are:

  • CombinedMove.omega is unwrapped and the plan unwraps the coordinate before specifying it - also ugly, and means you have to read the current position in the plan.
  • CombinedMove.omega_phase becomes a thing and you can specify the phase there, specifying omega_phase
  • We remove support for setting omega from CombinedMove - following Smargon combined moves should perform omega moves separately #1998 it seems the consensus is that omega moves should be serialized anyway because we've decided there are likely hardware limitations that mean we can't reliably do simultaneous fast omega moves at the same time as other axes. I think this is what I will ultimately do in the follow-on PR
  • We implement a separate move_to_robot_load which just moves to 0,0,0,0,0,mod-360 - since this is the only time we need to do the combined move apart from moving to the next xtal. We could do this in conjunction with # 3 if we want to make this an explicit smargon feature, but I think it's not strictly necessary.

"""This will move all motion together in a deferred move.
Expand All @@ -114,11 +124,16 @@ async def set(self, value: CombinedMove):
only come back after the motion on that axis finished.
"""
await self.defer_move.set(DeferMoves.ON)
# TODO Hotfix required here until https://github.com/DiamondLightSource/dodal/issues/1998
# is implemented in separate PR
try:
finished_moving = []
for motor_name, new_setpoint in value.items():
if new_setpoint is not None and isinstance(new_setpoint, int | float):
axis: Motor = getattr(self, motor_name)
new_setpoint = await self._get_target_value(
motor_name, new_setpoint
)
axis = getattr(self, motor_name)
await axis.check_motor_limit(
await axis.user_setpoint.get_value(), new_setpoint
)
Expand Down
73 changes: 73 additions & 0 deletions src/dodal/devices/wrapped_axis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import numpy as np
from ophyd_async.core import (
Reference,
StandardReadable,
StandardReadableFormat,
derived_signal_rw,
)
from ophyd_async.epics.motor import Motor

from dodal.common.maths import AngleWithPhase, MotorOffsetAndPhase


class WrappedAxis(StandardReadable):
"""This device is a wrapper around a rotational Motor that presents the unwrapped coordinate system of the
Comment thread
rtuck99 marked this conversation as resolved.
underlying motor as a combination of a phase angle and an offset from the motor's origin coordinate.

Attributes:
phase (float): This is a read-write signal that corresponds to the motor's phase angle, relative to the offset.
The behaviour of the phase signal differs from that of the underlying motor setpoint/readback signal
in the following ways:
* Readback values are constrained to 0 <= omega < 360
* Write values are normalised to 0 <= omega < 360, and then mapped to the nearest unwrapped angle. The
underlying motor will be moved via the shortest path to the required phase angle, thus the direction
of an unwrapped axis move is not always the same as the direction of a move in the wrapped axis.
* Write values are normalised so for un-normalised writes, the readback will differ.
* Bluesky mvr operations greater of 180 degrees or more will not result in the expected moves.
* set_and_wait_for_value() is unreliable close to the wrap-around (however in the common case this is
where deadband is 0.001 and values are rounded to this level, the default equality comparison is sufficient).
* Sequences of moves on the unwrapped axis that would not result in cumulative motion axis will
result in a cumulative motion in the wrapped axis. e.g. 0 -> 120 -> 240 -> 0 is 0 degrees of real
cumulative motion when performed in the unwrapped case, but is 360 degrees of real motion when performed
on the wrapped axis.
* The reverse is also true 0 -> 240 -> 360 -> is 0 degrees of real motion when executed on the wrapped axis
but 360 degrees when performed in the unwrapped axis.
* The above means that use of phase to set motor position is unsuitable for axes
where the underlying motor rotation is constrained.
offset_and_phase (Array1D[np.float32]): retrieve the offset and phase together, for use when
mapping to the underlying unwrapped axis. These values can be converted and manipulated using the
AngleWithPhase helper class.
"""

def __init__(self, real_motor: Motor, name=""):
self._real_motor = Reference(real_motor)
with self.add_children_as_readables(StandardReadableFormat.HINTED_SIGNAL):
self.offset_and_phase = derived_signal_rw(
self._get_motor_offset_and_phase,
self._set_motor_offset_and_phase,
motor_pos=real_motor,
)
with self.add_children_as_readables():
self.phase = derived_signal_rw(
self._get_phase, self._set_phase, offset_and_phase=self.offset_and_phase
)
Comment on lines +50 to +53
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a signal that has offset_and_phase, why do we need phase as well which gives the same information?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

phase is very convenient to directly read and write from plans, most plans are not interested in absolute position, but the extra information is useful when you are.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it not make more sense to just have one signal for offset and one for phase?

super().__init__(name=name)

def _get_motor_offset_and_phase(self, motor_pos: float) -> MotorOffsetAndPhase:
angle = AngleWithPhase.wrap(motor_pos)
return np.array([angle.offset, angle.phase])

async def _set_motor_offset_and_phase(self, value: MotorOffsetAndPhase):
await self._real_motor().set(AngleWithPhase.from_iterable(value).unwrap())

def _get_phase(self, offset_and_phase: MotorOffsetAndPhase) -> float:
return offset_and_phase[1].item()

async def _set_phase(self, value: float):
"""Set the motor phase to the specified phase value in degrees.
The motor will travel via the shortest distance path.
"""
offset_and_phase = await self.offset_and_phase.get_value()
current_position = AngleWithPhase.from_iterable(offset_and_phase)
target_value = current_position.nearest_with_phase(value).unwrap()
await self._real_motor().set(target_value)
Loading
Loading