diff --git a/src/sas/qtgui/Perspectives/Fitting/EventMediator.py b/src/sas/qtgui/Perspectives/Fitting/EventMediator.py new file mode 100644 index 0000000000..f8ff3fb7c6 --- /dev/null +++ b/src/sas/qtgui/Perspectives/Fitting/EventMediator.py @@ -0,0 +1,319 @@ +""" +EventMediator - Central event bus for FittingWidget. + +This class implements the Mediator pattern to decouple signal/slot connections +and reduce tight coupling between FittingWidget and its child widgets. +""" + +from collections.abc import Callable +from enum import Enum, auto +from typing import Any + +from PySide6 import QtCore + + +class FittingEventType(Enum): + """Enumeration of all event types in the fitting perspective.""" + + # Model selection events + MODEL_SELECTED = auto() + STRUCTURE_FACTOR_SELECTED = auto() + CATEGORY_SELECTED = auto() + BATCH_FILE_SELECTED = auto() + + # View toggle events + VIEW_2D_TOGGLED = auto() + POLYDISPERSITY_TOGGLED = auto() + MAGNETISM_TOGGLED = auto() + CHAIN_FIT_TOGGLED = auto() + + # Action button events + FIT_REQUESTED = auto() + PLOT_REQUESTED = auto() + HELP_REQUESTED = auto() + + # Data events + DATA_LOADED = auto() + DATA_UPDATED = auto() + MASKED_DATA_UPDATED = auto() + Q_RANGE_UPDATED = auto() + + # Parameter events + PARAMS_CHANGED = auto() + SELECTION_CHANGED = auto() + FIT_ENABLEMENT_CHANGED = auto() + + # Calculation events + CALCULATION_STARTED = auto() + CALCULATION_FINISHED = auto() + CALCULATION_1D_FINISHED = auto() + CALCULATION_2D_FINISHED = auto() + + # Fitting events + FITTING_STARTED = auto() + FITTING_FINISHED = auto() + BATCH_FITTING_FINISHED = auto() + + # Constraint events + CONSTRAINT_ADDED = auto() + CONSTRAINT_REMOVED = auto() + + # Widget update events + OPTIONS_UPDATE_REQUESTED = auto() + SMEARING_OPTIONS_UPDATED = auto() + MODEL_ITERATION_REQUESTED = auto() + + # Custom model events + CUSTOM_MODEL_CHANGED = auto() + MODEL_CATEGORIES_UPDATED = auto() + + # Keyboard events + KEY_PRESSED = auto() + + +class EventMediator(QtCore.QObject): + """ + Central event bus for managing events in the fitting perspective. + + This mediator decouples signal/slot connections by providing: + - Type-safe event routing + - Centralized subscription management + - Weak reference support to prevent memory leaks + - Event filtering and transformation capabilities + + Usage: + mediator = EventMediator() + mediator.subscribe(FittingEventType.FIT_REQUESTED, self.onFit) + mediator.publish(FittingEventType.FIT_REQUESTED, {"params": params}) + """ + + def __init__(self, parent: QtCore.QObject | None = None): + """ + Initialize the EventMediator. + + Args: + parent: Parent QObject (optional) + """ + super().__init__(parent) + + # Dictionary of event_type -> list of (callback, use_weak_ref) tuples + self._subscribers: dict[FittingEventType, list[tuple[Callable, bool]]] = {} + + # Event history for debugging (limited size) + self._event_history: list[tuple[FittingEventType, Any]] = [] + self._max_history_size = 100 + + # Enable/disable event logging + self._logging_enabled = False + + def subscribe( + self, + event_type: FittingEventType, + callback: Callable[[Any], None], + use_weak_ref: bool = False + ) -> None: + """ + Subscribe to an event type. + + Args: + event_type: The event type to subscribe to + callback: Function to call when event is published + use_weak_ref: If True, use weak reference to prevent memory leaks + """ + if event_type not in self._subscribers: + self._subscribers[event_type] = [] + + self._subscribers[event_type].append((callback, use_weak_ref)) + + def unsubscribe( + self, + event_type: FittingEventType, + callback: Callable[[Any], None] + ) -> None: + """ + Unsubscribe from an event type. + + Args: + event_type: The event type to unsubscribe from + callback: The callback function to remove + """ + if event_type in self._subscribers: + self._subscribers[event_type] = [ + (cb, weak) for cb, weak in self._subscribers[event_type] + if cb != callback + ] + + def publish( + self, + event_type: FittingEventType, + event_data: Any = None + ) -> None: + """ + Publish an event to all subscribers. + + Args: + event_type: The event type being published + event_data: Optional data associated with the event + """ + # Log event if enabled + if self._logging_enabled: + self._log_event(event_type, event_data) + + # Store in history + self._add_to_history(event_type, event_data) + + # Notify all subscribers + if event_type in self._subscribers: + for callback, use_weak_ref in self._subscribers[event_type][:]: + try: + if use_weak_ref: + # TODO: Implement weak reference support + callback(event_data) + else: + callback(event_data) + except Exception as e: + # Log error but don't stop other callbacks + print(f"Error in event callback for {event_type}: {e}") + + def clear_subscribers(self, event_type: FittingEventType | None = None) -> None: + """ + Clear all subscribers for an event type, or all subscribers if None. + + Args: + event_type: Event type to clear, or None to clear all + """ + if event_type is None: + self._subscribers.clear() + elif event_type in self._subscribers: + self._subscribers[event_type].clear() + + def has_subscribers(self, event_type: FittingEventType) -> bool: + """ + Check if an event type has any subscribers. + + Args: + event_type: Event type to check + + Returns: + True if there are subscribers, False otherwise + """ + return event_type in self._subscribers and len(self._subscribers[event_type]) > 0 + + def get_subscriber_count(self, event_type: FittingEventType) -> int: + """ + Get the number of subscribers for an event type. + + Args: + event_type: Event type to check + + Returns: + Number of subscribers + """ + return len(self._subscribers.get(event_type, [])) + + def enable_logging(self, enabled: bool = True) -> None: + """ + Enable or disable event logging. + + Args: + enabled: True to enable logging, False to disable + """ + self._logging_enabled = enabled + + def get_event_history(self) -> list[tuple[FittingEventType, Any]]: + """ + Get the event history. + + Returns: + List of (event_type, event_data) tuples + """ + return self._event_history.copy() + + def clear_history(self) -> None: + """Clear the event history.""" + self._event_history.clear() + + def _add_to_history(self, event_type: FittingEventType, event_data: Any) -> None: + """Add event to history with size limit.""" + self._event_history.append((event_type, event_data)) + + # Trim history if too large + if len(self._event_history) > self._max_history_size: + self._event_history = self._event_history[-self._max_history_size:] + + def _log_event(self, event_type: FittingEventType, event_data: Any) -> None: + """Log event for debugging.""" + print(f"[EventMediator] {event_type.name}: {event_data}") + + +class EventBridge: + """ + Helper class to bridge Qt signals to EventMediator. + + This class provides utilities to connect Qt signals to the event mediator, + reducing boilerplate in initializeSignals(). + """ + + def __init__(self, mediator: EventMediator): + """ + Initialize the EventBridge. + + Args: + mediator: The EventMediator to publish events to + """ + self.mediator = mediator + + def connect_signal( + self, + signal: QtCore.Signal, + event_type: FittingEventType, + transform: Callable[[Any], Any] | None = None + ) -> None: + """ + Connect a Qt signal to an event type. + + Args: + signal: Qt signal to connect + event_type: Event type to publish when signal is emitted + transform: Optional function to transform signal data before publishing + """ + def on_signal_emitted(*args): + # Extract single value if only one arg + event_data = args[0] if len(args) == 1 else args if args else None + + # Apply transformation if provided + if transform: + event_data = transform(event_data) + + # Publish to mediator + self.mediator.publish(event_type, event_data) + + signal.connect(on_signal_emitted) + + def connect_action( + self, + signal: QtCore.Signal, + event_type: FittingEventType + ) -> None: + """ + Connect a parameterless signal (like button clicks) to an event type. + + Args: + signal: Qt signal to connect + event_type: Event type to publish when signal is emitted + """ + signal.connect(lambda: self.mediator.publish(event_type)) + + def publish_to_signal( + self, + event_type: FittingEventType, + signal: QtCore.Signal + ) -> None: + """ + Subscribe to an event type and emit a Qt signal when it occurs. + + Args: + event_type: Event type to subscribe to + signal: Qt signal to emit when event occurs + """ + self.mediator.subscribe(event_type, lambda data: signal.emit(data)) diff --git a/src/sas/qtgui/Perspectives/Fitting/FittingWidget.py b/src/sas/qtgui/Perspectives/Fitting/FittingWidget.py index b9fcacb39e..e0f25eb1ad 100644 --- a/src/sas/qtgui/Perspectives/Fitting/FittingWidget.py +++ b/src/sas/qtgui/Perspectives/Fitting/FittingWidget.py @@ -20,6 +20,7 @@ from sas.qtgui.Perspectives.Fitting import FittingUtilities from sas.qtgui.Perspectives.Fitting.ConsoleUpdate import ConsoleUpdate from sas.qtgui.Perspectives.Fitting.Constraint import Constraint +from sas.qtgui.Perspectives.Fitting.EventMediator import EventBridge, EventMediator, FittingEventType from sas.qtgui.Perspectives.Fitting.FitPage import FitPage from sas.qtgui.Perspectives.Fitting.FitThread import FitThread from sas.qtgui.Perspectives.Fitting.FittingLogic import FittingLogic @@ -39,6 +40,7 @@ from sas.sascalc.fit import models from sas.sascalc.fit.BumpsFitting import BumpsFit as Fit from sas.system import HELP_SYSTEM +from sas.system.user import find_plugins_dir TAB_MAGNETISM = 4 TAB_POLY = 3 @@ -102,6 +104,10 @@ def __init__(self, parent: QtWidgets.QWidget | None = None, data: Any | None = N # Globals self.initializeGlobals() + # Initialize event mediator for decoupled event handling + self.event_mediator = EventMediator(parent=self) + self.event_bridge = EventBridge(self.event_mediator) + # data index for the batch set self.data_index = 0 # Main Data[12]D holders @@ -575,54 +581,203 @@ def initializeControls(self) -> None: def initializeSignals(self) -> None: """ - Connect GUI element signals - """ - # Comboboxes - self.cbStructureFactor.currentIndexChanged.connect(self.onSelectStructureFactor) - self.cbCategory.currentIndexChanged.connect(self.onSelectCategory) - self.cbModel.currentIndexChanged.connect(self.onSelectModel) - self.cbFileNames.currentIndexChanged.connect(self.onSelectBatchFilename) - # Checkboxes - self.chk2DView.toggled.connect(self.toggle2D) - self.chkPolydispersity.toggled.connect(self.togglePoly) - self.chkMagnetism.toggled.connect(self.toggleMagnetism) - self.chkChainFit.toggled.connect(self.toggleChainFit) - # Buttons - self.cmdFit.clicked.connect(self.onFit) - self.cmdPlot.clicked.connect(self.onPlot) - self.cmdHelp.clicked.connect(self.onHelp) - - # Respond to change in parameters from the UI - self._model_model.dataChanged.connect(self.onMainParamsChange) - self.lstParams.selectionModel().selectionChanged.connect(self.onSelectionChanged) + Connect GUI element signals using EventMediator for decoupled event handling. + + The EventMediator pattern provides: + - Centralized event routing + - Type-safe event handling + - Easier testing and debugging + - Reduced coupling between components + """ + # ========== UI Control Events (routed through mediator) ========== + + # Comboboxes - model selection events + self.event_bridge.connect_signal( + self.cbStructureFactor.currentIndexChanged, + FittingEventType.STRUCTURE_FACTOR_SELECTED + ) + self.event_bridge.connect_signal( + self.cbCategory.currentIndexChanged, + FittingEventType.CATEGORY_SELECTED + ) + self.event_bridge.connect_signal( + self.cbModel.currentIndexChanged, + FittingEventType.MODEL_SELECTED + ) + self.event_bridge.connect_signal( + self.cbFileNames.currentIndexChanged, + FittingEventType.BATCH_FILE_SELECTED + ) + + # Checkboxes - view toggle events + self.event_bridge.connect_signal( + self.chk2DView.toggled, + FittingEventType.VIEW_2D_TOGGLED + ) + self.event_bridge.connect_signal( + self.chkPolydispersity.toggled, + FittingEventType.POLYDISPERSITY_TOGGLED + ) + self.event_bridge.connect_signal( + self.chkMagnetism.toggled, + FittingEventType.MAGNETISM_TOGGLED + ) + self.event_bridge.connect_signal( + self.chkChainFit.toggled, + FittingEventType.CHAIN_FIT_TOGGLED + ) + + # Buttons - action events + self.event_bridge.connect_action( + self.cmdFit.clicked, + FittingEventType.FIT_REQUESTED + ) + self.event_bridge.connect_action( + self.cmdPlot.clicked, + FittingEventType.PLOT_REQUESTED + ) + self.event_bridge.connect_action( + self.cmdHelp.clicked, + FittingEventType.HELP_REQUESTED + ) + + # ========== Subscribe to events ========== + + # Model selection handlers + self.event_mediator.subscribe(FittingEventType.STRUCTURE_FACTOR_SELECTED, lambda _: self.onSelectStructureFactor()) + self.event_mediator.subscribe(FittingEventType.CATEGORY_SELECTED, lambda _: self.onSelectCategory()) + self.event_mediator.subscribe(FittingEventType.MODEL_SELECTED, lambda _: self.onSelectModel()) + self.event_mediator.subscribe(FittingEventType.BATCH_FILE_SELECTED, lambda idx: self.onSelectBatchFilename(idx)) + + # View toggle handlers + self.event_mediator.subscribe(FittingEventType.VIEW_2D_TOGGLED, lambda checked: self.toggle2D(checked)) + self.event_mediator.subscribe(FittingEventType.POLYDISPERSITY_TOGGLED, lambda checked: self.togglePoly(checked)) + self.event_mediator.subscribe(FittingEventType.MAGNETISM_TOGGLED, lambda checked: self.toggleMagnetism(checked)) + self.event_mediator.subscribe(FittingEventType.CHAIN_FIT_TOGGLED, lambda checked: self.toggleChainFit(checked)) + + # Action handlers + self.event_mediator.subscribe(FittingEventType.FIT_REQUESTED, lambda _: self.onFit()) + self.event_mediator.subscribe(FittingEventType.PLOT_REQUESTED, lambda _: self.onPlot()) + self.event_mediator.subscribe(FittingEventType.HELP_REQUESTED, lambda _: self.onHelp()) + + # Parameter model changes + self.event_bridge.connect_signal( + self._model_model.dataChanged, + FittingEventType.PARAMS_CHANGED + ) + self.event_mediator.subscribe( + FittingEventType.PARAMS_CHANGED, + lambda args: self.onMainParamsChange(args[0], args[1]) # top, bottom indices + ) + + # Parameter selection changes + # Note: lstParams.selectionModel().selectionChanged needs special handling + self.lstParams.selectionModel().selectionChanged.connect( + lambda: self.event_mediator.publish(FittingEventType.SELECTION_CHANGED) + ) + self.event_mediator.subscribe(FittingEventType.SELECTION_CHANGED, lambda _: self.onSelectionChanged()) + + # Event filter for keyboard events (must stay direct - Qt requirement) self.lstParams.installEventFilter(self) - # Local signals - self.batchFittingFinishedSignal.connect(self.batchFitComplete) - self.fittingFinishedSignal.connect(self.fitComplete) - self.Calc1DFinishedSignal.connect(self.complete1D) - self.Calc2DFinishedSignal.connect(self.complete2D) - - # Signals from separate tabs asking for replot - self.options_widget.plot_signal.connect(self.onOptionsUpdate) + # Calculation completion signals + self.event_bridge.connect_signal( + self.batchFittingFinishedSignal, + FittingEventType.BATCH_FITTING_FINISHED + ) + self.event_bridge.connect_signal( + self.fittingFinishedSignal, + FittingEventType.FITTING_FINISHED + ) + self.event_bridge.connect_signal( + self.Calc1DFinishedSignal, + FittingEventType.CALCULATION_1D_FINISHED + ) + self.event_bridge.connect_signal( + self.Calc2DFinishedSignal, + FittingEventType.CALCULATION_2D_FINISHED + ) + + # Subscribe to calculation events + self.event_mediator.subscribe(FittingEventType.BATCH_FITTING_FINISHED, lambda result: self.batchFitComplete(result)) + self.event_mediator.subscribe(FittingEventType.FITTING_FINISHED, lambda result: self.fitComplete(result)) + self.event_mediator.subscribe(FittingEventType.CALCULATION_1D_FINISHED, lambda data: self.complete1D(data)) + self.event_mediator.subscribe(FittingEventType.CALCULATION_2D_FINISHED, lambda data: self.complete2D(data)) + + # Options widget - route through mediator for consistency + self.event_bridge.connect_action( + self.options_widget.plot_signal, + FittingEventType.OPTIONS_UPDATE_REQUESTED + ) + self.event_mediator.subscribe(FittingEventType.OPTIONS_UPDATE_REQUESTED, lambda _: self.onOptionsUpdate()) + + # Options widget internal signals (kept direct - internal widget state) self.options_widget.txtMinRange.editingFinished.connect(self.options_widget.updateMinQ) self.options_widget.txtMaxRange.editingFinished.connect(self.options_widget.updateMaxQ) - # Signals from other widgets - self.communicate.customModelDirectoryChanged.connect(self.onCustomModelChange) - self.smearing_widget.smearingChangedSignal.connect(self.onSmearingOptionsUpdate) - self.polydispersity_widget.cmdFitSignal.connect(lambda: self.cmdFit.setEnabled(self.haveParamsToFit())) - self.polydispersity_widget.updateDataSignal.connect(lambda: self.updateData()) - self.polydispersity_widget.iterateOverModelSignal.connect(lambda: self.iterateOverModel(self.updateFunctionCaption)) - self.magnetism_widget.cmdFitSignal.connect(lambda: self.cmdFit.setEnabled(self.haveParamsToFit())) - self.magnetism_widget.updateDataSignal.connect(lambda: self.updateData()) - - # Communicator signal - self.communicate.updateModelCategoriesSignal.connect(self.onCategoriesChanged) - self.communicate.updateMaskedDataSignal.connect(self.onMaskedData) - - # Catch all key press events - self.keyPressedSignal.connect(self.onKey) + # ========== Child widget events (routed through mediator) ========== + + # Custom model directory changes + self.event_bridge.connect_signal( + self.communicate.customModelDirectoryChanged, + FittingEventType.CUSTOM_MODEL_CHANGED + ) + self.event_mediator.subscribe(FittingEventType.CUSTOM_MODEL_CHANGED, lambda _: self.onCustomModelChange()) + + # Smearing changes + self.event_bridge.connect_signal( + self.smearing_widget.smearingChangedSignal, + FittingEventType.SMEARING_OPTIONS_UPDATED + ) + self.event_mediator.subscribe(FittingEventType.SMEARING_OPTIONS_UPDATED, lambda _: self.onSmearingOptionsUpdate()) + + # Polydispersity widget events + self.event_bridge.connect_action( + self.polydispersity_widget.cmdFitSignal, + FittingEventType.FIT_ENABLEMENT_CHANGED + ) + self.event_bridge.connect_action( + self.polydispersity_widget.updateDataSignal, + FittingEventType.DATA_UPDATED + ) + self.event_bridge.connect_action( + self.polydispersity_widget.iterateOverModelSignal, + FittingEventType.MODEL_ITERATION_REQUESTED + ) + + # Magnetism widget events + self.event_bridge.connect_action( + self.magnetism_widget.cmdFitSignal, + FittingEventType.FIT_ENABLEMENT_CHANGED + ) + self.event_bridge.connect_action( + self.magnetism_widget.updateDataSignal, + FittingEventType.DATA_UPDATED + ) + + # Subscribe to aggregated events + self.event_mediator.subscribe(FittingEventType.FIT_ENABLEMENT_CHANGED, lambda _: self.cmdFit.setEnabled(self.haveParamsToFit())) + self.event_mediator.subscribe(FittingEventType.DATA_UPDATED, lambda _: self.updateData()) + self.event_mediator.subscribe(FittingEventType.MODEL_ITERATION_REQUESTED, lambda _: self.iterateOverModel(self.updateFunctionCaption)) + + # Model categories and masked data updates + self.event_bridge.connect_signal( + self.communicate.updateModelCategoriesSignal, + FittingEventType.MODEL_CATEGORIES_UPDATED + ) + self.event_bridge.connect_signal( + self.communicate.updateMaskedDataSignal, + FittingEventType.MASKED_DATA_UPDATED + ) + self.event_mediator.subscribe(FittingEventType.MODEL_CATEGORIES_UPDATED, lambda _: self.onCategoriesChanged()) + self.event_mediator.subscribe(FittingEventType.MASKED_DATA_UPDATED, lambda _: self.onMaskedData()) + + # Keyboard events + self.event_bridge.connect_signal( + self.keyPressedSignal, + FittingEventType.KEY_PRESSED + ) + self.event_mediator.subscribe(FittingEventType.KEY_PRESSED, lambda event: self.onKey(event)) def keyPressEvent(self, event: QtGui.QKeyEvent) -> None: super(FittingWidget, self).keyPressEvent(event) @@ -1117,7 +1272,7 @@ def paramHasConstraint(self, param: str | None = None) -> bool: if param not in self.allParamNames(): return False - for model_key in self.model_dict.keys(): + for model_key in self.model_dict: for row in range(self.model_dict[model_key].rowCount()): param_name = self.model_dict[model_key].item(row,0).text() if model_key == 'poly': @@ -1223,7 +1378,7 @@ def getConstraintsForAllModels(self) -> list[tuple[str, str]]: e.g. [('sld','5*sld_solvent')] """ params = [] - for model_key in self.model_dict.keys(): + for model_key in self.model_dict: model = self.model_dict[model_key] param_number = model.rowCount() if model_key == 'poly': @@ -1242,7 +1397,7 @@ def getComplexConstraintsForAllModels(self) -> list[tuple[str, str]]: for a given FitPage """ constraints = [] - for model_key in self.model_dict.keys(): + for model_key in self.model_dict: constraints += self.getComplexConstraintsForModel(model_key=model_key) return constraints @@ -1288,7 +1443,7 @@ def getConstraintObjectsForAllModels(self) -> list[Constraint]: Returns a list of the constraint object for a given FitPage """ constraints = [] - for model_key in self.model_dict.keys(): + for model_key in self.model_dict: constraints += self.getConstraintObjectsForModel(model_key=model_key) return constraints @@ -1308,7 +1463,7 @@ def getConstraintsForFitting(self) -> list[tuple[str, str]]: """ # Get constraints constraints = [] - for model_key in self.model_dict.keys(): + for model_key in self.model_dict: constraints += self.getComplexConstraintsForModel(model_key=model_key) # See if there are any constraints across models multi_constraints = [cons for cons in constraints if self.isConstraintMultimodel(cons[1])] @@ -1954,7 +2109,7 @@ def prepareFitters(self, fitter: Fit | None = None, fit_id: int = 0, weight_incr # Get the constraints. constraints = [] - for model_key in self.model_dict.keys(): + for model_key in self.model_dict: constraints += self.getComplexConstraintsForModel(model_key=model_key) if fitter is None: # For single fits - check for inter-model constraints @@ -3176,7 +3331,7 @@ def addExtraShells(self) -> None: # cell 5: SLD button item5 = QtGui.QStandardItem() button = None - for p in self.logic.kernel_module.params.keys(): + for p in self.logic.kernel_module.params: if re.search(r'^[\w]{0,3}sld.*[1-9]$', p): # Only display the SLD Profile button for models with SLD parameters button = QtWidgets.QPushButton() diff --git a/src/sas/qtgui/Perspectives/Fitting/UnitTesting/test_event_mediator.py b/src/sas/qtgui/Perspectives/Fitting/UnitTesting/test_event_mediator.py new file mode 100644 index 0000000000..464d94cc51 --- /dev/null +++ b/src/sas/qtgui/Perspectives/Fitting/UnitTesting/test_event_mediator.py @@ -0,0 +1,301 @@ +""" +Unit tests for EventMediator and EventBridge classes. +""" + +from unittest.mock import Mock + +from sas.qtgui.Perspectives.Fitting.EventMediator import EventBridge, EventMediator, FittingEventType + + +class TestEventMediator: + """Test cases for EventMediator class.""" + + def test_init(self): + """Test EventMediator initialization.""" + mediator = EventMediator() + assert mediator._subscribers == {} + assert mediator._event_history == [] + assert mediator._max_history_size == 100 + assert not mediator._logging_enabled + + def test_subscribe(self): + """Test subscribing to events.""" + mediator = EventMediator() + callback = Mock() + + # Subscribe to an event + mediator.subscribe(FittingEventType.FIT_REQUESTED, callback) + + assert FittingEventType.FIT_REQUESTED in mediator._subscribers + assert len(mediator._subscribers[FittingEventType.FIT_REQUESTED]) == 1 + assert mediator._subscribers[FittingEventType.FIT_REQUESTED][0] == (callback, False) + + def test_subscribe_weak_ref(self): + """Test subscribing with weak reference.""" + mediator = EventMediator() + callback = Mock() + + mediator.subscribe(FittingEventType.FIT_REQUESTED, callback, use_weak_ref=True) + + assert mediator._subscribers[FittingEventType.FIT_REQUESTED][0] == (callback, True) + + def test_unsubscribe(self): + """Test unsubscribing from events.""" + mediator = EventMediator() + callback1 = Mock() + callback2 = Mock() + + mediator.subscribe(FittingEventType.FIT_REQUESTED, callback1) + mediator.subscribe(FittingEventType.FIT_REQUESTED, callback2) + + # Unsubscribe one callback + mediator.unsubscribe(FittingEventType.FIT_REQUESTED, callback1) + + assert len(mediator._subscribers[FittingEventType.FIT_REQUESTED]) == 1 + assert mediator._subscribers[FittingEventType.FIT_REQUESTED][0] == (callback2, False) + + def test_publish_no_subscribers(self): + """Test publishing event with no subscribers.""" + mediator = EventMediator() + + # Should not raise any errors + mediator.publish(FittingEventType.FIT_REQUESTED, "test data") + + def test_publish_with_subscribers(self): + """Test publishing event to subscribers.""" + mediator = EventMediator() + callback1 = Mock() + callback2 = Mock() + + mediator.subscribe(FittingEventType.FIT_REQUESTED, callback1) + mediator.subscribe(FittingEventType.FIT_REQUESTED, callback2) + + test_data = {"param": "value"} + mediator.publish(FittingEventType.FIT_REQUESTED, test_data) + + callback1.assert_called_once_with(test_data) + callback2.assert_called_once_with(test_data) + + def test_publish_no_data(self): + """Test publishing event without data.""" + mediator = EventMediator() + callback = Mock() + + mediator.subscribe(FittingEventType.FIT_REQUESTED, callback) + mediator.publish(FittingEventType.FIT_REQUESTED) + + callback.assert_called_once_with(None) + + def test_publish_callback_error(self): + """Test that callback errors don't stop other callbacks.""" + mediator = EventMediator() + callback1 = Mock(side_effect=Exception("Test error")) + callback2 = Mock() + + mediator.subscribe(FittingEventType.FIT_REQUESTED, callback1) + mediator.subscribe(FittingEventType.FIT_REQUESTED, callback2) + + # Should not raise exception + mediator.publish(FittingEventType.FIT_REQUESTED, "data") + + callback1.assert_called_once_with("data") + callback2.assert_called_once_with("data") + + def test_clear_subscribers_specific_event(self): + """Test clearing subscribers for specific event.""" + mediator = EventMediator() + callback = Mock() + + mediator.subscribe(FittingEventType.FIT_REQUESTED, callback) + mediator.subscribe(FittingEventType.PLOT_REQUESTED, callback) + + mediator.clear_subscribers(FittingEventType.FIT_REQUESTED) + + assert len(mediator._subscribers[FittingEventType.FIT_REQUESTED]) == 0 + assert FittingEventType.PLOT_REQUESTED in mediator._subscribers + + def test_clear_subscribers_all(self): + """Test clearing all subscribers.""" + mediator = EventMediator() + callback = Mock() + + mediator.subscribe(FittingEventType.FIT_REQUESTED, callback) + mediator.subscribe(FittingEventType.PLOT_REQUESTED, callback) + + mediator.clear_subscribers() + + assert mediator._subscribers == {} + + def test_has_subscribers(self): + """Test checking if event has subscribers.""" + mediator = EventMediator() + + assert not mediator.has_subscribers(FittingEventType.FIT_REQUESTED) + + mediator.subscribe(FittingEventType.FIT_REQUESTED, Mock()) + assert mediator.has_subscribers(FittingEventType.FIT_REQUESTED) + + def test_get_subscriber_count(self): + """Test getting subscriber count.""" + mediator = EventMediator() + + assert mediator.get_subscriber_count(FittingEventType.FIT_REQUESTED) == 0 + + mediator.subscribe(FittingEventType.FIT_REQUESTED, Mock()) + mediator.subscribe(FittingEventType.FIT_REQUESTED, Mock()) + + assert mediator.get_subscriber_count(FittingEventType.FIT_REQUESTED) == 2 + + def test_event_history(self): + """Test event history recording.""" + mediator = EventMediator() + + mediator.publish(FittingEventType.FIT_REQUESTED, "data1") + mediator.publish(FittingEventType.PLOT_REQUESTED, "data2") + + history = mediator.get_event_history() + assert len(history) == 2 + assert history[0] == (FittingEventType.FIT_REQUESTED, "data1") + assert history[1] == (FittingEventType.PLOT_REQUESTED, "data2") + + def test_event_history_size_limit(self): + """Test event history size limit.""" + mediator = EventMediator() + mediator._max_history_size = 3 + + for i in range(5): + mediator.publish(FittingEventType.FIT_REQUESTED, f"data{i}") + + history = mediator.get_event_history() + assert len(history) == 3 + assert history[0] == (FittingEventType.FIT_REQUESTED, "data2") + + def test_clear_history(self): + """Test clearing event history.""" + mediator = EventMediator() + + mediator.publish(FittingEventType.FIT_REQUESTED, "data") + assert len(mediator.get_event_history()) == 1 + + mediator.clear_history() + assert len(mediator.get_event_history()) == 0 + + def test_enable_logging(self): + """Test enabling/disabling logging.""" + mediator = EventMediator() + + mediator.enable_logging(True) + assert mediator._logging_enabled + + mediator.enable_logging(False) + assert not mediator._logging_enabled + + +class TestEventBridge: + """Test cases for EventBridge class.""" + + def test_init(self): + """Test EventBridge initialization.""" + mediator = EventMediator() + bridge = EventBridge(mediator) + assert bridge.mediator is mediator + + def test_connect_signal_single_arg(self): + """Test connecting signal with single argument.""" + mediator = EventMediator() + bridge = EventBridge(mediator) + callback = Mock() + + mediator.subscribe(FittingEventType.FIT_REQUESTED, callback) + + # Mock Qt signal + signal = Mock() + bridge.connect_signal(signal, FittingEventType.FIT_REQUESTED) + + # Emit signal + signal.connect.assert_called_once() + # Get the connected function + connected_func = signal.connect.call_args[0][0] + + # Call the connected function + connected_func("test_data") + + callback.assert_called_once_with("test_data") + + def test_connect_signal_multiple_args(self): + """Test connecting signal with multiple arguments.""" + mediator = EventMediator() + bridge = EventBridge(mediator) + callback = Mock() + + mediator.subscribe(FittingEventType.FIT_REQUESTED, callback) + + signal = Mock() + bridge.connect_signal(signal, FittingEventType.FIT_REQUESTED) + + connected_func = signal.connect.call_args[0][0] + connected_func("arg1", "arg2") + + callback.assert_called_once_with(("arg1", "arg2")) + + def test_connect_signal_no_args(self): + """Test connecting signal with no arguments.""" + mediator = EventMediator() + bridge = EventBridge(mediator) + callback = Mock() + + mediator.subscribe(FittingEventType.FIT_REQUESTED, callback) + + signal = Mock() + bridge.connect_signal(signal, FittingEventType.FIT_REQUESTED) + + connected_func = signal.connect.call_args[0][0] + connected_func() + + callback.assert_called_once_with(None) + + def test_connect_signal_with_transform(self): + """Test connecting signal with data transformation.""" + mediator = EventMediator() + bridge = EventBridge(mediator) + callback = Mock() + + mediator.subscribe(FittingEventType.FIT_REQUESTED, callback) + + signal = Mock() + def transform(x): + return f"transformed_{x}" + bridge.connect_signal(signal, FittingEventType.FIT_REQUESTED, transform) + + connected_func = signal.connect.call_args[0][0] + connected_func("input") + + callback.assert_called_once_with("transformed_input") + + def test_connect_action(self): + """Test connecting action signal.""" + mediator = EventMediator() + bridge = EventBridge(mediator) + callback = Mock() + + mediator.subscribe(FittingEventType.FIT_REQUESTED, callback) + + signal = Mock() + bridge.connect_action(signal, FittingEventType.FIT_REQUESTED) + + connected_func = signal.connect.call_args[0][0] + connected_func() + + callback.assert_called_once_with(None) + + def test_publish_to_signal(self): + """Test publishing event to Qt signal.""" + mediator = EventMediator() + bridge = EventBridge(mediator) + + signal = Mock() + bridge.publish_to_signal(FittingEventType.FIT_REQUESTED, signal) + + mediator.publish(FittingEventType.FIT_REQUESTED, "test_data") + + signal.emit.assert_called_once_with("test_data") diff --git a/src/sas/qtgui/Perspectives/Fitting/UnitTesting/test_fitting_widget_events.py b/src/sas/qtgui/Perspectives/Fitting/UnitTesting/test_fitting_widget_events.py new file mode 100644 index 0000000000..b4ef11da06 --- /dev/null +++ b/src/sas/qtgui/Perspectives/Fitting/UnitTesting/test_fitting_widget_events.py @@ -0,0 +1,165 @@ +""" +Unit tests for FittingWidget event mediator and bridge functionality. +""" +from unittest.mock import Mock + +import pytest + +from sas.qtgui.Perspectives.Fitting.EventMediator import FittingEventType + + +@pytest.fixture +def mock_parent(): + """Mock parent widget.""" + return Mock() + + +""" +Unit tests for FittingWidget event mediator and bridge functionality. +""" +@pytest.fixture +def mock_parent_widget(): + """Mock parent widget.""" + return Mock() + + +def test_event_mediator_creation(): + """Test that EventMediator can be created and used.""" + from sas.qtgui.Perspectives.Fitting.EventMediator import EventMediator + + # Create mediator + mediator = EventMediator() + + # Test basic functionality + assert mediator is not None + assert hasattr(mediator, 'publish') + assert hasattr(mediator, 'subscribe') + assert hasattr(mediator, 'has_subscribers') + + +def test_event_bridge_creation(): + """Test that EventBridge can be created.""" + from sas.qtgui.Perspectives.Fitting.EventMediator import EventBridge, EventMediator + + # Create mediator and bridge + mediator = EventMediator() + bridge = EventBridge(mediator) + + # Test basic functionality + assert bridge is not None + assert bridge.mediator is mediator + assert hasattr(bridge, 'connect_signal') + assert hasattr(bridge, 'connect_action') + + +def test_event_mediator_publish_subscribe(): + """Test that EventMediator can publish and subscribe to events.""" + from sas.qtgui.Perspectives.Fitting.EventMediator import EventMediator + + mediator = EventMediator() + callback_called = [] + + def test_callback(data=None): + callback_called.append(data) + + # Subscribe to an event + mediator.subscribe(FittingEventType.FIT_REQUESTED, test_callback) + + # Check that we have subscribers + assert mediator.has_subscribers(FittingEventType.FIT_REQUESTED) + + # Publish the event + mediator.publish(FittingEventType.FIT_REQUESTED, "test_data") + + # Check that callback was called + assert len(callback_called) == 1 + assert callback_called[0] == "test_data" + + +def test_event_bridge_connect_signal(): + """Test that EventBridge can connect signals.""" + from sas.qtgui.Perspectives.Fitting.EventMediator import EventBridge, EventMediator + + mediator = EventMediator() + bridge = EventBridge(mediator) + + # Mock signal + mock_signal = Mock() + + # Connect signal + bridge.connect_signal(mock_signal, FittingEventType.FIT_REQUESTED) + + # Check that signal.connect was called + mock_signal.connect.assert_called_once() + + +def test_event_bridge_connect_action(): + """Test that EventBridge can connect actions.""" + from sas.qtgui.Perspectives.Fitting.EventMediator import EventBridge, EventMediator + + mediator = EventMediator() + bridge = EventBridge(mediator) + + # Mock action (like a button click) + mock_action = Mock() + + # Connect action + bridge.connect_action(mock_action, FittingEventType.PLOT_REQUESTED) + + # Check that action.connect was called + mock_action.connect.assert_called_once() + + +def test_multiple_subscribers(): + """Test that multiple subscribers can be registered for the same event.""" + from sas.qtgui.Perspectives.Fitting.EventMediator import EventMediator + + mediator = EventMediator() + callback1_called = [] + callback2_called = [] + + def callback1(data=None): + callback1_called.append(data) + + def callback2(data=None): + callback2_called.append(data) + + # Subscribe both callbacks + mediator.subscribe(FittingEventType.MODEL_SELECTED, callback1) + mediator.subscribe(FittingEventType.MODEL_SELECTED, callback2) + + # Publish event + mediator.publish(FittingEventType.MODEL_SELECTED, "model_data") + + # Check both callbacks were called + assert len(callback1_called) == 1 + assert len(callback2_called) == 1 + assert callback1_called[0] == "model_data" + assert callback2_called[0] == "model_data" + + +def test_event_types_defined(): + """Test that all expected event types are defined.""" + # Check that key event types exist + assert hasattr(FittingEventType, 'FIT_REQUESTED') + assert hasattr(FittingEventType, 'PLOT_REQUESTED') + assert hasattr(FittingEventType, 'HELP_REQUESTED') + assert hasattr(FittingEventType, 'MODEL_SELECTED') + assert hasattr(FittingEventType, 'CATEGORY_SELECTED') + assert hasattr(FittingEventType, 'STRUCTURE_FACTOR_SELECTED') + assert hasattr(FittingEventType, 'BATCH_FILE_SELECTED') + assert hasattr(FittingEventType, 'VIEW_2D_TOGGLED') + assert hasattr(FittingEventType, 'POLYDISPERSITY_TOGGLED') + assert hasattr(FittingEventType, 'MAGNETISM_TOGGLED') + assert hasattr(FittingEventType, 'CHAIN_FIT_TOGGLED') + assert hasattr(FittingEventType, 'PARAMS_CHANGED') + assert hasattr(FittingEventType, 'SELECTION_CHANGED') + assert hasattr(FittingEventType, 'FIT_ENABLEMENT_CHANGED') + assert hasattr(FittingEventType, 'DATA_UPDATED') + assert hasattr(FittingEventType, 'MODEL_ITERATION_REQUESTED') + assert hasattr(FittingEventType, 'OPTIONS_UPDATE_REQUESTED') + assert hasattr(FittingEventType, 'CUSTOM_MODEL_CHANGED') + assert hasattr(FittingEventType, 'SMEARING_OPTIONS_UPDATED') + assert hasattr(FittingEventType, 'MODEL_CATEGORIES_UPDATED') + assert hasattr(FittingEventType, 'MASKED_DATA_UPDATED') + assert hasattr(FittingEventType, 'KEY_PRESSED')