Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
275 changes: 275 additions & 0 deletions src/pqnstack/pqn/drivers/thorlabs_polarimeter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
from __future__ import annotations

import atexit
import contextlib
import logging
import time
from dataclasses import dataclass
from dataclasses import field
from typing import Any

import pyvisa
from pyvisa.errors import Error as VisaError

from pqnstack.base.errors import DeviceNotStartedError
from pqnstack.base.instrument import Instrument
from pqnstack.base.instrument import InstrumentInfo
from pqnstack.base.instrument import log_operation
from pqnstack.base.instrument import log_parameter

logger = logging.getLogger(__name__)

USB_FILTER = "USB?*::INSTR"
CMD_ENABLE_CALC = "SENS:CALC 1"
CMD_ENABLE_ROTATION = "INP:ROT:STAT 1"
CMD_DISABLE_CALC = "SENS:CALC 0"
CMD_DISABLE_ROTATION = "INP:ROT:STAT 0"
CMD_SET_WAVELENGTH_METERS = "SENS:CORR:WAV"
QRY_IS_CALC_ENABLED = "SENS:CALC?"
QRY_IS_ROTATION_ENABLED = "INP:ROT:STAT?"
QRY_WAVELENGTH_METERS = "SENS:CORR:WAV?"
QRY_LATEST = "SENS:DATA:LAT?"
QRY_IDN = "*IDN?"


@dataclass(frozen=True, slots=True)
class PAX1000IR2Info(InstrumentInfo):
wavelength_nm: float = float("nan")
last_theta_deg: float = float("nan")
last_eta_deg: float = float("nan")
last_dop: float = float("nan")
last_power_w: float = float("nan")
logging_rows: int = 0


@dataclass(slots=True)
class PAX1000IR2(Instrument):
name: str
desc: str
hw_address: str
parameters: set[str] = field(default_factory=set)
operations: dict[str, Any] = field(default_factory=dict)
Comment on lines +45 to +51
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As with other drivers, use an intermediate protocol between Instrument and the wrapper.

Suggested change
@dataclass(slots=True)
class PAX1000IR2(Instrument):
name: str
desc: str
hw_address: str
parameters: set[str] = field(default_factory=set)
operations: dict[str, Any] = field(default_factory=dict)
@runtime_checkable
@dataclass(slots=True)
class ThorlabsPolarimeterInstrument(Instrument, Protocol):
def __post_init__(self) -> None:
self.operations["read"] = self.read
self.operations["set_wavelength"] = self.set_wavelength
@log_operation
def read(self) -> ThorlabsPolarimeterInfo: ...
@log_operation
def set_wavelength(self, value: float) -> None: ...
@dataclass(slots=True)
class PAX1000IR2(ThorlabsPolarimeterInstrument):

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, seems superfluous for one off drivers like this. If in the future we find there is a lot of shared utility like with motors, we can make a parent class for the group, else no point in my opinion


pax_id_contains: str | None = None
pax_idn_contains: str = "PAX1000"

_rm: Any | None = field(default=None, init=False, repr=False)
_instr: Any | None = field(default=None, init=False, repr=False)
_timeout_ms: int = field(default=3000, init=False, repr=False)

_wavelength_nm_cache: float = field(default=float("nan"), init=False)
_last_theta_deg: float = field(default=float("nan"), init=False)
_last_eta_deg: float = field(default=float("nan"), init=False)
_last_dop: float = field(default=float("nan"), init=False)
_last_power_w: float = field(default=float("nan"), init=False)

def _write(self, cmd: str) -> None:
if self._instr is None:
msg = "Start the device first."
raise DeviceNotStartedError(msg)
try:
self._instr.write(f"{cmd}\n")
except (VisaError, OSError):
with contextlib.suppress(VisaError, OSError):
self._instr.write(cmd)

def _query(self, cmd: str) -> str:
if self._instr is None:
msg = "Start the device first."
raise DeviceNotStartedError(msg)
try:
self._instr.write(f"{cmd}\n")
return str(self._instr.read()).strip()
except (VisaError, OSError):
try:
return str(self._instr.query(cmd)).strip()
except (VisaError, OSError):
return ""

def _list_usb_resources(self) -> tuple[str, ...]:
assert self._rm is not None
try:
return self._rm.list_resources(USB_FILTER) # type: ignore[no-any-return]
except VisaError as exc:
msg = f"VISA resource discovery failed: {exc}"
raise FileNotFoundError(msg) from exc

def _filter_candidates(self, resources: tuple[str, ...]) -> tuple[str, ...]:
if self.pax_id_contains:
return tuple(r for r in resources if self.pax_id_contains in r)
return resources

def _probe_idn(self, resource_name: str) -> str:
assert self._rm is not None
try:
with self._rm.open_resource(resource_name) as resource_handle:
visa_resource: Any = resource_handle
visa_resource.timeout = self._timeout_ms
try:
visa_resource.write(f"{QRY_IDN}\n")
return str(visa_resource.read()).strip()
except (VisaError, OSError):
try:
return str(visa_resource.query(QRY_IDN)).strip()
except (VisaError, OSError):
return ""
except VisaError as exc:
logger.debug("Resource probe failed for %s: %s", resource_name, exc)
return ""

def _discover_resource(self) -> str:
resources = self._filter_candidates(self._list_usb_resources())
if not resources:
msg = "No USB VISA resources matched filter."
raise FileNotFoundError(msg)
if len(resources) == 1 and not self.pax_idn_contains:
return resources[0]
idn_substring = self.pax_idn_contains or ""
matched = [r for r in resources if (not idn_substring) or (idn_substring in self._probe_idn(r))]
if len(matched) != 1:
msg = "PAX discovery ambiguous or no match."
raise FileNotFoundError(msg)
return matched[0]

