diff --git a/README.md b/README.md index ed08ef5..1ca9e55 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,13 @@ suite. What it does today: periodic-spot notch filters, TV denoising, point/plane zeroing, lossless and arbitrary geometry transforms, and derived arithmetic channels. Steps are recorded as a processing state so an export can be reproduced. +- **Advanced edge detection** (Process tab) — **Canny** and **Sobel/Scharr** + detectors with a live, non-destructive overlay preview and STM-tuned presets. + Results become reusable analysis objects: an overlay, a new image, an **active + mask**, or ROI(s). The active-mask layer (Masks tab) supports morphological + cleanup (remove small objects, fill holes, dilate/erode/open/close, + skeletonize), restricts statistics, excludes regions from a plane fit (via + mask→ROI), and is saved to a `.masks.json` sidecar. - **FFT tools** (the FFT viewer) — inspect the magnitude and radial profile with q in nm⁻¹; overlay a draggable reciprocal-lattice grid and apply an affine lattice correction; show Bragg-shell rings for a known structure; predict and diff --git a/probeflow/core/__init__.py b/probeflow/core/__init__.py index cff094e..f12bcda 100644 --- a/probeflow/core/__init__.py +++ b/probeflow/core/__init__.py @@ -33,7 +33,9 @@ ResizeHandle, resize_handles, resize_roi, + roi_from_mask, ) +from probeflow.core.mask import ImageMask, MaskSet, mask_name __all__ = [ "PLANE_CANON_NAMES", @@ -55,4 +57,8 @@ "ResizeHandle", "resize_handles", "resize_roi", + "roi_from_mask", + "ImageMask", + "MaskSet", + "mask_name", ] diff --git a/probeflow/core/mask.py b/probeflow/core/mask.py new file mode 100644 index 0000000..ffe1121 --- /dev/null +++ b/probeflow/core/mask.py @@ -0,0 +1,227 @@ +"""Active-mask layer: ``ImageMask`` and ``MaskSet`` — domain model only. + +A *mask* is a boolean array over an image that downstream tools consume: it +can restrict statistics, exclude regions from a background fit, or convert to +ROI objects. ``MaskSet`` is a deliberate structural twin of +:class:`probeflow.core.roi.ROISet` — same ``image_id`` / ``active_*_id`` / +``to_dict`` / ``from_dict`` shape — so its sidecar and viewer plumbing mirror +the ROI path. + +Edge detection is the first producer of masks; thresholding, manual paint and +segmentation can plug into the same layer later. + +This module is Qt-free and lives in ``core`` (alongside ``roi`` and +``processing_state``) so it can be imported without pulling in the GUI. +""" + +from __future__ import annotations + +import base64 +import uuid +from dataclasses import dataclass, field +from typing import Any + +import numpy as np + + +# ── Compact boolean-array (de)serialisation ───────────────────────────────────── + +def _pack_bool(data: np.ndarray) -> str: + """Pack a boolean array into a base64 string of bits (row-major).""" + bits = np.packbits(np.asarray(data, dtype=bool).ravel(order="C")) + return base64.b64encode(bits.tobytes()).decode("ascii") + + +def _unpack_bool(packed: str, shape: tuple[int, int]) -> np.ndarray: + """Inverse of :func:`_pack_bool`.""" + raw = np.frombuffer(base64.b64decode(packed.encode("ascii")), dtype=np.uint8) + n = int(shape[0]) * int(shape[1]) + bits = np.unpackbits(raw)[:n] + return bits.astype(bool).reshape(shape) + + +# ── Auto-naming ───────────────────────────────────────────────────────────────── + +def mask_name(method: str, params: dict[str, Any] | None = None) -> str: + """Build a descriptive mask name from method + parameters. + + Examples: ``Canny_sigma1.5_p60-85``, ``Sobel_magnitude_p90``, + ``Scharr_x_gradient``. + """ + p = params or {} + m = (method or "mask").lower() + if m == "canny": + sigma = p.get("sigma", 1.0) + bits = [f"Canny_sigma{_fmt_num(sigma)}"] + if str(p.get("threshold_mode", "percentile")) == "percentile": + bits.append(f"p{_fmt_num(p.get('low', 70))}-{_fmt_num(p.get('high', 90))}") + return "_".join(bits) + if m in ("sobel", "scharr"): + output = str(p.get("output", "magnitude")) + label = f"{m.capitalize()}_{output}" + if output != "orientation": + if p.get("threshold_to_mask"): + label += f"_p{_fmt_num(p.get('threshold', 90))}" + elif output in ("x", "y"): + label += "_gradient" + return label + return method + + +def _fmt_num(value: Any) -> str: + try: + f = float(value) + except (TypeError, ValueError): + return str(value) + return str(int(f)) if f == int(f) else f"{f:g}" + + +# ── Mask model ────────────────────────────────────────────────────────────────── + +@dataclass +class ImageMask: + """One boolean mask over an image, with provenance. + + *data* is the source of truth; *shape* is cached for serialisation. All + coordinates are pixel-space (``(Ny, Nx)``), matching :class:`ROI` masks. + """ + + id: str + name: str + data: np.ndarray + method: str = "manual" + parameters: dict[str, Any] = field(default_factory=dict) + + def __post_init__(self) -> None: + self.data = np.asarray(self.data, dtype=bool) + if self.data.ndim != 2: + raise ValueError(f"ImageMask data must be 2-D, got shape {self.data.shape}") + + @property + def shape(self) -> tuple[int, int]: + return (int(self.data.shape[0]), int(self.data.shape[1])) + + @classmethod + def new( + cls, + data: np.ndarray, + *, + method: str = "manual", + parameters: dict[str, Any] | None = None, + name: str | None = None, + ) -> "ImageMask": + """Create a mask with a fresh UUID and an auto-generated name.""" + params = dict(parameters or {}) + mask_id = str(uuid.uuid4()) + resolved = name if name is not None else mask_name(method, params) + return cls(id=mask_id, name=resolved, data=np.asarray(data, dtype=bool), + method=method, parameters=params) + + def count(self) -> int: + """Number of True pixels.""" + return int(self.data.sum()) + + # ── Serialisation ───────────────────────────────────────────────────────── + + def to_dict(self) -> dict[str, Any]: + return { + "id": self.id, + "name": self.name, + "method": self.method, + "parameters": dict(self.parameters), + "shape": list(self.shape), + "data": _pack_bool(self.data), + } + + @classmethod + def from_dict(cls, d: dict[str, Any]) -> "ImageMask": + shape = tuple(int(v) for v in d["shape"]) # type: ignore[assignment] + data = _unpack_bool(str(d["data"]), shape) # type: ignore[arg-type] + return cls( + id=str(d["id"]), + name=str(d["name"]), + data=data, + method=str(d.get("method", "manual")), + parameters=dict(d.get("parameters", {})), + ) + + +@dataclass +class MaskSet: + """An ordered collection of masks belonging to one image. + + Structural twin of :class:`probeflow.core.roi.ROISet`. *image_id* ties the + set to a scan; its value is opaque (typically the file stem or a UUID). + """ + + image_id: str + masks: list[ImageMask] = field(default_factory=list) + active_mask_id: str | None = None + + # ── Mutation ────────────────────────────────────────────────────────────── + + def add(self, mask: ImageMask) -> None: + self.masks.append(mask) + + def remove(self, mask_id: str) -> None: + self.masks = [m for m in self.masks if m.id != mask_id] + if self.active_mask_id == mask_id: + self.active_mask_id = None + + def get(self, mask_id: str) -> ImageMask | None: + for m in self.masks: + if m.id == mask_id: + return m + return None + + def get_by_name(self, name: str) -> ImageMask | None: + for m in self.masks: + if m.name == name: + return m + return None + + def set_active(self, mask_id: str | None) -> None: + if mask_id is not None and self.get(mask_id) is None: + raise ValueError(f"Mask {mask_id!r} not in this MaskSet") + self.active_mask_id = mask_id + + def active(self) -> ImageMask | None: + """Return the active mask, or None.""" + if self.active_mask_id is None: + return None + return self.get(self.active_mask_id) + + def replace(self, mask_id: str, data: np.ndarray) -> None: + """Replace the *data* of an existing mask in place (e.g. after cleanup).""" + mask = self.get(mask_id) + if mask is None: + raise ValueError(f"Mask {mask_id!r} not in this MaskSet") + mask.data = np.asarray(data, dtype=bool) + + # ── Serialisation ───────────────────────────────────────────────────────── + + def to_dict(self) -> dict[str, Any]: + return { + "image_id": self.image_id, + "masks": [m.to_dict() for m in self.masks], + "active_mask_id": self.active_mask_id, + } + + @classmethod + def from_dict(cls, d: dict[str, Any]) -> "MaskSet": + mask_set = cls( + image_id=str(d["image_id"]), + active_mask_id=d.get("active_mask_id"), + ) + for mask_dict in d.get("masks", []): + try: + mask_set.masks.append(ImageMask.from_dict(mask_dict)) + except (KeyError, TypeError, ValueError) as exc: + raise ValueError( + f"Failed to reconstruct ImageMask from dict: {exc!r} — " + f"offending dict keys: {sorted(mask_dict) if isinstance(mask_dict, dict) else mask_dict!r}" + ) from exc + # Drop a dangling active id rather than raising on load. + if mask_set.active_mask_id is not None and mask_set.get(mask_set.active_mask_id) is None: + mask_set.active_mask_id = None + return mask_set diff --git a/probeflow/core/roi.py b/probeflow/core/roi.py index f6169bb..4f704c9 100644 --- a/probeflow/core/roi.py +++ b/probeflow/core/roi.py @@ -630,6 +630,87 @@ def _ring_to_list(ring) -> list[list[float]]: ) +def roi_from_mask( + mask: np.ndarray, + *, + min_size_px: int = 0, + simplify: bool = False, + simplify_tolerance: float = 1.0, + one_per_component: bool = True, + name_prefix: str = "mask", +) -> list["ROI"]: + """Convert a boolean mask to polygon/multipolygon ROI(s). + + Connected True regions are traced with ``skimage.measure`` and turned into + Shapely polygons (holes preserved), then handed to :func:`_shapely_to_roi`, + so no new geometry algebra is introduced. + + Parameters + ---------- + mask: + 2-D boolean array (``(Ny, Nx)``, pixel space — matching :meth:`ROI.to_mask`). + min_size_px: + Drop connected components with fewer than this many True pixels. + simplify: + Douglas–Peucker-simplify each polygon (Shapely ``simplify``) with + *simplify_tolerance* (px). + one_per_component: + When True (default) return one ROI per connected component. When + False, return a single multipolygon ROI unioning all components. + + Returns + ------- + list[ROI] + Empty if the mask has no qualifying components. + """ + from skimage import measure as _measure + from shapely.geometry import Polygon as _Poly + from shapely.ops import unary_union as _unary_union + + m = np.asarray(mask, dtype=bool) + if m.ndim != 2 or not m.any(): + return [] + + labels = _measure.label(m, connectivity=2) + polygons = [] + for region in _measure.regionprops(labels): + if region.area < max(0, int(min_size_px)): + continue + # Work in the full-image frame so coordinates need no offset back. + component = labels == region.label + contours = _measure.find_contours(component.astype(float), 0.5) + if not contours: + continue + # Longest contour is the exterior; the rest are holes. + contours.sort(key=len, reverse=True) + rings = [[[float(c), float(r)] for r, c in contour] for contour in contours] + exterior, holes = rings[0], rings[1:] + try: + poly = _Poly(exterior, holes) + except Exception: # noqa: BLE001 — degenerate / collinear ring + continue + if not poly.is_valid: + poly = poly.buffer(0) + if simplify and not poly.is_empty: + poly = poly.simplify(float(simplify_tolerance), preserve_topology=True) + if poly.is_empty: + continue + polygons.append(poly) + + if not polygons: + return [] + + if one_per_component: + return [ + _shapely_to_roi(poly, name=f"{name_prefix}_{i + 1}") + for i, poly in enumerate(polygons) + ] + merged = _unary_union(polygons) + if merged.is_empty: + return [] + return [_shapely_to_roi(merged, name=name_prefix)] + + def invert(roi: "ROI", image_shape: tuple[int, int]) -> "ROI": """Return a new ROI representing the complement of *roi* within *image_shape*. diff --git a/probeflow/gui/__init__.py b/probeflow/gui/__init__.py index 8ab57aa..d8bc396 100644 --- a/probeflow/gui/__init__.py +++ b/probeflow/gui/__init__.py @@ -100,6 +100,7 @@ "ConvertPanel", "ConvertSidebar", "DeveloperTerminalWidget", + "EdgeDetectionDialog", "FFTViewerDialog", "ImageViewerDialog", "Navbar", @@ -189,6 +190,7 @@ def __setattr__(self, name: str, value: Any) -> None: "DEFAULT_CMAP_LABEL", "DEFAULT_CUSHION", "DeveloperTerminalWidget", + "EdgeDetectionDialog", "FFTViewerDialog", "GUI_FONT_DEFAULT", "GUI_FONT_SIZES", diff --git a/probeflow/gui/compat.py b/probeflow/gui/compat.py index 110017b..b3f4fd0 100644 --- a/probeflow/gui/compat.py +++ b/probeflow/gui/compat.py @@ -29,6 +29,7 @@ from probeflow.gui.terminal import DeveloperTerminalWidget, _DevSidebar # noqa: F401 from probeflow.gui.dialogs import ( # noqa: F401 AboutDialog, + EdgeDetectionDialog, FFTViewerDialog, PeriodicFilterDialog, SpecMappingDialog, diff --git a/probeflow/gui/dialogs/__init__.py b/probeflow/gui/dialogs/__init__.py index 0fb3e7a..417780d 100644 --- a/probeflow/gui/dialogs/__init__.py +++ b/probeflow/gui/dialogs/__init__.py @@ -10,6 +10,7 @@ _DefinitionsPanel, render_roi_reference_html, ) +from probeflow.gui.dialogs.edge_detection import EdgeDetectionDialog from probeflow.gui.dialogs.feature_finder import FeatureFinderDialog from probeflow.gui.dialogs.feature_lattice_dialog import FeatureLatticeDialog from probeflow.gui.dialogs.fft_viewer import FFTViewerDialog @@ -23,6 +24,7 @@ __all__ = [ "AboutDialog", + "EdgeDetectionDialog", "_DEFINITIONS_HTML", "_ROI_REFERENCE_HTML", "_DefinitionsDialog", diff --git a/probeflow/gui/dialogs/edge_detection.py b/probeflow/gui/dialogs/edge_detection.py new file mode 100644 index 0000000..9d77e01 --- /dev/null +++ b/probeflow/gui/dialogs/edge_detection.py @@ -0,0 +1,470 @@ +"""Advanced Edge Detection dialog (Canny + Sobel/Scharr). + +A modeless utility dialog built like :class:`STMBackgroundDialog`: a live, +non-destructive overlay preview plus explicit output actions that turn the +result into reusable analysis objects — an overlay, a new image, an active +mask, or ROI(s). The dialog never mutates the source image. +""" + +from __future__ import annotations + +import numpy as np +from PySide6.QtCore import Qt, QTimer, Signal +from PySide6.QtGui import QImage, QPixmap +from PySide6.QtWidgets import ( + QCheckBox, + QComboBox, + QDialog, + QDoubleSpinBox, + QFormLayout, + QGroupBox, + QHBoxLayout, + QLabel, + QMessageBox, + QPushButton, + QSpinBox, + QStackedWidget, + QVBoxLayout, + QWidget, +) + +from probeflow.processing.display import array_to_uint8 +from probeflow.processing.edge_detection import ( + CANNY_PRESETS, + EdgeDetectionResult, + canny_edges, + gradient_filter, +) + +_PRESET_CUSTOM = "Custom" +_OVERLAY_RGBA = (255, 59, 48, 170) # red, matching the bad-segment overlay hue + + +class EdgeDetectionDialog(QDialog): + """Preview Canny / Sobel-Scharr edge detection and emit reusable outputs.""" + + # Output signals — the viewer connects to these. + overlay_requested = Signal(object) # EdgeDetectionResult + overlay_cleared = Signal() + mask_created = Signal(object) # ImageMask + rois_created = Signal(list) # list[ROI] + image_created = Signal(object, dict) # (display_image ndarray, provenance dict) + + def __init__( + self, + image: np.ndarray, + *, + theme: dict | None = None, + pixel_size_nm: float | None = None, + active_roi_mask: np.ndarray | None = None, + source_channel: str | None = None, + parent: QWidget | None = None, + ): + super().__init__(parent) + self.setWindowTitle("Advanced Edge Detection") + self.resize(880, 680) + self.setModal(False) + + self._image = np.asarray(image, dtype=np.float64).copy() + self._theme = theme or {} + self._pixel_size_nm = pixel_size_nm + self._active_roi_mask = ( + None if active_roi_mask is None else np.asarray(active_roi_mask, dtype=bool) + ) + self._source_channel = source_channel + self._result: EdgeDetectionResult | None = None + + root = QVBoxLayout(self) + root.setSpacing(8) + + intro = QLabel( + "Detect edges without altering the image. Preview as an overlay, then " + "create a mask, ROI(s), or a new image from the result." + ) + intro.setWordWrap(True) + root.addWidget(intro) + + # Debounced live recompute — created before the panels because building + # them fires control signals that call ``_schedule``. + self._debounce = QTimer(self) + self._debounce.setSingleShot(True) + self._debounce.setInterval(120) + self._debounce.timeout.connect(self._recompute) + + # ── Method selector ──────────────────────────────────────────────── + method_row = QFormLayout() + self._method_combo = QComboBox() + self._method_combo.addItems(["Canny", "Sobel / Scharr"]) + method_row.addRow("Method:", self._method_combo) + root.addLayout(method_row) + + # ── Parameter panels (swapped by the method selector) ──────────────── + self._stack = QStackedWidget() + self._stack.addWidget(self._build_canny_panel()) + self._stack.addWidget(self._build_gradient_panel()) + root.addWidget(self._stack) + + # ── Preview ────────────────────────────────────────────────────────── + self._preview_lbl = QLabel("Preview") + self._preview_lbl.setAlignment(Qt.AlignCenter) + self._preview_lbl.setMinimumSize(360, 320) + self._preview_lbl.setStyleSheet( + f"background: {self._theme.get('sidebar_bg', '#181825')};" + ) + root.addWidget(self._preview_lbl, 1) + + self._status_lbl = QLabel("") + self._status_lbl.setWordWrap(True) + root.addWidget(self._status_lbl) + + # ── Output buttons ───────────────────────────────────────────────── + buttons = QHBoxLayout() + self._overlay_btn = QPushButton("Overlay on image") + self._image_btn = QPushButton("Open as new image") + self._mask_btn = QPushButton("Create mask") + self._roi_btn = QPushButton("Convert to ROI(s)") + close_btn = QPushButton("Close") + for b in (self._overlay_btn, self._image_btn, self._mask_btn, self._roi_btn): + buttons.addWidget(b) + buttons.addStretch() + buttons.addWidget(close_btn) + root.addLayout(buttons) + + self._overlay_btn.clicked.connect(self._emit_overlay) + self._image_btn.clicked.connect(self._emit_image) + self._mask_btn.clicked.connect(self._emit_mask) + self._roi_btn.clicked.connect(self._emit_rois) + close_btn.clicked.connect(self._on_close) + + self._method_combo.currentIndexChanged.connect(self._stack.setCurrentIndex) + self._method_combo.currentIndexChanged.connect(lambda _: self._schedule()) + + self._recompute() + + # ── Panel construction ────────────────────────────────────────────────── + + def _build_canny_panel(self) -> QWidget: + w = QWidget() + form = QFormLayout(w) + + self._canny_preset = QComboBox() + self._canny_preset.addItem(_PRESET_CUSTOM) + self._canny_preset.addItems(list(CANNY_PRESETS)) + self._canny_preset.setToolTip("Presets fill sigma and thresholds for common STM cases.") + form.addRow("Preset:", self._canny_preset) + + self._canny_sigma = QDoubleSpinBox() + self._canny_sigma.setRange(0.0, 20.0) + self._canny_sigma.setSingleStep(0.1) + self._canny_sigma.setDecimals(2) + self._canny_sigma.setValue(1.0) + self._canny_sigma.setSuffix(" px") + self._canny_sigma_lbl = QLabel("") + srow = QHBoxLayout() + srow.setContentsMargins(0, 0, 0, 0) + srow.addWidget(self._canny_sigma, 1) + srow.addWidget(self._canny_sigma_lbl) + form.addRow("Gaussian sigma:", _wrap(srow)) + + self._canny_mode = QComboBox() + self._canny_mode.addItems(["Percentile", "Absolute"]) + self._canny_mode.setToolTip( + "Percentile thresholds are robust across STM channels whose absolute " + "scale varies; absolute uses raw gradient values." + ) + form.addRow("Threshold mode:", self._canny_mode) + + self._canny_low = QDoubleSpinBox() + self._canny_low.setRange(0.0, 100.0) + self._canny_low.setValue(70.0) + self._canny_high = QDoubleSpinBox() + self._canny_high.setRange(0.0, 100.0) + self._canny_high.setValue(90.0) + form.addRow("Low threshold:", self._canny_low) + form.addRow("High threshold:", self._canny_high) + + self._canny_roi = QCheckBox("Apply within active ROI only") + self._canny_roi.setEnabled(self._active_roi_mask is not None) + if self._active_roi_mask is None: + self._canny_roi.setToolTip("Select an area ROI to enable.") + form.addRow("", self._canny_roi) + + self._canny_preset.currentTextChanged.connect(self._apply_canny_preset) + for sb in (self._canny_sigma, self._canny_low, self._canny_high): + sb.valueChanged.connect(lambda _: self._on_canny_edited()) + self._canny_mode.currentIndexChanged.connect(lambda _: self._on_mode_changed()) + self._canny_roi.toggled.connect(lambda _: self._schedule()) + self._on_mode_changed() + self._update_sigma_label() + return w + + def _build_gradient_panel(self) -> QWidget: + w = QWidget() + form = QFormLayout(w) + + self._grad_operator = QComboBox() + self._grad_operator.addItems(["Sobel", "Scharr"]) + form.addRow("Operator:", self._grad_operator) + + self._grad_output = QComboBox() + self._grad_output.addItems(["Gradient magnitude", "X gradient", "Y gradient", + "Gradient orientation"]) + form.addRow("Output:", self._grad_output) + + self._grad_normalize = QCheckBox("Normalize output") + self._grad_normalize.setChecked(True) + form.addRow("", self._grad_normalize) + + self._grad_threshold_cb = QCheckBox("Threshold magnitude to mask") + self._grad_threshold = QDoubleSpinBox() + self._grad_threshold.setRange(0.0, 100.0) + self._grad_threshold.setValue(90.0) + self._grad_threshold.setSuffix(" %") + self._grad_threshold.setEnabled(False) + trow = QHBoxLayout() + trow.setContentsMargins(0, 0, 0, 0) + trow.addWidget(self._grad_threshold_cb) + trow.addWidget(self._grad_threshold, 1) + form.addRow("Threshold:", _wrap(trow)) + + self._grad_roi = QCheckBox("Apply within active ROI only") + self._grad_roi.setEnabled(self._active_roi_mask is not None) + if self._active_roi_mask is None: + self._grad_roi.setToolTip("Select an area ROI to enable.") + form.addRow("", self._grad_roi) + + self._grad_threshold_cb.toggled.connect(self._grad_threshold.setEnabled) + self._grad_operator.currentIndexChanged.connect(lambda _: self._schedule()) + self._grad_output.currentIndexChanged.connect(lambda _: self._schedule()) + self._grad_normalize.toggled.connect(lambda _: self._schedule()) + self._grad_threshold_cb.toggled.connect(lambda _: self._schedule()) + self._grad_threshold.valueChanged.connect(lambda _: self._schedule()) + self._grad_roi.toggled.connect(lambda _: self._schedule()) + + # ── ROI-conversion options (shared by the Convert button) ───────────── + roi_box = QGroupBox("ROI conversion") + roi_form = QFormLayout(roi_box) + self._roi_min_size = QSpinBox() + self._roi_min_size.setRange(0, 100000) + self._roi_min_size.setValue(20) + self._roi_min_size.setSuffix(" px") + roi_form.addRow("Min component size:", self._roi_min_size) + self._roi_simplify = QCheckBox("Simplify polylines") + roi_form.addRow("", self._roi_simplify) + self._roi_one_per = QCheckBox("One ROI per component") + self._roi_one_per.setChecked(True) + roi_form.addRow("", self._roi_one_per) + form.addRow(roi_box) + return w + + # ── Preset / mode handling ──────────────────────────────────────────────── + + def _apply_canny_preset(self, name: str) -> None: + if name == _PRESET_CUSTOM or name not in CANNY_PRESETS: + return + p = CANNY_PRESETS[name] + # Avoid re-entrancy resetting the combo to Custom while we fill the spins. + with _blocked(self._canny_sigma, self._canny_low, self._canny_high): + self._canny_mode.setCurrentText("Percentile") + self._canny_sigma.setValue(p["sigma"]) + self._canny_low.setValue(p["low"]) + self._canny_high.setValue(p["high"]) + self._update_sigma_label() + self._schedule() + + def _on_canny_edited(self) -> None: + # Manual edits drop the preset back to Custom. + if self._canny_preset.currentText() != _PRESET_CUSTOM: + with _blocked(self._canny_preset): + self._canny_preset.setCurrentText(_PRESET_CUSTOM) + self._update_sigma_label() + self._schedule() + + def _on_mode_changed(self) -> None: + percentile = self._canny_mode.currentText() == "Percentile" + suffix = " %" if percentile else "" + for sb in (self._canny_low, self._canny_high): + sb.setSuffix(suffix) + sb.setRange(0.0, 100.0 if percentile else 1e9) + self._schedule() + + def _update_sigma_label(self) -> None: + if self._pixel_size_nm: + nm = self._canny_sigma.value() * self._pixel_size_nm + self._canny_sigma_lbl.setText(f"≈ {nm:.3g} nm") + else: + self._canny_sigma_lbl.setText("") + + # ── Recompute / preview ──────────────────────────────────────────────────── + + def _schedule(self) -> None: + self._debounce.start() + + def _recompute(self) -> None: + try: + self._result = self._compute() + except Exception as exc: # noqa: BLE001 — surface any backend error to the user + self._result = None + self._status_lbl.setText(f"Edge detection failed: {exc}") + return + self._render_preview(self._result) + self._status_lbl.setText(self._summarize(self._result)) + + def _compute(self) -> EdgeDetectionResult: + if self._method_combo.currentIndex() == 0: + roi = self._active_roi_mask if self._canny_roi.isChecked() else None + return canny_edges( + self._image, + sigma=float(self._canny_sigma.value()), + threshold_mode="percentile" if self._canny_mode.currentText() == "Percentile" + else "absolute", + low=float(self._canny_low.value()), + high=float(self._canny_high.value()), + roi_mask=roi, + pixel_size_nm=self._pixel_size_nm, + source_channel=self._source_channel, + ) + roi = self._active_roi_mask if self._grad_roi.isChecked() else None + output_map = { + "Gradient magnitude": "magnitude", + "X gradient": "x", + "Y gradient": "y", + "Gradient orientation": "orientation", + } + return gradient_filter( + self._image, + operator=self._grad_operator.currentText().lower(), + output=output_map[self._grad_output.currentText()], + normalize=self._grad_normalize.isChecked(), + threshold_to_mask=self._grad_threshold_cb.isChecked(), + threshold=float(self._grad_threshold.value()), + roi_mask=roi, + pixel_size_nm=self._pixel_size_nm, + source_channel=self._source_channel, + ) + + def _render_preview(self, result: EdgeDetectionResult | None) -> None: + if result is None: + return + base = array_to_uint8(self._image) # grayscale uint8 (Ny, Nx) + h, w = base.shape[:2] + rgb = np.repeat(base[:, :, None], 3, axis=2).copy() + if result.edge_mask is not None and result.edge_mask.any(): + r, g, b, a = _OVERLAY_RGBA + mask = result.edge_mask + alpha = a / 255.0 + for ch, val in zip(range(3), (r, g, b)): + rgb[..., ch][mask] = ( + (1 - alpha) * rgb[..., ch][mask] + alpha * val + ).astype(np.uint8) + elif result.display_image is not None: + # No mask (e.g. gradient/orientation) — show the response itself. + rgb = np.repeat(array_to_uint8(result.display_image)[:, :, None], 3, axis=2).copy() + + qimg = QImage(rgb.data, w, h, rgb.strides[0], QImage.Format_RGB888).copy() + pix = QPixmap.fromImage(qimg).scaled( + self._preview_lbl.size(), Qt.KeepAspectRatio, Qt.SmoothTransformation + ) + self._preview_lbl.setPixmap(pix) + + def _summarize(self, result: EdgeDetectionResult | None) -> str: + if result is None: + return "" + if result.edge_mask is not None: + return f"{result.method}: {int(result.edge_mask.sum())} edge pixels." + return f"{result.method}: {result.parameters.get('output', 'response')} preview." + + # ── Output actions ────────────────────────────────────────────────────────── + + def _ensure_result(self) -> EdgeDetectionResult | None: + if self._result is None: + self._recompute() + return self._result + + def _ensure_mask(self) -> np.ndarray | None: + result = self._ensure_result() + if result is None: + return None + if result.edge_mask is None or not result.edge_mask.any(): + QMessageBox.information( + self, "Advanced Edge Detection", + "No binary mask available. For Sobel/Scharr, enable " + "“Threshold magnitude to mask”.", + ) + return None + return result.edge_mask + + def _emit_overlay(self) -> None: + result = self._ensure_result() + if result is not None: + self.overlay_requested.emit(result) + + def _emit_image(self) -> None: + result = self._ensure_result() + if result is None or result.display_image is None: + return + provenance = {"op": "advanced_edge_detection", **result.parameters} + self.image_created.emit(result.display_image, provenance) + + def _emit_mask(self) -> None: + from probeflow.core.mask import ImageMask + mask = self._ensure_mask() + if mask is None: + return + result = self._result + image_mask = ImageMask.new(mask, method=result.method, parameters=dict(result.parameters)) + self.mask_created.emit(image_mask) + self._status_lbl.setText(f"Created mask “{image_mask.name}”.") + + def _emit_rois(self) -> None: + from probeflow.core.roi import roi_from_mask + mask = self._ensure_mask() + if mask is None: + return + # ROI-conversion options only exist on the gradient panel; fall back to + # sensible defaults when on the Canny panel. + min_size = self._roi_min_size.value() if hasattr(self, "_roi_min_size") else 20 + simplify = self._roi_simplify.isChecked() if hasattr(self, "_roi_simplify") else False + one_per = self._roi_one_per.isChecked() if hasattr(self, "_roi_one_per") else True + rois = roi_from_mask( + mask, min_size_px=int(min_size), simplify=bool(simplify), + one_per_component=bool(one_per), + name_prefix=self._result.method if self._result else "mask", + ) + if not rois: + QMessageBox.information( + self, "Advanced Edge Detection", + "No closed regions found to convert. Try a higher threshold or " + "morphological cleanup (fill holes) first.", + ) + return + self.rois_created.emit(rois) + self._status_lbl.setText(f"Created {len(rois)} ROI(s) from the mask.") + + def _on_close(self) -> None: + self.overlay_cleared.emit() + self.close() + + +# ── small helpers ────────────────────────────────────────────────────────────── + +def _wrap(layout) -> QWidget: + w = QWidget() + w.setLayout(layout) + return w + + +class _blocked: + """Context manager: block Qt signals on the given widgets for its body.""" + + def __init__(self, *widgets): + self._widgets = widgets + + def __enter__(self): + self._prev = [w.blockSignals(True) for w in self._widgets] + return self + + def __exit__(self, *exc): + for w, prev in zip(self._widgets, self._prev): + w.blockSignals(prev) + return False diff --git a/probeflow/gui/dialogs/image_viewer.py b/probeflow/gui/dialogs/image_viewer.py index 3d07dcd..edcdb5f 100644 --- a/probeflow/gui/dialogs/image_viewer.py +++ b/probeflow/gui/dialogs/image_viewer.py @@ -57,6 +57,7 @@ from probeflow.gui.viewer.image_viewer_processing_export_mixin import ( ImageViewerProcessingExportMixin, ) +from probeflow.gui.viewer.image_viewer_mask_mixin import ImageViewerMaskMixin from probeflow.gui.viewer.image_viewer_roi_mixin import ImageViewerRoiMixin from probeflow.gui.viewer.image_viewer_toolbar_mixin import ImageViewerToolbarMixin from probeflow.gui.viewer.image_viewer_tools_mixin import ImageViewerToolsMixin @@ -69,6 +70,7 @@ class ImageViewerDialog( ImageViewerBuildMixin, ImageViewerChromeMixin, + ImageViewerMaskMixin, ImageViewerRoiMixin, ImageViewerToolbarMixin, ImageViewerDisplayMixin, @@ -264,6 +266,7 @@ def _load_current_source(self, entry: SxmFile, reset_zoom: bool = True): self._zoom_lbl.setPixmap(QPixmap()) self._zoom_lbl.set_markers([]) self._load_image_roi_set(entry) + self._load_image_mask_set(entry) if self._pending_initial_plane_idx is not None: target_ch = self._pending_initial_plane_idx self._pending_initial_plane_idx = None diff --git a/probeflow/gui/dialogs/image_viewer_build_mixin.py b/probeflow/gui/dialogs/image_viewer_build_mixin.py index ee2e273..8fd3f9b 100644 --- a/probeflow/gui/dialogs/image_viewer_build_mixin.py +++ b/probeflow/gui/dialogs/image_viewer_build_mixin.py @@ -14,6 +14,7 @@ from probeflow.gui.models import PLANE_NAMES from probeflow.gui.processing import ProcessingControlPanel from probeflow.gui.rendering import CMAP_NAMES, DEFAULT_CMAP_LABEL, STM_COLORMAPS +from probeflow.gui.mask_manager import MaskManagerPanel from probeflow.gui.roi_manager_dock import ROIManagerPanel from probeflow.gui.styling import _sep from probeflow.gui.typography import ui_font @@ -315,6 +316,10 @@ def _sidebar_tab(key: str, label: str, tip: str = "") -> tuple[QWidget, QVBoxLay "roi", "ROI", "Create, edit and combine regions of interest.", ) + _masks_tab, masks_lay = _sidebar_tab( + "masks", "Masks", + "Active mask layer: edge-detection output, cleanup, and conversion to ROIs.", + ) _measurements_tab, measurements_lay = _sidebar_tab( "measurements", "Measure", "Distances, angles, ROI statistics, features and results.", @@ -417,6 +422,8 @@ def _spin_row(label: str, mn: float, mx: float, init: float, self._on_open_stm_background) self._processing_panel.simple_background_requested.connect( self._on_simple_background) + self._processing_panel.advanced_edge_requested.connect( + self._on_open_advanced_edge) self._processing_panel._align_combo.currentIndexChanged.connect( self._on_align_rows_changed) processing_lay.addWidget(self._processing_panel) @@ -847,6 +854,8 @@ def _summary_row(row: int, name: str, attr: str, *, elide: bool = False) -> QLab self._image_roi_set = None self._copy_roi_buffer = None # ROI object held for Ctrl+V paste + self._image_mask_set = None # active-mask layer (MaskSet); loaded per image + self._edge_detection_dialog = None self._measurement_panel = ImageMeasurementsPanel(parent=self) self._measurement_table = self._measurement_panel.table @@ -913,6 +922,27 @@ def _summary_row(row: int, name: str, attr: str, *, elide: bool = False) -> QLab ) self._roi_panel.setObjectName("imageViewerRoiManagerPanel") + self._mask_panel = MaskManagerPanel( + mask_set_getter=lambda: self._image_mask_set, + callbacks={ + "on_mask_set_changed": self._on_image_mask_set_changed, + "convert_to_roi": self._convert_mask_to_rois, + "add_mask_stats": self._add_active_mask_stats, + "export_mask": self._export_mask_to_file, + }, + parent=self, + ) + self._mask_panel.setObjectName("imageViewerMaskManagerPanel") + + mask_hint = QLabel( + "Masks come from Advanced Edge Detection (Process tab). The active " + "mask (●) restricts statistics and can become ROI(s)." + ) + mask_hint.setWordWrap(True) + mask_hint.setFont(ui_font(8)) + masks_lay.addWidget(mask_hint) + masks_lay.addWidget(self._mask_panel, 1) + # ROI manager and measurements now live in their sidebar tabs (built # above) rather than in separate floating docks. roi_lay.addWidget(self._roi_panel, 1) diff --git a/probeflow/gui/image_canvas.py b/probeflow/gui/image_canvas.py index d4df8b2..aef91f5 100644 --- a/probeflow/gui/image_canvas.py +++ b/probeflow/gui/image_canvas.py @@ -22,7 +22,7 @@ from probeflow.gui.typography import ui_font from PySide6.QtCore import QPoint, QPointF, QRectF, Qt, Signal from PySide6.QtGui import ( - QBrush, QColor, QFont, QKeySequence, QPainter, QPainterPath, QPen, + QBrush, QColor, QFont, QImage, QKeySequence, QPainter, QPainterPath, QPen, QPixmap, QTransform, ) from PySide6.QtWidgets import ( @@ -172,6 +172,7 @@ def __init__(self, parent=None): self._bad_segment_items: list[QGraphicsRectItem] = [] self._feature_points: list[object] = [] self._feature_point_items: list[QGraphicsEllipseItem] = [] + self._mask_overlay_item: Optional[QGraphicsPixmapItem] = None self._text_overlay_item = QGraphicsTextItem() self._text_overlay_item.setDefaultTextColor(QColor("#cdd6f4")) @@ -423,6 +424,46 @@ def clear_bad_segment_overlay(self) -> None: self.scene().removeItem(item) self._bad_segment_items.clear() + # ── active-mask overlay ─────────────────────────────────────────────────── + + def set_mask_overlay( + self, + mask, + *, + color: tuple[int, int, int] = (255, 59, 48), + alpha: int = 110, + ) -> None: + """Show a non-destructive semi-transparent overlay of a boolean *mask*. + + Rendered as a single pixmap item in image-pixel coordinates (1 px = 1 + scene unit, like the underlying image), at a Z below the ROI handles so + it never blocks interaction. Follows the ``set_bad_segment_overlay`` + precedent. + """ + self.clear_mask_overlay() + if mask is None: + return + m = np.asarray(mask, dtype=bool) + if m.ndim != 2 or not m.any(): + return + h, w = m.shape + rgba = np.zeros((h, w, 4), dtype=np.uint8) + rgba[m, 0] = color[0] + rgba[m, 1] = color[1] + rgba[m, 2] = color[2] + rgba[m, 3] = int(np.clip(alpha, 0, 255)) + qimg = QImage(rgba.data, w, h, rgba.strides[0], QImage.Format_RGBA8888).copy() + item = QGraphicsPixmapItem(QPixmap.fromImage(qimg)) + item.setZValue(20) # above the image, below ROI shapes/handles + item.setAcceptedMouseButtons(Qt.NoButton) + self.scene().addItem(item) + self._mask_overlay_item = item + + def clear_mask_overlay(self) -> None: + if self._mask_overlay_item is not None: + self.scene().removeItem(self._mask_overlay_item) + self._mask_overlay_item = None + # ── detected feature points ────────────────────────────────────────────── def set_feature_points(self, points) -> None: diff --git a/probeflow/gui/mask_manager.py b/probeflow/gui/mask_manager.py new file mode 100644 index 0000000..2367031 --- /dev/null +++ b/probeflow/gui/mask_manager.py @@ -0,0 +1,210 @@ +"""Compact manager widget for the active-mask layer. + +Sibling of :class:`probeflow.gui.roi_manager_dock.ROIManagerPanel`: lists the +masks owned by the viewer's :class:`~probeflow.core.mask.MaskSet`, sets the +active mask, runs morphological cleanup, and bridges to ROI conversion and +mask-restricted statistics. All edits go through the supplied callbacks so the +viewer can persist the sidecar and refresh the overlay. +""" + +from __future__ import annotations + +from typing import Callable + +from PySide6.QtCore import Qt +from PySide6.QtWidgets import ( + QAbstractItemView, + QComboBox, + QGridLayout, + QHBoxLayout, + QInputDialog, + QListWidget, + QListWidgetItem, + QPushButton, + QSizePolicy, + QVBoxLayout, + QWidget, +) + +from probeflow.processing import mask_ops + +# Cleanup ops: label → (function, kwargs). Radii/sizes in pixels. +_CLEANUP_OPS: dict[str, tuple] = { + "Remove small objects": (mask_ops.remove_small_objects, {"min_size": 16}), + "Fill holes": (mask_ops.fill_holes, {}), + "Dilate": (mask_ops.dilate, {"radius": 1}), + "Erode": (mask_ops.erode, {"radius": 1}), + "Open": (mask_ops.binary_open, {"radius": 1}), + "Close": (mask_ops.binary_close, {"radius": 1}), + "Skeletonize": (mask_ops.skeletonize, {}), + "Remove border objects": (mask_ops.remove_border_objects, {}), +} + + +class MaskManagerPanel(QWidget): + """Widget that lists masks and provides activation / cleanup / conversion.""" + + def __init__(self, mask_set_getter: Callable, callbacks: dict, parent=None): + super().__init__(parent) + self.setObjectName("maskManagerPanel") + self._mask_set_getter = mask_set_getter + self._cb = callbacks + + lay = QVBoxLayout(self) + lay.setContentsMargins(6, 6, 6, 6) + lay.setSpacing(4) + + self._active_btn = _btn("Set active", self._on_set_active) + self._rename_btn = _btn("Rename", self._on_rename) + self._delete_btn = _btn("Delete", self._on_delete) + self._invert_btn = _btn("Invert", self._on_invert) + grid = QGridLayout() + grid.setContentsMargins(0, 0, 0, 0) + grid.setHorizontalSpacing(3) + grid.setVerticalSpacing(3) + grid.addWidget(self._active_btn, 0, 0) + grid.addWidget(self._rename_btn, 0, 1) + grid.addWidget(self._delete_btn, 1, 0) + grid.addWidget(self._invert_btn, 1, 1) + lay.addLayout(grid) + + cleanup_row = QWidget() + crow = QHBoxLayout(cleanup_row) + crow.setContentsMargins(0, 0, 0, 0) + crow.setSpacing(2) + self._cleanup_combo = QComboBox() + self._cleanup_combo.addItems(list(_CLEANUP_OPS)) + self._cleanup_btn = _btn("Apply cleanup", self._on_cleanup) + crow.addWidget(self._cleanup_combo, 1) + crow.addWidget(self._cleanup_btn) + lay.addWidget(cleanup_row) + + convert_row = QWidget() + vrow = QHBoxLayout(convert_row) + vrow.setContentsMargins(0, 0, 0, 0) + vrow.setSpacing(2) + self._roi_btn = _btn("To ROI(s)", self._on_convert_roi) + self._stats_btn = _btn("Statistics", self._on_stats) + self._export_btn = _btn("Export…", self._on_export) + vrow.addWidget(self._roi_btn) + vrow.addWidget(self._stats_btn) + vrow.addWidget(self._export_btn) + lay.addWidget(convert_row) + + self._list = QListWidget() + self._list.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding) + self._list.setSelectionMode(QAbstractItemView.SingleSelection) + self._list.itemDoubleClicked.connect(lambda _: self._on_set_active()) + self._list.itemSelectionChanged.connect(self._sync_buttons) + lay.addWidget(self._list) + + self.refresh(self._mask_set_getter()) + + # ── public ─────────────────────────────────────────────────────────────── + + def refresh(self, mask_set) -> None: + self._list.blockSignals(True) + self._list.clear() + if mask_set is not None: + for mask in mask_set.masks: + active = mask.id == mask_set.active_mask_id + label = ("● " if active else "○ ") + f"{mask.name} ({mask.count()} px)" + item = QListWidgetItem(label) + item.setData(Qt.UserRole, mask.id) + self._list.addItem(item) + self._list.blockSignals(False) + self._sync_buttons() + + # ── selection helpers ────────────────────────────────────────────────────── + + def _selected_id(self) -> "str | None": + items = self._list.selectedItems() + return items[0].data(Qt.UserRole) if items else None + + def _sync_buttons(self) -> None: + has = self._selected_id() is not None + for b in (self._active_btn, self._rename_btn, self._delete_btn, + self._invert_btn, self._cleanup_btn, self._roi_btn, + self._stats_btn, self._export_btn): + b.setEnabled(has) + + def _changed(self) -> None: + self._cb.get("on_mask_set_changed", lambda: None)() + self.refresh(self._mask_set_getter()) + + # ── action slots ──────────────────────────────────────────────────────────── + + def _on_set_active(self) -> None: + mask_id = self._selected_id() + mask_set = self._mask_set_getter() + if mask_id is None or mask_set is None: + return + mask_set.set_active(mask_id) + self._changed() + + def _on_rename(self) -> None: + mask_id = self._selected_id() + mask_set = self._mask_set_getter() + if mask_id is None or mask_set is None: + return + mask = mask_set.get(mask_id) + if mask is None: + return + new_name, ok = QInputDialog.getText(self, "Rename mask", "New name:", text=mask.name) + if ok and new_name.strip(): + mask.name = new_name.strip() + self._changed() + + def _on_delete(self) -> None: + mask_id = self._selected_id() + mask_set = self._mask_set_getter() + if mask_id is None or mask_set is None: + return + mask_set.remove(mask_id) + self._changed() + + def _on_invert(self) -> None: + self._apply_op(mask_ops.invert, {}) + + def _on_cleanup(self) -> None: + fn, kwargs = _CLEANUP_OPS[self._cleanup_combo.currentText()] + self._apply_op(fn, kwargs) + + def _apply_op(self, fn, kwargs) -> None: + mask_id = self._selected_id() + mask_set = self._mask_set_getter() + if mask_id is None or mask_set is None: + return + mask = mask_set.get(mask_id) + if mask is None: + return + mask_set.replace(mask_id, fn(mask.data, **kwargs)) + self._changed() + + def _on_convert_roi(self) -> None: + mask_id = self._selected_id() + if mask_id is not None: + self._cb.get("convert_to_roi", lambda _id: None)(mask_id) + + def _on_stats(self) -> None: + mask_id = self._selected_id() + mask_set = self._mask_set_getter() + if mask_id is None or mask_set is None: + return + # Statistics run over the active mask, so activate the selection first. + mask_set.set_active(mask_id) + self._changed() + self._cb.get("add_mask_stats", lambda: None)() + + def _on_export(self) -> None: + mask_id = self._selected_id() + if mask_id is not None: + self._cb.get("export_mask", lambda _id: None)(mask_id) + + +def _btn(text: str, slot) -> QPushButton: + b = QPushButton(text) + b.setFixedHeight(26) + b.setEnabled(False) + b.clicked.connect(slot) + return b diff --git a/probeflow/gui/processing.py b/probeflow/gui/processing.py index 41862c1..dffe215 100644 --- a/probeflow/gui/processing.py +++ b/probeflow/gui/processing.py @@ -17,6 +17,7 @@ class ProcessingControlPanel(QWidget): bad_line_preview_settings_changed = Signal() stm_background_requested = Signal() simple_background_requested = Signal() + advanced_edge_requested = Signal() QUICK_KEYS = ("align_rows", "remove_bad_lines") @@ -271,6 +272,15 @@ def _col_lbl(text: str, target): self._edge_combo.currentIndexChanged.connect( lambda i: self._edge_sigma_w.setVisible(i != 0)) + self._advanced_edge_btn = QPushButton("Advanced Edge Detection...") + self._advanced_edge_btn.setFont(ui_font(8)) + self._advanced_edge_btn.setToolTip( + "Open the Canny / Sobel-Scharr edge detector with a live overlay preview. " + "Outputs can become a mask, ROI(s), or a new image." + ) + self._advanced_edge_btn.clicked.connect(self.advanced_edge_requested.emit) + L.addWidget(self._advanced_edge_btn) + L.addStretch() lay.addWidget(self._filter_section) diff --git a/probeflow/gui/viewer/__init__.py b/probeflow/gui/viewer/__init__.py index d6cb123..ad39746 100644 --- a/probeflow/gui/viewer/__init__.py +++ b/probeflow/gui/viewer/__init__.py @@ -35,6 +35,8 @@ "invert_active_roi": "roi_ops", "invert_roi": "roi_ops", "load_roi_set": "roi_sidecar", + "load_mask_set": "mask_sidecar", + "save_mask_set": "mask_sidecar", "plot_roi_line_profile": "roi_analysis", "rename_roi": "roi_ops", "resolve_channel_unit": "channel_util", diff --git a/probeflow/gui/viewer/image_measurements.py b/probeflow/gui/viewer/image_measurements.py index 2849f65..3db6d14 100644 --- a/probeflow/gui/viewer/image_measurements.py +++ b/probeflow/gui/viewer/image_measurements.py @@ -247,6 +247,45 @@ def add_roi_stats_measurement(self, roi_id: str) -> None: except Exception as exc: self._set_status(f"Could not add ROI statistics: {exc}") + def add_active_mask_stats_measurement(self) -> None: + """Add a statistics measurement restricted to the active mask.""" + mask = self._active_mask_array() + arr = self._display_arr() + if mask is None: + self._set_status("No active mask. Create one in Advanced Edge Detection.") + return + if arr is None: + return + if mask.shape != arr.shape[:2]: + self._set_status("Active mask shape no longer matches the image.") + return + try: + entry, scale, unit, channel, source_label = self._source_info() + px_x_nm, px_y_nm = self._pixel_size_nm() + mask_obj = self._viewer._image_mask_set.active() + result = roi_statistics( + arr * scale, + measurement_id=self._table.next_measurement_id(), + source_label=source_label, + source_path=str(entry.path), + channel=channel, + mask=mask, + pixel_size_x=px_x_nm, + pixel_size_y=px_y_nm, + x_unit="nm", + y_unit="nm", + height_unit=unit or None, + notes=f"Mask statistics for {mask_obj.name if mask_obj else 'active mask'}", + ) + self._record(result) + except Exception as exc: + self._set_status(f"Could not add mask statistics: {exc}") + + def _active_mask_array(self): + """Boolean array of the viewer's active mask, or None.""" + getter = getattr(self._viewer, "_active_mask_array", None) + return getter() if callable(getter) else None + def add_selected_step_height_measurement(self) -> None: self.add_step_height_measurement_for_rois(self.selected_roi_ids()) diff --git a/probeflow/gui/viewer/image_viewer_mask_mixin.py b/probeflow/gui/viewer/image_viewer_mask_mixin.py new file mode 100644 index 0000000..8569999 --- /dev/null +++ b/probeflow/gui/viewer/image_viewer_mask_mixin.py @@ -0,0 +1,202 @@ +"""Active-mask layer orchestration for ImageViewerDialog. + +Mirrors :class:`probeflow.gui.viewer.image_viewer_roi_mixin.ImageViewerRoiMixin`: +the viewer owns a :class:`~probeflow.core.mask.MaskSet` per image, persisted to a +``.masks.json`` sidecar, with one active mask shown as a canvas overlay +and consumed by downstream statistics / background exclusion. +""" + +from __future__ import annotations + +import numpy as np + +from probeflow.gui.roi_context import area_roi_mask +from probeflow.gui.viewer import load_mask_set, save_mask_set + + +class ImageViewerMaskMixin: + # ── Image-level mask set ────────────────────────────────────────────────── + + def _load_image_mask_set(self, entry) -> None: + """Load masks from the ``.masks.json`` sidecar, else an empty set.""" + self._image_mask_set, _err = load_mask_set(entry.path) + self._refresh_mask_overlay() + if hasattr(self, "_mask_panel"): + self._mask_panel.refresh(self._image_mask_set) + + def _save_image_mask_set(self) -> None: + if self._image_mask_set is None: + return + entry = self._entries[self._idx] + err = save_mask_set(self._image_mask_set, entry.path) + if err and hasattr(self, "_status_lbl"): + self._status_lbl.setText(err) + + def _on_image_mask_set_changed(self) -> None: + """Persist, refresh the overlay/manager, and re-run mask-aware display.""" + self._save_image_mask_set() + self._refresh_mask_overlay() + if hasattr(self, "_mask_panel"): + self._mask_panel.refresh(self._image_mask_set) + # The active mask restricts statistics, so refresh the readout. + if hasattr(self, "_refresh_measurements"): + try: + self._refresh_measurements() + except Exception: + pass + + # ── Active mask access (consumed by stats / background) ──────────────────── + + def _active_mask_array(self) -> "np.ndarray | None": + """Return the active mask's boolean array, or None. + + Returns None when the mask shape no longer matches the displayed array + (e.g. after a shape-changing processing step) so callers fall back to + the whole image rather than crashing. + """ + ms = getattr(self, "_image_mask_set", None) + if ms is None: + return None + mask = ms.active() + if mask is None: + return None + arr = self._display_arr if self._display_arr is not None else self._raw_arr + if arr is not None and mask.shape != arr.shape[:2]: + return None + return mask.data + + def _refresh_mask_overlay(self) -> None: + """Show the active mask as a canvas overlay, or clear it.""" + if not hasattr(self, "_zoom_lbl"): + return + data = self._active_mask_array() + if data is None or not data.any(): + self._zoom_lbl.clear_mask_overlay() + else: + self._zoom_lbl.set_mask_overlay(data) + + # ── Advanced Edge Detection dialog ───────────────────────────────────────── + + def _on_open_advanced_edge(self) -> None: + from probeflow.gui.dialogs.edge_detection import EdgeDetectionDialog + + arr = self._display_arr if self._display_arr is not None else self._raw_arr + if arr is None: + self._status_lbl.setText("Advanced Edge Detection: no image loaded.") + return + roi_mask = area_roi_mask(self._active_image_roi(), arr.shape[:2]) + px_x_m, _ = self._processing_pixel_sizes_m() + pixel_size_nm = px_x_m * 1e9 if px_x_m else None + + dlg = EdgeDetectionDialog( + arr, + theme=self._t, + pixel_size_nm=pixel_size_nm, + active_roi_mask=roi_mask, + source_channel=self._channel_name() if hasattr(self, "_channel_name") else None, + parent=self, + ) + dlg.overlay_requested.connect(self._on_edge_overlay_requested) + dlg.overlay_cleared.connect(self._refresh_mask_overlay) + dlg.mask_created.connect(self._on_edge_mask_created) + dlg.rois_created.connect(self._on_edge_rois_created) + dlg.image_created.connect(self._on_edge_image_created) + self._edge_detection_dialog = dlg + self._present_modal_tool(dlg) + + # ── Dialog output handlers ────────────────────────────────────────────────── + + def _on_edge_overlay_requested(self, result) -> None: + mask = getattr(result, "edge_mask", None) + if mask is None or not np.asarray(mask).any(): + self._status_lbl.setText( + "Overlay: no binary edges to show (enable thresholding for Sobel/Scharr)." + ) + return + # Transient preview overlay (cyan) distinct from the active mask (red). + self._zoom_lbl.set_mask_overlay(mask, color=(0, 229, 255), alpha=120) + + def _on_edge_mask_created(self, image_mask) -> None: + from probeflow.core.mask import MaskSet + if self._image_mask_set is None: + entry = self._entries[self._idx] + self._image_mask_set = MaskSet(image_id=str(entry.path)) + self._image_mask_set.add(image_mask) + self._image_mask_set.set_active(image_mask.id) + self._on_image_mask_set_changed() + self._status_lbl.setText(f"Created active mask “{image_mask.name}”.") + + def _on_edge_rois_created(self, rois) -> None: + if self._image_roi_set is None or not rois: + return + for roi in rois: + self._image_roi_set.add(roi) + self._image_roi_set.set_active(rois[-1].id) + self._on_image_roi_set_changed() + self._status_lbl.setText(f"Added {len(rois)} ROI(s) from edge mask.") + + # ── Masks-manager callbacks ──────────────────────────────────────────────── + + def _convert_mask_to_rois(self, mask_id: str) -> None: + from probeflow.core.roi import roi_from_mask + + ms = getattr(self, "_image_mask_set", None) + mask = ms.get(mask_id) if ms is not None else None + if mask is None: + return + rois = roi_from_mask(mask.data, min_size_px=0, name_prefix=mask.name) + if not rois: + self._status_lbl.setText("No closed regions in mask to convert to ROI(s).") + return + self._on_edge_rois_created(rois) + + def _add_active_mask_stats(self) -> None: + if hasattr(self, "_image_measurements"): + self._image_measurements.add_active_mask_stats_measurement() + + def _export_mask_to_file(self, mask_id: str) -> None: + from pathlib import Path + + from PySide6.QtWidgets import QFileDialog + + from probeflow.io.mask_sidecar import save_mask_set_sidecar + from probeflow.core.mask import MaskSet + + ms = getattr(self, "_image_mask_set", None) + mask = ms.get(mask_id) if ms is not None else None + if mask is None: + return + default = f"{Path(self._entries[self._idx].path).stem}.{mask.name}.masks.json" + path, _ = QFileDialog.getSaveFileName( + self, "Export mask", default, "Mask JSON (*.masks.json *.json)" + ) + if not path: + return + single = MaskSet(image_id=ms.image_id) + single.add(mask) + single.set_active(mask.id) + try: + save_mask_set_sidecar(single, path, sidecar=path) + self._status_lbl.setText(f"Exported mask to {path}.") + except Exception as exc: + self._status_lbl.setText(f"Could not export mask: {exc}") + + def _on_edge_image_created(self, arr, provenance) -> None: + try: + from probeflow.gui.dialogs.array_image import ArrayImageDialog + scan_range = self._display_scan_range_m or self._scan_range_m or (1e-9, 1e-9) + dlg = ArrayImageDialog( + np.asarray(arr, dtype=float), + scan_range_m=tuple(scan_range), + title="Edge detection result", + colormap=getattr(self, "_viewer_colormap", self._colormap), + theme=self._t, + provenance=provenance, + parent=self, + ) + except Exception as exc: + self._status_lbl.setText(f"Could not open edge result image: {exc}") + return + self._track_modeless_child(dlg) + dlg.show() + self._status_lbl.setText("Opened edge detection result image.") diff --git a/probeflow/gui/viewer/mask_sidecar.py b/probeflow/gui/viewer/mask_sidecar.py new file mode 100644 index 0000000..958ff1b --- /dev/null +++ b/probeflow/gui/viewer/mask_sidecar.py @@ -0,0 +1,40 @@ +"""Mask sidecar load/save helpers for ImageViewerDialog. + +Mirror of :mod:`probeflow.gui.viewer.roi_sidecar` for the active-mask layer. +""" + +from __future__ import annotations + +from pathlib import Path + + +def load_mask_set(image_path: Path | str): + """Load a MaskSet from the sidecar next to *image_path*, or return an empty one. + + Never raises — file errors are swallowed and an empty MaskSet is returned + so callers do not need to handle the missing-file case. + """ + from probeflow.core.mask import MaskSet + from probeflow.io.mask_sidecar import load_mask_set_sidecar + + image_path = Path(image_path) + try: + loaded, _sidecar = load_mask_set_sidecar(image_path, missing_ok=True) + except Exception: + return MaskSet(image_id=str(image_path)), None + + return (loaded or MaskSet(image_id=str(image_path))), None + + +def save_mask_set(mask_set, image_path: Path | str) -> str | None: + """Persist *mask_set* to its sidecar file next to *image_path*. + + Returns an error message string on failure, or ``None`` on success. + """ + from probeflow.io.mask_sidecar import save_mask_set_sidecar + + try: + save_mask_set_sidecar(mask_set, Path(image_path)) + return None + except Exception as exc: + return f"Could not save mask sidecar: {exc}" diff --git a/probeflow/io/mask_sidecar.py b/probeflow/io/mask_sidecar.py new file mode 100644 index 0000000..51469ea --- /dev/null +++ b/probeflow/io/mask_sidecar.py @@ -0,0 +1,106 @@ +"""Mask sidecar loading and saving helpers. + +The mask model lives in :mod:`probeflow.core.mask`; this module owns the small +JSON sidecar convention, mirroring :mod:`probeflow.io.roi_sidecar`. Masks are +a new on-disk format, so there are no legacy-path fallbacks to carry. +""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +from probeflow.core.mask import MaskSet + + +def default_mask_sidecar_path(scan_path: str | Path) -> Path: + """Return the canonical mask sidecar path: ``{stem}.masks.json``. + + Uses ``f"{path.stem}.masks.json"`` (strips only the last extension) so + dotted Createc filenames like ``A250320.191933.dat`` get a per-scan sidecar + rather than a date-prefix-collapsed one — the same convention as + :func:`probeflow.io.roi_sidecar.default_roi_sidecar_path`. + """ + path = Path(scan_path) + return path.parent / f"{path.stem}.masks.json" + + +def _mask_set_payload(data: dict[str, Any]) -> dict[str, Any] | None: + """Extract a MaskSet dict from a sidecar payload.""" + if not isinstance(data, dict): + return None + if isinstance(data.get("masks"), dict): + return data["masks"] + if data.get("image_id") is not None and isinstance(data.get("masks"), list): + return data + return None + + +def load_mask_set_sidecar( + scan_path: str | Path, + *, + sidecar: str | Path | None = None, + missing_ok: bool = False, +) -> tuple[MaskSet | None, Path]: + """Load a MaskSet from *sidecar* or the default ``.masks.json`` path. + + Returns ``(mask_set, path_used)``. If no sidecar exists and ``missing_ok`` + is true, ``mask_set`` is ``None`` and ``path_used`` is the canonical path. + """ + scan = Path(scan_path) + chosen = Path(sidecar) if sidecar is not None else default_mask_sidecar_path(scan) + + if not chosen.exists(): + if missing_ok: + return None, chosen + raise FileNotFoundError(f"No mask sidecar found for {scan} (tried {chosen})") + + try: + data = json.loads(chosen.read_text(encoding="utf-8")) + except Exception as exc: + raise ValueError(f"Could not read mask sidecar {chosen}: {exc}") from exc + + payload = _mask_set_payload(data) + if payload is None: + raise ValueError(f"Sidecar {chosen} contains no MaskSet data") + + try: + return MaskSet.from_dict(payload), chosen + except Exception as exc: + raise ValueError(f"Could not deserialise masks from {chosen}: {exc}") from exc + + +def save_mask_set_sidecar( + mask_set: MaskSet, + scan_path: str | Path, + *, + sidecar: str | Path | None = None, +) -> Path: + """Write *mask_set* to the canonical ``.masks.json`` sidecar. + + Uses a write-to-temp-then-rename strategy so a crash or full disk during + the write never leaves a partially-written (corrupt) sidecar — mirroring + :func:`probeflow.io.roi_sidecar.save_roi_set_sidecar`. + """ + import tempfile + target = ( + Path(sidecar) + if sidecar is not None + else default_mask_sidecar_path(scan_path) + ) + payload = json.dumps(mask_set.to_dict(), indent=2) + tmp_fd, tmp_path = tempfile.mkstemp( + dir=target.parent, prefix=target.name + ".tmp", suffix=".json" + ) + try: + with open(tmp_fd, "w", encoding="utf-8") as fh: + fh.write(payload) + Path(tmp_path).replace(target) + except Exception: + try: + Path(tmp_path).unlink(missing_ok=True) + except OSError: + pass + raise + return target diff --git a/probeflow/processing/edge_detection.py b/probeflow/processing/edge_detection.py new file mode 100644 index 0000000..81d65e8 --- /dev/null +++ b/probeflow/processing/edge_detection.py @@ -0,0 +1,299 @@ +"""Advanced edge detection: Canny and Sobel/Scharr gradient filters. + +These detectors are *non-destructive analysis* operations: they return an +:class:`EdgeDetectionResult` carrying any of a display image, a boolean edge +mask, a gradient magnitude, and a gradient orientation, plus the parameters +used (for provenance and auto-naming). Unlike :func:`probeflow.processing. +filters.edge_detect` (a history-replayable display filter), these functions are +the backend for the Advanced Edge Detection dialog and the active-mask layer. + +NaN handling follows the same convention as ``edge_detect``: non-finite pixels +are filled with the finite mean before filtering, and NaN is restored in the +returned ``display_image`` / excluded from any mask. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field + +import numpy as np + +from ._image_utils import _finite_mean, _nonnegative_finite + +__all__ = [ + "EdgeDetectionResult", + "CANNY_PRESETS", + "canny_edges", + "gradient_filter", +] + + +# ── Result model ────────────────────────────────────────────────────────────── + +@dataclass(frozen=True) +class EdgeDetectionResult: + """Outputs of one advanced edge-detection operation. + + An edge detector can yield more than one useful product, so each field is + optional and populated only when meaningful for the method: + + * ``display_image`` — array suitable for direct display (NaN-preserving). + * ``edge_mask`` — boolean array suitable for the active-mask layer. + * ``gradient_magnitude`` — continuous gradient response (Sobel/Scharr). + * ``gradient_orientation``— gradient direction in radians, ``arctan2(gy, gx)``. + """ + + method: str + source_channel: str | None = None + display_image: np.ndarray | None = None + edge_mask: np.ndarray | None = None + gradient_magnitude: np.ndarray | None = None + gradient_orientation: np.ndarray | None = None + parameters: dict[str, object] = field(default_factory=dict) + pixel_size_nm: float | None = None + + +# ── Canny presets (sigma in px; low/high as percentiles) ──────────────────────── + +CANNY_PRESETS: dict[str, dict[str, float]] = { + "Fine atomic/defect edges": {"sigma": 0.8, "low": 70.0, "high": 90.0}, + "Step edges / islands": {"sigma": 2.0, "low": 60.0, "high": 85.0}, + "Noisy scan": {"sigma": 3.0, "low": 50.0, "high": 80.0}, + "Strict edges only": {"sigma": 1.5, "low": 80.0, "high": 95.0}, +} + + +# ── Shared NaN-handling helper ────────────────────────────────────────────────── + +def _prepare(arr: np.ndarray) -> tuple[np.ndarray, np.ndarray]: + """Return ``(filled, nan_mask)``: a finite float64 copy and the NaN mask. + + Non-finite pixels are replaced with the finite mean so spatial filters do + not propagate NaN; the mask lets callers restore NaN afterwards. + """ + a = np.asarray(arr, dtype=np.float64) + nan_mask = ~np.isfinite(a) + if nan_mask.any(): + a = a.copy() + a[nan_mask] = _finite_mean(a) + return a, nan_mask + + +def _coerce_roi_mask(roi_mask: np.ndarray | None, shape: tuple[int, ...]) -> np.ndarray | None: + if roi_mask is None: + return None + m = np.asarray(roi_mask, dtype=bool) + if m.shape != shape: + raise ValueError(f"roi_mask shape {m.shape} != image shape {shape}") + return m + + +# ── Canny ─────────────────────────────────────────────────────────────────────── + +def canny_edges( + arr: np.ndarray, + *, + sigma: float = 1.0, + threshold_mode: str = "percentile", + low: float = 70.0, + high: float = 90.0, + roi_mask: np.ndarray | None = None, + preset: str | None = None, + pixel_size_nm: float | None = None, + source_channel: str | None = None, +) -> EdgeDetectionResult: + """Detect edges with the Canny algorithm (``skimage.feature.canny``). + + Parameters + ---------- + sigma: + Gaussian smoothing width in pixels. + threshold_mode: + ``"percentile"`` (default) interprets *low*/*high* as percentiles + (0–100) of the gradient magnitude — robust across STM channels whose + absolute scale varies. ``"absolute"`` uses *low*/*high* directly. + low, high: + Hysteresis thresholds. Percentiles in percentile mode, raw gradient + values in absolute mode. + roi_mask: + Optional boolean mask; edges are only detected where it is True. + preset: + Name in :data:`CANNY_PRESETS`; when given it overrides *sigma*/*low*/ + *high*. + """ + from skimage.feature import canny as _canny + + if preset is not None: + if preset not in CANNY_PRESETS: + raise ValueError(f"Unknown Canny preset {preset!r}. Choices: {sorted(CANNY_PRESETS)}") + p = CANNY_PRESETS[preset] + sigma, low, high = p["sigma"], p["low"], p["high"] + + sigma = _nonnegative_finite(sigma, "sigma") + if threshold_mode not in ("percentile", "absolute"): + raise ValueError(f"Unknown threshold_mode {threshold_mode!r}") + if low > high: + low, high = high, low + + a, nan_mask = _prepare(arr) + roi = _coerce_roi_mask(roi_mask, a.shape) + + # Restrict Canny to finite pixels (and the ROI, if given). + valid = ~nan_mask + canny_mask = valid if roi is None else (valid & roi) + + use_quantiles = threshold_mode == "percentile" + if use_quantiles: + low_t, high_t = low / 100.0, high / 100.0 + else: + low_t, high_t = float(low), float(high) + + if nan_mask.all() or not canny_mask.any(): + edge = np.zeros(a.shape, dtype=bool) + else: + edge = _canny( + a, + sigma=max(sigma, 1e-3), + low_threshold=low_t, + high_threshold=high_t, + mask=canny_mask, + use_quantiles=use_quantiles, + ) + + display = edge.astype(np.float64) + if nan_mask.any(): + display[nan_mask] = np.nan + + params: dict[str, object] = { + "method": "canny", + "sigma": float(sigma), + "threshold_mode": threshold_mode, + "low": float(low), + "high": float(high), + "roi_restricted": roi is not None, + "preset": preset, + } + if pixel_size_nm is not None: + params["sigma_nm"] = float(sigma) * float(pixel_size_nm) + + return EdgeDetectionResult( + method="canny", + source_channel=source_channel, + display_image=display, + edge_mask=edge, + parameters=params, + pixel_size_nm=pixel_size_nm, + ) + + +# ── Sobel / Scharr gradient ────────────────────────────────────────────────────── + +_GRADIENT_OUTPUTS = ("magnitude", "x", "y", "orientation") + + +def gradient_filter( + arr: np.ndarray, + *, + operator: str = "sobel", + output: str = "magnitude", + normalize: bool = True, + threshold_to_mask: bool = False, + threshold: float = 90.0, + roi_mask: np.ndarray | None = None, + pixel_size_nm: float | None = None, + source_channel: str | None = None, +) -> EdgeDetectionResult: + """Compute a Sobel or Scharr gradient response. + + Parameters + ---------- + operator: + ``"sobel"`` or ``"scharr"``. + output: + ``"magnitude"`` (default), ``"x"``, ``"y"``, or ``"orientation"``. + normalize: + Scale the chosen output to [0, 1] (magnitude) or [-1, 1] (signed) by + its peak absolute value. Orientation is never rescaled. + threshold_to_mask: + When True, threshold the gradient *magnitude* (always, regardless of + *output*) at the *threshold* percentile to produce ``edge_mask``. + roi_mask: + Optional boolean mask; gradients outside it are zeroed and excluded + from any threshold mask. + """ + from skimage import filters as _skf + + if operator not in ("sobel", "scharr"): + raise ValueError(f"Unknown operator {operator!r}. Choices: 'sobel', 'scharr'") + if output not in _GRADIENT_OUTPUTS: + raise ValueError(f"Unknown output {output!r}. Choices: {_GRADIENT_OUTPUTS}") + + a, nan_mask = _prepare(arr) + roi = _coerce_roi_mask(roi_mask, a.shape) + + if operator == "sobel": + gy = _skf.sobel_h(a) # derivative along axis 0 (rows / y) + gx = _skf.sobel_v(a) # derivative along axis 1 (cols / x) + else: + gy = _skf.scharr_h(a) + gx = _skf.scharr_v(a) + + magnitude = np.hypot(gx, gy) + orientation = np.arctan2(gy, gx) + + if output == "magnitude": + chosen = magnitude + elif output == "x": + chosen = gx + elif output == "y": + chosen = gy + else: # orientation + chosen = orientation + + if roi is not None: + chosen = np.where(roi, chosen, 0.0) + magnitude = np.where(roi, magnitude, 0.0) + + if normalize and output != "orientation": + peak = float(np.nanmax(np.abs(chosen))) if chosen.size else 0.0 + if peak > 0: + chosen = chosen / peak + + display = chosen.astype(np.float64, copy=True) + if nan_mask.any(): + display[nan_mask] = np.nan + + edge_mask = None + if threshold_to_mask: + ref = magnitude.copy() + if roi is not None: + ref = ref[roi] + finite_ref = ref[np.isfinite(ref)] if ref.size else ref + if finite_ref.size: + cut = float(np.percentile(finite_ref, float(threshold))) + edge_mask = magnitude >= cut + edge_mask &= ~nan_mask + if roi is not None: + edge_mask &= roi + else: + edge_mask = np.zeros(a.shape, dtype=bool) + + params: dict[str, object] = { + "method": operator, + "output": output, + "normalize": bool(normalize), + "threshold_to_mask": bool(threshold_to_mask), + "threshold": float(threshold), + "roi_restricted": roi is not None, + } + + return EdgeDetectionResult( + method=operator, + source_channel=source_channel, + display_image=display, + edge_mask=edge_mask, + gradient_magnitude=magnitude, + gradient_orientation=orientation, + parameters=params, + pixel_size_nm=pixel_size_nm, + ) diff --git a/probeflow/processing/filters.py b/probeflow/processing/filters.py index affdb76..716c687 100644 --- a/probeflow/processing/filters.py +++ b/probeflow/processing/filters.py @@ -267,9 +267,12 @@ def edge_detect( method='laplacian' — discrete Laplacian (2nd derivative, no smoothing) method='log' — Laplacian of Gaussian (σ = sigma) method='dog' — Difference of Gaussians (σ₁=sigma, σ₂=sigma2) + method='sobel' — Sobel gradient magnitude √(Gₓ² + G_y²) + method='scharr' — Scharr gradient magnitude (rotationally more accurate) - Returns the filter response — positive = bright edges/peaks, - negative = dark edges/valleys. Useful for atomic-resolution contrast + The Laplacian family returns a signed response — positive = bright + edges/peaks, negative = dark edges/valleys. Sobel/Scharr return a + non-negative gradient magnitude. Useful for atomic-resolution contrast enhancement and finding adsorption sites. """ sigma = _nonnegative_finite(sigma, "sigma") @@ -294,6 +297,14 @@ def edge_detect( g2 = gaussian_filter(a, sigma=max(sigma2, sigma + 0.1), mode='reflect') result = g1 - g2 + elif method in ('sobel', 'scharr'): + from skimage import filters as _skf + gy_fn, gx_fn = ( + (_skf.sobel_h, _skf.sobel_v) if method == 'sobel' + else (_skf.scharr_h, _skf.scharr_v) + ) + result = np.hypot(gx_fn(a), gy_fn(a)) + else: raise ValueError(f"Unknown edge_detect method: {method!r}") diff --git a/probeflow/processing/image.py b/probeflow/processing/image.py index 0d5385e..b5b8960 100644 --- a/probeflow/processing/image.py +++ b/probeflow/processing/image.py @@ -4,6 +4,7 @@ from probeflow.processing.background import * # noqa: F401,F403 from probeflow.processing.alignment import * # noqa: F401,F403 from probeflow.processing.filters import * # noqa: F401,F403 +from probeflow.processing.edge_detection import * # noqa: F401,F403 from probeflow.processing.mains_pickup import * # noqa: F401,F403 from probeflow.processing.inverse_fft import * # noqa: F401,F403 from probeflow.processing.analysis import * # noqa: F401,F403 diff --git a/probeflow/processing/mask_ops.py b/probeflow/processing/mask_ops.py new file mode 100644 index 0000000..2dda268 --- /dev/null +++ b/probeflow/processing/mask_ops.py @@ -0,0 +1,89 @@ +"""Morphological cleanup operations for boolean masks. + +Thin, boolean-in/boolean-out wrappers over ``scipy.ndimage`` (trivial +morphology) and ``skimage.morphology`` (small-object / hole removal, +skeletonisation). These are used by the Masks manager to tidy edge-detection +output before it is used as an active mask or converted to ROIs. + +All sizes/radii are in **pixels**. Functions never introduce NaN (input is +boolean) and always return a fresh boolean array of the same shape. +""" + +from __future__ import annotations + +import numpy as np +from scipy import ndimage as _ndi + + +def _as_bool(mask: np.ndarray) -> np.ndarray: + return np.asarray(mask, dtype=bool) + + +def _disk(radius: int) -> np.ndarray: + """Disk-shaped structuring element of the given pixel radius.""" + r = max(1, int(radius)) + yy, xx = np.mgrid[-r:r + 1, -r:r + 1] + return (xx ** 2 + yy ** 2) <= r ** 2 + + +def invert(mask: np.ndarray) -> np.ndarray: + """Logical complement of *mask*.""" + return ~_as_bool(mask) + + +def dilate(mask: np.ndarray, radius: int = 1) -> np.ndarray: + """Grow True regions by a disk of *radius* px.""" + return _ndi.binary_dilation(_as_bool(mask), structure=_disk(radius)) + + +def erode(mask: np.ndarray, radius: int = 1) -> np.ndarray: + """Shrink True regions by a disk of *radius* px.""" + return _ndi.binary_erosion(_as_bool(mask), structure=_disk(radius)) + + +def binary_open(mask: np.ndarray, radius: int = 1) -> np.ndarray: + """Erosion followed by dilation — removes small protrusions/specks.""" + return _ndi.binary_opening(_as_bool(mask), structure=_disk(radius)) + + +def binary_close(mask: np.ndarray, radius: int = 1) -> np.ndarray: + """Dilation followed by erosion — closes small gaps/holes.""" + return _ndi.binary_closing(_as_bool(mask), structure=_disk(radius)) + + +def fill_holes(mask: np.ndarray) -> np.ndarray: + """Fill enclosed background regions inside True components.""" + return _ndi.binary_fill_holes(_as_bool(mask)) + + +def remove_small_objects(mask: np.ndarray, min_size: int = 16) -> np.ndarray: + """Drop connected True components smaller than *min_size* px.""" + from skimage.morphology import remove_small_objects as _rso + m = _as_bool(mask) + if not m.any(): + return m + return _rso(m, min_size=max(1, int(min_size))) + + +def remove_small_holes(mask: np.ndarray, area_threshold: int = 16) -> np.ndarray: + """Fill enclosed background holes smaller than *area_threshold* px.""" + from skimage.morphology import remove_small_holes as _rsh + m = _as_bool(mask) + if not m.any(): + return m + return _rsh(m, area_threshold=max(1, int(area_threshold))) + + +def skeletonize(mask: np.ndarray) -> np.ndarray: + """Reduce True regions to a 1-px-wide skeleton.""" + from skimage.morphology import skeletonize as _skel + m = _as_bool(mask) + if not m.any(): + return m + return _skel(m) + + +def remove_border_objects(mask: np.ndarray) -> np.ndarray: + """Remove True components touching the image border.""" + from skimage.segmentation import clear_border as _clear + return _clear(_as_bool(mask)) diff --git a/probeflow/provenance/records.py b/probeflow/provenance/records.py index 8f83475..86651d9 100644 --- a/probeflow/provenance/records.py +++ b/probeflow/provenance/records.py @@ -648,6 +648,11 @@ def _step_summary(step: ProvenanceStep) -> str: return f"Background: {p.get('model', 'linear')} ({p.get('line_statistic', 'median')})" if op == "smooth": return f"Gaussian blur/smoothing: sigma={p.get('sigma_px', 1.0)} px" + if op == "edge_detect": + method = str(p.get("method", "laplacian")) + if method in ("sobel", "scharr"): + return f"Edge detection: {method} gradient magnitude" + return f"Edge detection: {method} (sigma={p.get('sigma', 1.0)} px)" if op == "roi": nested = p.get("step") if isinstance(p, dict) else None if isinstance(nested, dict): diff --git a/pyproject.toml b/pyproject.toml index 9b92034..a2925b1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ dependencies = [ "PySide6", "matplotlib", "shapely>=2.0", + "scikit-image>=0.22", ] [project.optional-dependencies] diff --git a/tests/conftest.py b/tests/conftest.py index 024fd0f..36aa4c9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -27,6 +27,8 @@ "test_mains_pickup_gui.py", "test_roi_resize_handles_canvas.py", "test_worker_signals_lifetime.py", + "test_edge_detection_dialog.py", + "test_mask_manager_gui.py", } MIXED_QT_FIXTURE_MODULES = { diff --git a/tests/test_edge_detection.py b/tests/test_edge_detection.py new file mode 100644 index 0000000..b71e96f --- /dev/null +++ b/tests/test_edge_detection.py @@ -0,0 +1,162 @@ +"""Tests for the advanced edge-detection backend. + +Synthetic arrays only. No GUI. No external files. Deterministic. +""" +from __future__ import annotations + +import numpy as np +import pytest + +from probeflow.processing import ( + canny_edges, + edge_detect, + gradient_filter, +) +from probeflow.processing.edge_detection import CANNY_PRESETS, EdgeDetectionResult + + +# ── synthetic images ──────────────────────────────────────────────────────── + +def _step_image(n: int = 64, edge_col: int = 32) -> np.ndarray: + """A vertical step edge: left half low, right half high.""" + img = np.zeros((n, n), dtype=np.float64) + img[:, edge_col:] = 1.0 + return img + + +def _disk_image(n: int = 64, radius: int = 18) -> np.ndarray: + yy, xx = np.mgrid[0:n, 0:n] + cy = cx = n // 2 + return ((yy - cy) ** 2 + (xx - cx) ** 2 <= radius ** 2).astype(np.float64) + + +# ── Canny ──────────────────────────────────────────────────────────────────── + +class TestCanny: + def test_finds_step_edge_near_boundary(self): + edge_col = 32 + res = canny_edges(_step_image(edge_col=edge_col), sigma=1.0) + assert isinstance(res, EdgeDetectionResult) + assert res.edge_mask is not None and res.edge_mask.any() + # Edge pixels cluster within a couple of px of the true boundary. + cols = np.flatnonzero(res.edge_mask.any(axis=0)) + assert np.all(np.abs(cols - edge_col) <= 2) + + def test_disk_edges_form_a_ring(self): + res = canny_edges(_disk_image(), sigma=1.0, threshold_mode="percentile") + # The boundary of a disk has many more edge pixels than its area=0 interior row. + assert res.edge_mask.sum() > 30 + + def test_percentile_and_absolute_modes_both_run(self): + img = _step_image() + a = canny_edges(img, sigma=1.0, threshold_mode="percentile", low=70, high=90) + b = canny_edges(img, sigma=1.0, threshold_mode="absolute", low=0.0, high=0.0) + assert a.edge_mask.any() + assert b.edge_mask.any() + + def test_nan_input_preserved_in_display_mask_finite(self): + img = _step_image() + img[0, 0] = np.nan + res = canny_edges(img, sigma=1.0) + assert np.isnan(res.display_image[0, 0]) + assert res.edge_mask.dtype == bool + assert np.isfinite(res.edge_mask).all() # bool is always finite + assert not res.edge_mask[0, 0] + + def test_roi_mask_confines_edges(self): + img = _step_image(edge_col=32) + roi = np.zeros_like(img, dtype=bool) + roi[:, :16] = True # ROI excludes the real edge at col 32 + res = canny_edges(img, sigma=1.0, roi_mask=roi) + assert res.parameters["roi_restricted"] is True + assert not res.edge_mask[:, 16:].any() + + @pytest.mark.parametrize("name", sorted(CANNY_PRESETS)) + def test_preset_runs_and_overrides_params(self, name): + res = canny_edges(_step_image(), preset=name) + assert res.parameters["preset"] == name + assert res.parameters["sigma"] == CANNY_PRESETS[name]["sigma"] + assert res.edge_mask.any() + + def test_unknown_preset_raises(self): + with pytest.raises(ValueError): + canny_edges(_step_image(), preset="nope") + + def test_pixel_size_records_sigma_nm(self): + res = canny_edges(_step_image(), sigma=2.0, pixel_size_nm=0.05) + assert res.parameters["sigma_nm"] == pytest.approx(0.1) + + +# ── Sobel / Scharr ───────────────────────────────────────────────────────────── + +class TestGradient: + @pytest.mark.parametrize("operator", ["sobel", "scharr"]) + def test_magnitude_peaks_at_edge(self, operator): + edge_col = 32 + res = gradient_filter(_step_image(edge_col=edge_col), operator=operator, + output="magnitude", normalize=False) + assert res.gradient_magnitude is not None + # Peak response is at the step boundary. + col_response = res.gradient_magnitude.sum(axis=0) + assert abs(int(np.argmax(col_response)) - edge_col) <= 1 + + def test_orientation_of_vertical_step_is_horizontal_gradient(self): + # A vertical step edge has a gradient pointing in +x (orientation ~ 0). + res = gradient_filter(_step_image(), output="orientation") + edge = gradient_filter(_step_image(), output="magnitude").gradient_magnitude + strong = edge > 0.5 * edge.max() + ori = res.gradient_orientation[strong] + # arctan2(gy, gx) ~ 0 for a pure +x gradient. + assert np.allclose(np.cos(ori), 1.0, atol=1e-6) + + def test_normalize_scales_to_unit_peak(self): + res = gradient_filter(_disk_image(), output="magnitude", normalize=True) + assert np.nanmax(res.display_image) == pytest.approx(1.0) + + def test_threshold_to_mask_produces_binary(self): + res = gradient_filter(_step_image(), output="magnitude", + threshold_to_mask=True, threshold=90) + assert res.edge_mask is not None + assert res.edge_mask.dtype == bool + assert res.edge_mask.any() + + def test_nan_preserved_in_display(self): + img = _step_image() + img[5, 5] = np.nan + res = gradient_filter(img, output="magnitude") + assert np.isnan(res.display_image[5, 5]) + + def test_x_and_y_outputs_differ_for_diagonal(self): + yy, xx = np.mgrid[0:64, 0:64] + ramp = (xx + 2 * yy).astype(np.float64) + gx = gradient_filter(ramp, output="x", normalize=False).display_image + gy = gradient_filter(ramp, output="y", normalize=False).display_image + # y-slope is twice the x-slope, so |gy| median should exceed |gx|. + assert np.median(np.abs(gy)) > np.median(np.abs(gx)) + + def test_invalid_operator_and_output_raise(self): + with pytest.raises(ValueError): + gradient_filter(_step_image(), operator="prewitt") + with pytest.raises(ValueError): + gradient_filter(_step_image(), output="curl") + + +# ── edge_detect (history-replayable) sobel/scharr extension ──────────────────── + +class TestEdgeDetectGradient: + @pytest.mark.parametrize("method", ["sobel", "scharr"]) + def test_returns_nonnegative_magnitude(self, method): + out = edge_detect(_step_image(), method=method) + assert out.shape == (64, 64) + assert np.nanmin(out) >= 0.0 + assert np.nanmax(out) > 0.0 + + def test_nan_roundtrips(self): + img = _step_image() + img[10, 10] = np.nan + out = edge_detect(img, method="sobel") + assert np.isnan(out[10, 10]) + + def test_unknown_method_still_raises(self): + with pytest.raises(ValueError): + edge_detect(_step_image(), method="bogus") diff --git a/tests/test_edge_detection_dialog.py b/tests/test_edge_detection_dialog.py new file mode 100644 index 0000000..bedc630 --- /dev/null +++ b/tests/test_edge_detection_dialog.py @@ -0,0 +1,130 @@ +"""Offscreen-Qt tests for the Advanced Edge Detection dialog.""" +from __future__ import annotations + +import os + +import numpy as np +import pytest + +os.environ.setdefault("QT_QPA_PLATFORM", "offscreen") + +N = 64 + + +@pytest.fixture +def qapp(): + try: + from PySide6.QtWidgets import QApplication + except Exception as exc: + pytest.skip(f"PySide6 unavailable: {exc}") + app = QApplication.instance() + if app is not None: + return app + try: + return QApplication([]) + except Exception as exc: + pytest.skip(f"QApplication unavailable: {exc}") + + +def _step_image() -> np.ndarray: + img = np.zeros((N, N), dtype=np.float64) + img[:, N // 2:] = 1.0 + return img + + +def _dialog(qapp, **kw): + from probeflow.gui.dialogs.edge_detection import EdgeDetectionDialog + dlg = EdgeDetectionDialog(_step_image(), **kw) + dlg._debounce.stop() # run recompute synchronously in tests + dlg._recompute() + return dlg + + +def test_dialog_builds_and_previews_canny(qapp): + dlg = _dialog(qapp) + assert dlg._result is not None + assert dlg._result.method == "canny" + assert dlg._result.edge_mask.any() + dlg.deleteLater() + + +def test_method_switch_swaps_panel_and_recomputes(qapp): + dlg = _dialog(qapp) + dlg._method_combo.setCurrentIndex(1) # Sobel / Scharr + dlg._recompute() + assert dlg._stack.currentIndex() == 1 + assert dlg._result.method == "sobel" + assert dlg._result.gradient_magnitude is not None + dlg.deleteLater() + + +def test_preset_fills_canny_fields(qapp): + dlg = _dialog(qapp) + dlg._canny_preset.setCurrentText("Step edges / islands") + assert dlg._canny_sigma.value() == pytest.approx(2.0) + assert dlg._canny_low.value() == pytest.approx(60.0) + assert dlg._canny_high.value() == pytest.approx(85.0) + dlg.deleteLater() + + +def test_mask_created_signal_emits_image_mask(qapp): + dlg = _dialog(qapp) + captured = [] + dlg.mask_created.connect(captured.append) + dlg._emit_mask() + assert len(captured) == 1 + from probeflow.core.mask import ImageMask + assert isinstance(captured[0], ImageMask) + assert captured[0].count() > 0 + dlg.deleteLater() + + +def test_overlay_signal_emits_result(qapp): + dlg = _dialog(qapp) + captured = [] + dlg.overlay_requested.connect(captured.append) + dlg._emit_overlay() + assert len(captured) == 1 + assert captured[0] is dlg._result + dlg.deleteLater() + + +def test_image_created_signal_carries_provenance(qapp): + dlg = _dialog(qapp) + captured = [] + dlg.image_created.connect(lambda arr, prov: captured.append((arr, prov))) + dlg._emit_image() + assert len(captured) == 1 + arr, prov = captured[0] + assert arr.shape == (N, N) + assert prov["op"] == "advanced_edge_detection" + assert prov["method"] == "canny" + dlg.deleteLater() + + +def test_rois_created_from_canny_mask(qapp): + # A filled disk yields a closed region that converts to a polygon ROI. + from probeflow.gui.dialogs.edge_detection import EdgeDetectionDialog + yy, xx = np.mgrid[0:N, 0:N] + disk = (((yy - 32) ** 2 + (xx - 32) ** 2) <= 12 ** 2).astype(np.float64) + dlg = EdgeDetectionDialog(disk) + dlg._debounce.stop() + dlg._method_combo.setCurrentIndex(1) + dlg._grad_threshold_cb.setChecked(True) + dlg._recompute() + dlg._roi_min_size.setValue(0) + captured = [] + dlg.rois_created.connect(captured.append) + dlg._emit_rois() + # Either ROIs were produced, or an info box (no closed region) — both valid; + # for a thresholded disk gradient we expect at least one ROI. + assert captured and len(captured[0]) >= 1 + dlg.deleteLater() + + +def test_sobel_without_threshold_has_no_mask(qapp): + dlg = _dialog(qapp) + dlg._method_combo.setCurrentIndex(1) + dlg._recompute() + assert dlg._result.edge_mask is None + dlg.deleteLater() diff --git a/tests/test_mask.py b/tests/test_mask.py new file mode 100644 index 0000000..99c37ce --- /dev/null +++ b/tests/test_mask.py @@ -0,0 +1,141 @@ +"""Tests for the active-mask layer: ImageMask, MaskSet, mask_name, sidecar, mask→ROI.""" +from __future__ import annotations + +import numpy as np +import pytest + +from probeflow.core.mask import ImageMask, MaskSet, mask_name +from probeflow.core.roi import roi_from_mask + + +def _mask(n: int = 16) -> np.ndarray: + m = np.zeros((n, n), dtype=bool) + m[4:12, 4:12] = True + return m + + +# ── ImageMask ───────────────────────────────────────────────────────────────── + +class TestImageMask: + def test_new_generates_id_and_name(self): + m = ImageMask.new(_mask(), method="canny", parameters={"sigma": 1.0}) + assert m.id + assert m.name == "Canny_sigma1_p70-90" + assert m.count() == 64 + + def test_rejects_non_2d(self): + with pytest.raises(ValueError): + ImageMask.new(np.zeros((2, 2, 2), dtype=bool)) + + def test_packbits_roundtrip_preserves_shape_and_values(self): + rng = np.random.default_rng(0) + data = rng.random((23, 17)) > 0.5 # non-byte-aligned dims + m = ImageMask.new(data, name="rand") + back = ImageMask.from_dict(m.to_dict()) + assert back.shape == (23, 17) + assert np.array_equal(back.data, data) + assert back.name == "rand" + + def test_shape_property(self): + assert ImageMask.new(_mask(16)).shape == (16, 16) + + +# ── mask_name ───────────────────────────────────────────────────────────────── + +class TestMaskName: + def test_canny_percentile(self): + assert mask_name("canny", {"sigma": 1.5, "low": 60, "high": 85}) == "Canny_sigma1.5_p60-85" + + def test_canny_absolute_drops_percentile_suffix(self): + assert mask_name("canny", {"sigma": 1.0, "threshold_mode": "absolute"}) == "Canny_sigma1" + + def test_sobel_magnitude_thresholded(self): + assert mask_name("sobel", {"output": "magnitude", "threshold_to_mask": True, + "threshold": 90}) == "Sobel_magnitude_p90" + + def test_scharr_directional(self): + assert mask_name("scharr", {"output": "x"}) == "Scharr_x_gradient" + + +# ── MaskSet ─────────────────────────────────────────────────────────────────── + +class TestMaskSet: + def test_add_active_remove(self): + ms = MaskSet(image_id="img") + m = ImageMask.new(_mask(), name="a") + ms.add(m) + ms.set_active(m.id) + assert ms.active() is m + ms.remove(m.id) + assert ms.active_mask_id is None + assert ms.get(m.id) is None + + def test_set_active_unknown_raises(self): + ms = MaskSet(image_id="img") + with pytest.raises(ValueError): + ms.set_active("nope") + + def test_replace_data_in_place(self): + ms = MaskSet(image_id="img") + m = ImageMask.new(_mask()) + ms.add(m) + ms.replace(m.id, np.zeros((16, 16), dtype=bool)) + assert ms.get(m.id).count() == 0 + + def test_roundtrip(self): + ms = MaskSet(image_id="img") + m1 = ImageMask.new(_mask(), name="one") + m2 = ImageMask.new(~_mask(), name="two") + ms.add(m1) + ms.add(m2) + ms.set_active(m2.id) + back = MaskSet.from_dict(ms.to_dict()) + assert back.image_id == "img" + assert [m.name for m in back.masks] == ["one", "two"] + assert back.active_mask_id == m2.id + assert np.array_equal(back.get(m1.id).data, m1.data) + + def test_roundtrip_drops_dangling_active(self): + d = MaskSet(image_id="img").to_dict() + d["active_mask_id"] = "ghost" + assert MaskSet.from_dict(d).active_mask_id is None + + +# ── mask → ROI ────────────────────────────────────────────────────────────────── + +class TestRoiFromMask: + def test_single_square_roundtrips(self): + m = _mask(32) + rois = roi_from_mask(m) + assert len(rois) == 1 + rt = rois[0].to_mask(m.shape) + # Allow the half-pixel contour expansion; most original pixels recovered. + iou = (rt & m).sum() / (rt | m).sum() + assert iou > 0.8 + # Nearly all original pixels recovered (contour tracing may clip a corner). + assert (rt & m).sum() >= m.sum() - 1 + + def test_two_components_split(self): + m = np.zeros((40, 40), dtype=bool) + m[5:12, 5:12] = True + m[25:35, 25:35] = True + rois = roi_from_mask(m, one_per_component=True) + assert len(rois) == 2 + + def test_min_size_filters_small_blobs(self): + m = np.zeros((40, 40), dtype=bool) + m[5:30, 5:30] = True # big + m[0, 0] = True # 1-px speck + rois = roi_from_mask(m, min_size_px=10) + assert len(rois) == 1 + + def test_union_returns_single_roi(self): + m = np.zeros((40, 40), dtype=bool) + m[5:12, 5:12] = True + m[25:35, 25:35] = True + rois = roi_from_mask(m, one_per_component=False) + assert len(rois) == 1 + assert rois[0].kind == "multipolygon" + + def test_empty_mask_returns_empty(self): + assert roi_from_mask(np.zeros((10, 10), dtype=bool)) == [] diff --git a/tests/test_mask_integration.py b/tests/test_mask_integration.py new file mode 100644 index 0000000..5dff2d4 --- /dev/null +++ b/tests/test_mask_integration.py @@ -0,0 +1,62 @@ +"""Qt-free integration: the active mask flows into statistics and background exclusion. + +These exercise the backend seams the viewer relies on (``roi_statistics(mask=…)`` +and ``subtract_background`` exclusion via the mask→ROI bridge), without Qt. +""" +from __future__ import annotations + +import numpy as np + +from probeflow.core.mask import ImageMask, MaskSet +from probeflow.core.roi import roi_from_mask +from probeflow.measurements.image import roi_statistics +from probeflow.processing import subtract_background + + +def test_active_mask_restricts_statistics(): + # Left half = 0, right half = 10. A mask over the right half should report + # mean 10, not 5. + arr = np.zeros((32, 32), dtype=np.float64) + arr[:, 16:] = 10.0 + mask = np.zeros((32, 32), dtype=bool) + mask[:, 16:] = True + + ms = MaskSet(image_id="img") + ms.add(ImageMask.new(mask, name="right")) + ms.set_active(ms.masks[0].id) + + result = roi_statistics( + arr, measurement_id="m1", source_label="t", + mask=ms.active().data, pixel_size_x=1.0, pixel_size_y=1.0, + ) + assert result.values["mean_height"] == 10.0 + assert result.values["n_finite_pixels"] == 16 * 32 + + +def test_active_mask_excludes_region_from_plane_fit(): + # A tilted plane plus a bright contaminated blob. Fitting with the blob + # excluded recovers a near-flat residual; including it skews the fit. + yy, xx = np.mgrid[0:64, 0:64] + plane = 0.1 * xx + 0.05 * yy + img = plane.astype(np.float64).copy() + blob = np.zeros_like(img, dtype=bool) + blob[10:20, 10:20] = True + img[blob] += 50.0 # contamination + + rois = roi_from_mask(blob, min_size_px=0) + assert rois # mask converts to at least one ROI + + excluded = subtract_background(img, order=1, exclude_roi=rois[0]) + included = subtract_background(img, order=1) + + # Residual std away from the blob is much smaller when the blob is excluded. + clean = ~blob + assert np.std(excluded[clean]) < np.std(included[clean]) + assert np.std(excluded[clean]) < 1.0 + + +def test_active_mask_array_shape_guard_via_maskset(): + ms = MaskSet(image_id="img") + ms.add(ImageMask.new(np.ones((8, 8), dtype=bool))) + ms.set_active(ms.masks[0].id) + assert ms.active().shape == (8, 8) diff --git a/tests/test_mask_manager_gui.py b/tests/test_mask_manager_gui.py new file mode 100644 index 0000000..5b26cb6 --- /dev/null +++ b/tests/test_mask_manager_gui.py @@ -0,0 +1,101 @@ +"""Offscreen-Qt tests for the Masks manager panel.""" +from __future__ import annotations + +import os + +import numpy as np +import pytest + +os.environ.setdefault("QT_QPA_PLATFORM", "offscreen") + + +@pytest.fixture +def qapp(): + try: + from PySide6.QtWidgets import QApplication + except Exception as exc: + pytest.skip(f"PySide6 unavailable: {exc}") + app = QApplication.instance() + if app is not None: + return app + try: + return QApplication([]) + except Exception as exc: + pytest.skip(f"QApplication unavailable: {exc}") + + +def _mask_set(): + from probeflow.core.mask import ImageMask, MaskSet + ms = MaskSet(image_id="img") + data = np.zeros((20, 20), dtype=bool) + data[5:15, 5:15] = True + data[0, 0] = True # speck for cleanup + ms.add(ImageMask.new(data, name="m1")) + return ms + + +def _panel(qapp, ms, callbacks=None): + from probeflow.gui.mask_manager import MaskManagerPanel + return MaskManagerPanel(lambda: ms, callbacks or {}) + + +def test_panel_lists_masks_and_marks_active(qapp): + ms = _mask_set() + ms.set_active(ms.masks[0].id) + panel = _panel(qapp, ms) + assert panel._list.count() == 1 + assert panel._list.item(0).text().startswith("●") + panel.deleteLater() + + +def test_set_active_via_panel(qapp): + ms = _mask_set() + changed = [] + panel = _panel(qapp, ms, {"on_mask_set_changed": lambda: changed.append(True)}) + panel._list.setCurrentRow(0) + panel._on_set_active() + assert ms.active_mask_id == ms.masks[0].id + assert changed + panel.deleteLater() + + +def test_cleanup_removes_small_objects(qapp): + ms = _mask_set() + panel = _panel(qapp, ms) + panel._list.setCurrentRow(0) + before = ms.masks[0].count() + panel._cleanup_combo.setCurrentText("Remove small objects") + panel._on_cleanup() + after = ms.masks[0].count() + assert after < before # speck removed + assert not ms.masks[0].data[0, 0] + panel.deleteLater() + + +def test_invert_via_panel(qapp): + ms = _mask_set() + panel = _panel(qapp, ms) + panel._list.setCurrentRow(0) + before = ms.masks[0].count() + panel._on_invert() + assert ms.masks[0].count() == 20 * 20 - before + panel.deleteLater() + + +def test_convert_to_roi_callback_fires(qapp): + ms = _mask_set() + captured = [] + panel = _panel(qapp, ms, {"convert_to_roi": captured.append}) + panel._list.setCurrentRow(0) + panel._on_convert_roi() + assert captured == [ms.masks[0].id] + panel.deleteLater() + + +def test_delete_via_panel(qapp): + ms = _mask_set() + panel = _panel(qapp, ms) + panel._list.setCurrentRow(0) + panel._on_delete() + assert len(ms.masks) == 0 + panel.deleteLater() diff --git a/tests/test_mask_ops.py b/tests/test_mask_ops.py new file mode 100644 index 0000000..a2f4441 --- /dev/null +++ b/tests/test_mask_ops.py @@ -0,0 +1,97 @@ +"""Tests for morphological mask cleanup operations.""" +from __future__ import annotations + +import numpy as np +import pytest + +from probeflow.processing import mask_ops + + +def _square(n: int = 20) -> np.ndarray: + m = np.zeros((n, n), dtype=bool) + m[5:15, 5:15] = True + return m + + +def test_invert(): + m = _square() + assert np.array_equal(mask_ops.invert(m), ~m) + + +def test_dilate_grows_and_erode_shrinks(): + m = _square() + assert mask_ops.dilate(m, 1).sum() > m.sum() + assert mask_ops.erode(m, 1).sum() < m.sum() + + +def test_open_removes_speck(): + m = _square() + m[0, 0] = True # isolated speck + opened = mask_ops.binary_open(m, 1) + assert not opened[0, 0] + # Bulk of the square survives opening (a disk element rounds the corners). + assert opened[8:12, 8:12].all() + + +def test_close_fills_small_gap(): + m = _square() + m[10, 10] = False # 1-px hole + closed = mask_ops.binary_close(m, 1) + assert closed[10, 10] + + +def test_fill_holes(): + m = _square() + m[8:12, 8:12] = False # interior hole + filled = mask_ops.fill_holes(m) + assert filled[8:12, 8:12].all() + + +def test_remove_small_objects(): + m = _square() + m[0, 0] = True + cleaned = mask_ops.remove_small_objects(m, min_size=5) + assert not cleaned[0, 0] + assert cleaned[5:15, 5:15].all() + + +def test_remove_small_holes(): + m = _square() + m[9, 9] = False + filled = mask_ops.remove_small_holes(m, area_threshold=5) + assert filled[9, 9] + + +def test_skeletonize_thins_to_line(): + m = _square() + skel = mask_ops.skeletonize(m) + assert skel.sum() < m.sum() + assert skel.shape == m.shape + + +def test_remove_border_objects(): + m = np.zeros((20, 20), dtype=bool) + m[0:5, 0:5] = True # touches border → removed + m[10:15, 10:15] = True # interior → kept + cleaned = mask_ops.remove_border_objects(m) + assert not cleaned[0:5, 0:5].any() + assert cleaned[10:15, 10:15].all() + + +@pytest.mark.parametrize("op", [ + mask_ops.dilate, mask_ops.erode, mask_ops.binary_open, mask_ops.binary_close, +]) +def test_empty_mask_safe(op): + empty = np.zeros((10, 10), dtype=bool) + out = op(empty) + assert out.shape == (10, 10) + assert not out.any() + + +def test_shape_preserved_and_bool_dtype(): + m = _square() + for fn in (mask_ops.invert, mask_ops.fill_holes, mask_ops.skeletonize, + mask_ops.remove_small_objects, mask_ops.remove_border_objects): + out = fn(m) + assert out.shape == m.shape + assert out.dtype == bool diff --git a/tests/test_mask_sidecar.py b/tests/test_mask_sidecar.py new file mode 100644 index 0000000..2a3cfbc --- /dev/null +++ b/tests/test_mask_sidecar.py @@ -0,0 +1,48 @@ +"""Tests for the mask sidecar (on-disk persistence).""" +from __future__ import annotations + +import numpy as np +import pytest + +from probeflow.core.mask import ImageMask, MaskSet +from probeflow.io.mask_sidecar import ( + default_mask_sidecar_path, + load_mask_set_sidecar, + save_mask_set_sidecar, +) + + +def test_default_sidecar_path_keeps_dotted_stem(tmp_path): + scan = tmp_path / "A250320.191933.dat" + assert default_mask_sidecar_path(scan).name == "A250320.191933.masks.json" + + +def test_save_load_roundtrip(tmp_path): + scan = tmp_path / "scan.sxm" + ms = MaskSet(image_id="scan") + data = np.zeros((12, 9), dtype=bool) + data[2:6, 3:7] = True + m = ImageMask.new(data, method="canny", parameters={"sigma": 1.0}) + ms.add(m) + ms.set_active(m.id) + + saved = save_mask_set_sidecar(ms, scan) + assert saved.exists() + + loaded, path_used = load_mask_set_sidecar(scan) + assert path_used == saved + assert loaded.image_id == "scan" + assert loaded.active_mask_id == m.id + assert np.array_equal(loaded.get(m.id).data, data) + + +def test_missing_ok_returns_none(tmp_path): + scan = tmp_path / "absent.sxm" + loaded, path_used = load_mask_set_sidecar(scan, missing_ok=True) + assert loaded is None + assert path_used == default_mask_sidecar_path(scan) + + +def test_missing_raises_without_ok(tmp_path): + with pytest.raises(FileNotFoundError): + load_mask_set_sidecar(tmp_path / "absent.sxm")