diff --git a/cads_adaptors/adaptors/mars.py b/cads_adaptors/adaptors/mars.py index 4769a4fd..56490df5 100644 --- a/cads_adaptors/adaptors/mars.py +++ b/cads_adaptors/adaptors/mars.py @@ -1,7 +1,32 @@ +""" +MARS adaptor for CDS data retrieval. + +IMPORTANT ARCHITECTURAL NOTE: +This module is imported by multiple services: +1. Data retrieval workers - actually execute MARS requests +2. API services - use adaptors for constraint validation, mapping, schema +3. Catalogue services - metadata and configuration management + +To avoid forcing ALL services to have cads-mars-server installed, we follow +these import guidelines: + +1. Module-level imports: Only cads_adaptors dependencies and standard library +2. Configuration: Read from environment variables (no cads-mars-server dependency) +3. Client imports: Encapsulated INSIDE execute_mars_pipe() and execute_mars_shares() + - from cads_mars_server import client (pipe mode) + - from cads_mars_server.ws_client import mars_via_ws_sync (shares mode) + +This way, services that only validate constraints or manage mappings can import +this module without needing the full MARS client infrastructure installed. +""" + import os import pathlib import time from typing import Any +from urllib.parse import urlparse, urlunparse + +from packaging import version from cads_adaptors.adaptors import Context, Request from cads_adaptors.adaptors.cds import ( @@ -22,6 +47,110 @@ ) from cads_adaptors.tools.simulate_preinterpolation import simulate_preinterpolation +# Configuration for MARS client selection and ports +# These are read from environment variables to avoid forcing a dependency on +# cads-mars-server for services that only use adaptors for constraints/mappings. +# The actual client imports (mars_client, ws_client) are deferred until needed +# inside execute_mars_pipe() and execute_mars_shares(). +DEFAULT_PIPE_PORT = int(os.getenv("MARS_PIPE_PORT", "9000")) +DEFAULT_SHARES_PORT = int(os.getenv("MARS_SHARES_PORT", "9001")) + + +def _check_cads_mars_server_supports_websocket() -> bool: + """ + Check if cads-mars-server is installed and supports WebSocket mode. + + WebSocket mode requires cads-mars-server >= 0.3.0. + + This function handles: + - Module not installed (returns False) + - Released versions (e.g., "0.3.0", "0.3.1") + - Development versions from branches (e.g., "0.3.0.dev1+g1234567") + - Missing __version__ attribute (returns False) + + For development versions, we check the base version (major.minor.patch) + ignoring pre-release and local version identifiers. This ensures that + builds from the websocketmars branch (which will have 0.3.0.devN versions) + are recognized as supporting WebSocket features. + + Environment variable override: + - Set MARS_FORCE_WEBSOCKET_MODE=true to bypass version checking + (useful for testing with development installations) + + Returns: + True if cads-mars-server >= 0.3.0 is available, False otherwise + """ + # Check for force override (for testing/development) + if os.getenv("MARS_FORCE_WEBSOCKET_MODE", "false").lower() == "true": + return True + + try: + import cads_mars_server + + # Check if __version__ attribute exists + if not hasattr(cads_mars_server, "__version__"): + # Missing version info - assume old version, fall back to pipe + return False + + installed_version = version.parse(cads_mars_server.__version__) + required_version = version.parse("0.3.0") + + # For development versions like "0.3.0.dev1+g1234567", the base_version + # gives us "0.3.0" which we can compare. This ensures branch builds + # are recognized as having the features. + if hasattr(installed_version, "base_version"): + base = version.parse(installed_version.base_version) + return base >= required_version + else: + # Fallback for older packaging versions + return installed_version >= required_version + + except ImportError: + # cads-mars-server not installed + return False + except (AttributeError, ValueError, TypeError) as e: + # AttributeError: __version__ doesn't exist (caught above, but defensive) + # ValueError: version string can't be parsed + # TypeError: __version__ is not a string + return False + except Exception: + # Any other unexpected error - fall back to pipe mode + return False + + +def _should_use_websocket_mode() -> bool: + """ + Determine if WebSocket mode should be used based on configuration and version. + + WebSocket mode is enabled when BOTH conditions are met: + 1. MARS_USE_SHARES environment variable is set to "true" + 2. cads-mars-server >= 0.3.0 is installed + + This ensures backward compatibility: older cads-mars-server versions will + fall back to pipe mode even if MARS_USE_SHARES=true is set. + + Returns: + True if WebSocket mode should be used, False for pipe mode + """ + use_shares_env = os.getenv("MARS_USE_SHARES", "false").lower() == "true" + + if not use_shares_env: + # MARS_USE_SHARES not set or false - use pipe mode + return False + + # MARS_USE_SHARES is true - check if server supports WebSocket + if not _check_cads_mars_server_supports_websocket(): + # WebSocket requested but not supported - log warning and fall back + # Note: This will be logged in execute_mars() when context is available + return False + + return True + + +# Determine at module load time which mode to use +# This value is read by execute_mars() to select the client +USE_SHARES = _should_use_websocket_mode() + # This hard requirement of MARS requests should be moved to the proxy MARS client ALWAYS_SPLIT_ON: list[str] = [ "class", @@ -61,92 +190,44 @@ def get_mars_server_list(config) -> list[str]: ) return mars_servers - -def execute_mars( - request: dict[str, Any] | list[dict[str, Any]], - context: Context = Context(), - config: dict[str, Any] = dict(), - mapping: dict[str, Any] = dict(), - target_fname: str = "data.grib", - target_dir: str | pathlib.Path = "", -) -> str: - from cads_mars_server import client as mars_client - - requests = ensure_list(request) - # Implement embargo if it is set in the config - # This is now done in normalize request, but leaving it here for now, as running twice is not a problem - # and the some adaptors may not use normalise_request yet - if config.get("embargo") is not None: - requests, _cacheable = implement_embargo(requests, config["embargo"]) - - target = str(pathlib.Path(target_dir) / target_fname) - - split_on_keys = ALWAYS_SPLIT_ON + ensure_list(config.get("split_on", [])) - requests = split_requests_on_keys(requests, split_on_keys, context, mapping) - - mars_servers = get_mars_server_list(config) - - cluster = mars_client.RemoteMarsClientCluster(urls=mars_servers, log=context) - - # Add required fields to the env dictionary: - env = { - "user_id": config.get("user_uid"), - "request_id": config.get("request_uid"), - "namespace": ( - f"{os.getenv('OPENSTACK_PROJECT', 'NO-OSPROJECT')}:" - f"{os.getenv('RUNTIME_NAMESPACE', 'NO-NAMESPACE')}" - ), - "host": os.getenv("HOSTNAME"), - } - env["username"] = str(env["namespace"]) + ":" + str(env["user_id"]).split("-")[-1] - time0 = time.time() - context.info(f"Request sent to proxy MARS client: {requests}") - reply = cluster.execute(requests, env, target) - reply_message = str(reply.message) - delta_time = time.time() - time0 - if os.path.exists(target): - filesize = os.path.getsize(target) - context.info( - f"The MARS Request produced a target " - f"(filesize={filesize * 1e-6} Mb, delta_time= {delta_time:.2f} seconds).", - delta_time=delta_time, - filesize=filesize, - ) - else: - filesize = 0 - context.info( - f"The MARS request produced no target (delta_time= {delta_time:.2f} seconds).", - delta_time=delta_time, - ) - - context.debug(message=reply_message) - - if reply.error: - error_lines = "\n".join( - [message for message in reply_message.split("\n") if "ERROR" in message] - ) - error_message = ( - "MARS has returned an error, please check your selection.\n" - f"Request submitted to the MARS server:\n{requests}\n" - f"Full error message:\n{error_lines}\n" - ) - context.add_user_visible_error(message=error_message) - - error_message += f"Exception: {reply.error}\n" - raise MarsRuntimeError(error_message) - - if not filesize: - error_message = ( - "MARS returned no data, please check your selection." - f"Request submitted to the MARS server:\n{requests}\n" - ) - context.add_user_visible_error( - message=error_message, - ) - raise MarsNoDataError(error_message) - - return target - +def get_mars_server_list_ws(config) -> list[str]: + """ + Convert HTTP pipe server URLs to WebSocket shares server URLs. + + Properly parses URLs and converts: + - http:// -> ws:// + - https:// -> wss:// + - port 9000 -> port 9001 (or configured ports) + + Args: + config: Configuration dictionary + + Returns: + List of WebSocket server URLs + """ + http_servers = get_mars_server_list(config) + ws_servers = [] + + for server in http_servers: + parsed = urlparse(server) + + # Convert HTTP scheme to WebSocket scheme + scheme = "wss" if parsed.scheme == "https" else "ws" + + # Convert port if it matches the default pipe port + netloc = parsed.netloc + if parsed.port == DEFAULT_PIPE_PORT: + netloc = netloc.replace( + f":{DEFAULT_PIPE_PORT}", f":{DEFAULT_SHARES_PORT}" + ) + elif parsed.port is None: + # No port specified, add the shares port + netloc = f"{parsed.hostname}:{DEFAULT_SHARES_PORT}" + + ws_url = urlunparse((scheme, netloc, parsed.path, "", "", "")) + ws_servers.append(ws_url) + + return ws_servers def minimal_mars_schema( allow_duplicate_values_keys=None, @@ -223,6 +304,304 @@ def minimal_mars_schema( return schema +def make_env_dict(config: dict[str, Any]) -> dict[str, Any]: + # Add required fields to the env dictionary: + env = { + "user_id": config.get("user_uid"), + "request_id": config.get("request_uid"), + "namespace": ( + f"{os.getenv('OPENSTACK_PROJECT', 'NO-OSPROJECT')}:" + f"{os.getenv('RUNTIME_NAMESPACE', 'NO-NAMESPACE')}" + ), + "host": os.getenv("HOSTNAME"), + } + env["username"] = str(env["namespace"]) + ":" + str(env["user_id"]).split("-")[-1] + return env + + +def _mars_common_output(target, requests, reply, reply_message, context, time0): + delta_time = time.time() - time0 + if os.path.exists(target): + filesize = os.path.getsize(target) + context.info( + f"The MARS Request produced a target " + f"(filesize={filesize / 1024 ** 2} MB, delta_time= {delta_time:.2f} seconds).", + delta_time=delta_time, + filesize=filesize, + ) + else: + filesize = 0 + context.info( + f"The MARS request produced no target (delta_time= {delta_time:.2f} seconds).", + delta_time=delta_time, + ) + + context.debug(message=reply_message) + + if reply.error: + error_lines = "\n".join( + [message for message in reply_message.split("\n") if "ERROR" in message] + ) + error_message = ( + "MARS has returned an error, please check your selection.\n" + f"Request submitted to the MARS server:\n{requests}\n" + f"Full error message:\n{error_lines}\n" + ) + context.add_user_visible_error(message=error_message) + + error_message += f"Exception: {reply.error}\n" + raise MarsRuntimeError(error_message) + + if not filesize: + error_message = ( + "MARS returned no data, please check your selection." + f"Request submitted to the MARS server:\n{requests}\n" + ) + context.add_user_visible_error( + message=error_message, + ) + raise MarsNoDataError(error_message) + + +def _prepare_mars_request( + request: dict[str, Any] | list[dict[str, Any]], + context: Context, + config: dict[str, Any], + mapping: dict[str, Any], + target_fname: str, + target_dir: str | pathlib.Path, +) -> tuple[list[dict[str, Any]], str, dict[str, Any]]: + """ + Common preparation logic for both pipe and shares MARS implementations. + + Args: + request: Single request dict or list of request dicts + context: Context object for logging + config: Configuration dictionary + mapping: Mapping dictionary for field transformations + target_fname: Target filename + target_dir: Target directory path + + Returns: + Tuple of (processed_requests, target_path, environment_dict) + """ + requests = ensure_list(request) + + # Implement embargo if configured + # Note: This is also done in normalise_request, but kept here for adaptors + # that may not use normalise_request yet. Running twice is safe. + if config.get("embargo") is not None: + requests, _cacheable = implement_embargo(requests, config["embargo"]) + + target = str(pathlib.Path(target_dir) / target_fname) + + split_on_keys = ALWAYS_SPLIT_ON + ensure_list(config.get("split_on", [])) + requests = split_requests_on_keys(requests, split_on_keys, context, mapping) + + # Add required fields to the env dictionary + env = make_env_dict(config) + + return requests, target, env + + +def execute_mars_pipe( + request: dict[str, Any] | list[dict[str, Any]], + context: Context = Context(), + config: dict[str, Any] = dict(), + mapping: dict[str, Any] = dict(), + target_fname: str = "data.grib", + target_dir: str | pathlib.Path = "", +) -> str: + """ + Execute MARS request using pipe-based client. + + Args: + request: MARS request(s) + context: Context for logging + config: Configuration dictionary + mapping: Field mapping dictionary + target_fname: Output filename + target_dir: Output directory + + Returns: + Path to output file + """ + from cads_mars_server import client as mars_client + + requests, target, env = _prepare_mars_request( + request, context, config, mapping, target_fname, target_dir + ) + + mars_servers = get_mars_server_list(config) + cluster = mars_client.RemoteMarsClientCluster(urls=mars_servers, log=context) + + time0 = time.time() + context.info(f"Request sent to proxy MARS client: {requests}") + reply = cluster.execute(requests, env, target) + reply_message = str(reply.message) + + _mars_common_output(target, requests, reply, reply_message, context, time0) + + return target + + +def execute_mars_shares( + request: dict[str, Any] | list[dict[str, Any]], + context: Context = Context(), + config: dict[str, Any] = dict(), + mapping: dict[str, Any] = dict(), + target_fname: str = "data.grib", + target_dir: str | pathlib.Path = "", + log_handler: Any = None, +) -> str: + """ + Execute MARS request using WebSocket-based shares client. + + This implementation uses shared filesystem access, where the server + writes directly to the shared filesystem and the client monitors progress. + + Args: + request: MARS request(s) + context: Context for logging + config: Configuration dictionary + mapping: Field mapping dictionary + target_fname: Output filename + target_dir: Output directory + log_handler: Optional custom log handler for parsing MARS output. + Handler receives (line, ws, logger) and returns formatted + line or None to suppress. Can raise exceptions to abort. + + Returns: + Path to output file + """ + from cads_mars_server.ws_client import mars_via_ws_sync as mars_client + + requests, target, env = _prepare_mars_request( + request, context, config, mapping, target_fname, target_dir + ) + + # Ensure target directory is accessible by MARS server + if target_dir: + os.chmod(target_dir, 0o777) + + mars_servers = get_mars_server_list_ws(config) + + time0 = time.time() + context.info(f"Request(s) sent to proxy MARS client: {requests}") + + reply = mars_client( + mars_servers, + requests, + env, + target=str(target), + logger=context, + log_handler=log_handler, + ) + + reply_message = str(reply.message) + + _mars_common_output(target, requests, reply, reply_message, context, time0) + + return target + + +def execute_mars( + request: dict[str, Any] | list[dict[str, Any]], + context: Context = Context(), + config: dict[str, Any] = dict(), + mapping: dict[str, Any] = dict(), + target_fname: str = "data.grib", + target_dir: str | pathlib.Path = "", + log_handler: Any = None, +) -> str: + """ + Execute MARS request using the configured client (pipe or shares). + + The client selection requires BOTH conditions: + 1. MARS_USE_SHARES environment variable set to "true" + 2. cads-mars-server >= 0.3.0 installed + + If MARS_USE_SHARES is true but cads-mars-server < 0.3.0, falls back to pipe mode + with a warning. This ensures backward compatibility with older deployments. + + Args: + request: MARS request(s) + context: Context for logging + config: Configuration dictionary + mapping: Field mapping dictionary + target_fname: Output filename + target_dir: Output directory + log_handler: Optional custom log handler for parsing MARS output. + Handler receives (line, ws, logger) and returns formatted + line or None to suppress. Can raise exceptions to abort. + Only used when USE_SHARES=True (WebSocket client). + + Returns: + Path to output file + """ + # Check if WebSocket was requested but not available + use_shares_env = os.getenv("MARS_USE_SHARES", "false").lower() == "true" + + if USE_SHARES: + # Get version info for logging + force_mode = os.getenv("MARS_FORCE_WEBSOCKET_MODE", "false").lower() == "true" + + try: + import cads_mars_server + if hasattr(cads_mars_server, "__version__"): + version_str = cads_mars_server.__version__ + mode_info = f"cads-mars-server {version_str}" + else: + mode_info = "cads-mars-server (version unknown)" + + if force_mode: + mode_info += " [FORCED MODE]" + + context.info( + f"Using MARS Shares (WebSocket) client for MARS retrievals ({mode_info})." + ) + except ImportError: + mode_info = "FORCED MODE" if force_mode else "" + context.info(f"Using MARS Shares (WebSocket) client for MARS retrievals. {mode_info}".strip()) + + return execute_mars_shares( + request, + context=context, + config=config, + mapping=mapping, + target_fname=target_fname, + target_dir=target_dir, + log_handler=log_handler, + ) + else: + # Log reason for using pipe mode + if use_shares_env: + # MARS_USE_SHARES was set but we're using pipe mode - explain why + try: + import cads_mars_server + version_str = cads_mars_server.__version__ + context.add_user_visible_log( + f"WebSocket mode requested (MARS_USE_SHARES=true) but cads-mars-server " + f"version {version_str} < 0.3.0. Falling back to pipe mode. " + f"Upgrade to cads-mars-server >= 0.3.0 to use WebSocket features." + ) + except ImportError: + context.add_user_visible_log( + "WebSocket mode requested (MARS_USE_SHARES=true) but cads-mars-server " + "is not installed. Falling back to pipe mode." + ) + + context.info("Using MARS Pipe client for MARS retrievals.") + return execute_mars_pipe( + request, + context=context, + config=config, + mapping=mapping, + target_fname=target_fname, + target_dir=target_dir, + ) + + class DirectMarsCdsAdaptor(AbstractCdsAdaptor): resources = {"MARS_CLIENT": 1} diff --git a/tests/test_15_mars.py b/tests/test_15_mars.py index fb764a62..2e488538 100644 --- a/tests/test_15_mars.py +++ b/tests/test_15_mars.py @@ -241,3 +241,85 @@ def _check_schema_pass(req_in, req_out, **schema_options): ) req_mod = adp.normalise_request(req_in) assert req_mod == req_out + + +def test_version_check_logic(): + """Test version checking logic for cads-mars-server compatibility.""" + from packaging import version + + # Test cases: (version_string, should_support_websocket, description) + test_cases = [ + ('0.3.0', True, 'Exact match - release version'), + ('0.3.1', True, 'Newer patch version'), + ('0.4.0', True, 'Newer minor version'), + ('1.0.0', True, 'Major version upgrade'), + ('0.3.0.dev1+g1234567', True, 'Dev version from websocketmars branch'), + ('0.3.0rc1', True, 'Release candidate'), + ('0.2.9', False, 'Older version'), + ('0.2.5.1', False, 'Current old version'), + ] + + required_version = version.parse('0.3.0') + + for version_str, expected_result, description in test_cases: + parsed_version = version.parse(version_str) + + # Replicate the logic from _check_cads_mars_server_supports_websocket() + if hasattr(parsed_version, 'base_version'): + base = version.parse(parsed_version.base_version) + result = base >= required_version + else: + result = parsed_version >= required_version + + assert result == expected_result, ( + f"Version check failed for {version_str} ({description}): " + f"expected {expected_result}, got {result}" + ) + + +def test_version_check_with_mock(monkeypatch): + """Test the actual version checking functions with mocked cads-mars-server.""" + + # Test 1: Module not installed + def mock_import_fail(name, *args, **kwargs): + if name == 'cads_mars_server': + raise ImportError("No module named 'cads_mars_server'") + return __import__(name, *args, **kwargs) + + # Note: We can't easily monkeypatch __import__ in a way that works with + # the function definition at module load time, so instead we test the + # logic directly by checking the version parsing behavior + + # Test 2: Check that base_version attribute exists + from packaging import version + dev_version = version.parse('0.3.0.dev1+g1234567') + assert hasattr(dev_version, 'base_version') + assert dev_version.base_version == '0.3.0' + + # Test 3: Verify old version is rejected + old_version = version.parse('0.2.5.1') + assert version.parse(old_version.base_version) < version.parse('0.3.0') + + +def test_should_use_websocket_mode(monkeypatch): + """Test _should_use_websocket_mode function behavior with different env vars.""" + + # Test 1: MARS_USE_SHARES not set (or False) + monkeypatch.delenv('MARS_USE_SHARES', raising=False) + # Re-import to get fresh module state + import importlib + importlib.reload(mars) + assert mars.USE_SHARES is False, "USE_SHARES should be False when env var not set" + + # Test 2: MARS_USE_SHARES=false explicitly + monkeypatch.setenv('MARS_USE_SHARES', 'false') + importlib.reload(mars) + assert mars.USE_SHARES is False, "USE_SHARES should be False when env var is 'false'" + + # Test 3: MARS_USE_SHARES=true but need to check version + # (actual behavior depends on installed cads-mars-server version) + monkeypatch.setenv('MARS_USE_SHARES', 'true') + importlib.reload(mars) + # USE_SHARES will be True or False depending on installed version + # We just verify it's a boolean + assert isinstance(mars.USE_SHARES, bool), "USE_SHARES should be a boolean"