def _open_resource(self, resource_name: str) -> None:
assert self._rm is not None
try:
self._instr = self._rm.open_resource(resource_name)
self._instr.timeout = self._timeout_ms
except VisaError as exc:
self._instr = None
msg = f"Failed to open VISA resource {resource_name}: {exc}"
raise RuntimeError(msg) from exc

def _write_and_confirm(self, set_cmd: str, qry_cmd: str, expect: str | float) -> bool:
try:
self._write(set_cmd)
except DeviceNotStartedError:
return False
expected_prefix = str(expect)
for _ in range(10):
try:
last_response = self._query(qry_cmd)
except DeviceNotStartedError:
last_response = ""
if last_response.startswith(expected_prefix):
return True
time.sleep(0.05)
return False

def _init_settings(self) -> None:
calc_ok = self._write_and_confirm(CMD_ENABLE_CALC, QRY_IS_CALC_ENABLED, 1)
rot_ok = self._write_and_confirm(CMD_ENABLE_ROTATION, QRY_IS_ROTATION_ENABLED, 1)
if not (calc_ok and rot_ok):
with contextlib.suppress(Exception):
self._write_and_confirm(CMD_DISABLE_CALC, QRY_IS_CALC_ENABLED, 0)
self._write_and_confirm(CMD_DISABLE_ROTATION, QRY_IS_ROTATION_ENABLED, 0)
msg = "PAX setup failed to enable calc/rotation."
raise RuntimeError(msg)

def _read_wavelength_cache(self) -> None:
try:
raw_value = self._query(QRY_WAVELENGTH_METERS)
self._wavelength_nm_cache = float(raw_value) * 1e9
except (ValueError, TypeError):
self._wavelength_nm_cache = float("nan")

@log_operation
def set_wavelength_nm(self, wavelength_nm: float) -> None:
try:
value_m = float(wavelength_nm) * 1e-9
except (TypeError, ValueError) as exc:
msg = f"Invalid wavelength: {wavelength_nm}"
raise ValueError(msg) from exc
self._write(f"{CMD_SET_WAVELENGTH_METERS} {value_m}")
self._read_wavelength_cache()

def start(self) -> None:
if self._instr is not None:
return
try:
self._rm = pyvisa.ResourceManager("@py")
except Exception as exc:
msg = f"VISA backend not available: {exc}"
raise RuntimeError(msg) from exc

resource_name = self.hw_address or self._discover_resource()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resource discovery feels beyond the scope of a slim wrapper. Users can provide a hw_address with the other configuration just like every other Instrument.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find it quite hard to connect to instruments via visa, I'd prefer this method whenever visa is absolutely necessary, though I prefer using USBTMC when possible.

self._open_resource(resource_name)
self._init_settings()
self._read_wavelength_cache()

self.operations.update(
{
"read": self.read,
"set_wavelength_nm": self.set_wavelength_nm,
}
)
atexit.register(self.close)

def close(self) -> None:
if self._instr is not None:
with contextlib.suppress(Exception):
self._write(CMD_DISABLE_CALC)
self._write(CMD_DISABLE_ROTATION)
_ = self._query(QRY_IS_CALC_ENABLED)
_ = self._query(QRY_IS_ROTATION_ENABLED)
with contextlib.suppress(Exception):
self._instr.close()
self._instr = None
if self._rm is not None:
with contextlib.suppress(Exception):
self._rm.close()
self._rm = None

@property
def info(self) -> PAX1000IR2Info:
return PAX1000IR2Info(
name=self.name,
desc=self.desc,
hw_address=self.hw_address,
hw_status={"connected": self._instr is not None},
wavelength_nm=self._wavelength_nm_cache,
last_theta_deg=self._last_theta_deg,
last_eta_deg=self._last_eta_deg,
last_dop=self._last_dop,
last_power_w=self._last_power_w,
)

@property
@log_parameter
def wavelength_nm(self) -> float:
return self._wavelength_nm_cache

@log_operation
def read(self) -> dict[str, float]:
if self._instr is None:
msg = "Start the device first."
raise DeviceNotStartedError(msg)
raw_reply = self._query(QRY_LATEST)
token_strs = [p for p in raw_reply.replace(";", ",").split(",") if p]
parsed_values: list[float | str] = []
for token_str in token_strs:
try:
parsed_values.append(float(token_str))
except (ValueError, TypeError):
parsed_values.append(token_str)

def get_float_at(index: int) -> float:
try:
value = parsed_values[index]
return float(value) if isinstance(value, (float, int)) else float(str(value))
except (ValueError, TypeError, IndexError):
return float("nan")

self._last_theta_deg = get_float_at(9)
self._last_eta_deg = get_float_at(10)
self._last_dop = get_float_at(11)
self._last_power_w = get_float_at(12)

return {
"pax_theta_deg": self._last_theta_deg,
"pax_eta_deg": self._last_eta_deg,
"pax_dop": self._last_dop,
"pax_power_w": self._last_power_w,
"pax_wavelength_nm": self._wavelength_nm_cache,
}
9 changes: 9 additions & 0 deletions test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from time import sleep

from pqnstack.pqn.drivers.powermeter import PM100DDevice

powermeter = PM100DDevice(name="bob", desc="desk", address="/dev/usbtmc0")
powermeter.start()
sleep(5)
while True:
sleep(0.5)