From e7b06ea59c3e9cfe119088c4632e7163ca2b1c93 Mon Sep 17 00:00:00 2001
From: Saadat Ali
Date: Tue, 23 Sep 2025 10:08:10 +0100
Subject: [PATCH 1/3] Update parallel.py

Add Azure ML compatibility to ParallelRunner via distributed env var support
(RANK/WORLD_SIZE) and env:// init method
---
 src/anemoi/inference/runners/parallel.py | 49 +++++++++++++++++++-----
 1 file changed, 39 insertions(+), 10 deletions(-)

diff --git a/src/anemoi/inference/runners/parallel.py b/src/anemoi/inference/runners/parallel.py
index 2dce2d4b1..1715c75db 100644
--- a/src/anemoi/inference/runners/parallel.py
+++ b/src/anemoi/inference/runners/parallel.py
@@ -259,6 +259,17 @@ def _bootstrap_processes(self) -> None:
                 LOG.warning(
                     f"world size ({self.config.world_size}) set in the config is ignored because we are launching via srun, using 'SLURM_NTASKS' instead"
                 )
+        elif 'RANK' in os.environ and 'WORLD_SIZE' in os.environ:
+            # New branch for Azure ML / general distributed env (e.g., env:// mode)
+            self.global_rank = int(os.environ['RANK'])
+            self.local_rank = int(os.environ.get('LOCAL_RANK', self.global_rank))  # Fallback to global if LOCAL_RANK unset
+            self.world_size = int(os.environ['WORLD_SIZE'])
+            self.master_addr = os.environ.get('MASTER_ADDR')
+            self.master_port = os.environ.get('MASTER_PORT')
+            if self.master_addr is None or self.master_port is None:
+                raise ValueError("MASTER_ADDR and MASTER_PORT must be set for distributed initialization (e.g., in Azure ML)")
+            if self.config.world_size != 1 and self.config.world_size != self.world_size:
+                LOG.warning(f"Config world_size ({self.config.world_size}) ignored; using WORLD_SIZE from environment ({self.world_size})")
         else:

             # If srun is not available, spawn procs manually on a node
@@ -358,19 +369,29 @@ def _init_parallel(self) -> dist.ProcessGroup | None:
             else:
                 backend = "gloo"

-            dist.init_process_group(
-                backend=backend,
-                init_method=f"tcp://{self.master_addr}:{self.master_port}",
-                timeout=datetime.timedelta(minutes=3),
-                world_size=self.world_size,
-                rank=self.global_rank,
-            )
+            if backend == "mpi":
+                # MPI backend: No init_method or explicit sizes needed
+                dist.init_process_group(backend="mpi")
+                model_comm_group_ranks = np.arange(self.world_size, dtype=int)
+                model_comm_group = dist.new_group(model_comm_group_ranks)
+            else:
+                if self._using_distributed_env():
+                    init_method = "env://"  # Azure ML recommended
+                else:
+                    init_method = f"tcp://{self.master_addr}:{self.master_port}"
+                dist.init_process_group(
+                    backend=backend,
+                    init_method=init_method,
+                    timeout=datetime.timedelta(minutes=3),
+                    world_size=self.world_size,
+                    rank=self.global_rank,
+                )
+                model_comm_group_ranks = np.arange(self.world_size, dtype=int)
+                model_comm_group = dist.new_group(model_comm_group_ranks)
             LOG.info(f"Creating a model communication group with {self.world_size} devices with the {backend} backend")
-
-            model_comm_group_ranks = np.arange(self.world_size, dtype=int)
-            model_comm_group = dist.new_group(model_comm_group_ranks)
         else:
             model_comm_group = None
+            LOG.warning("ParallelRunner selected but world size of 1 detected")

         return model_comm_group

@@ -387,3 +408,11 @@ def _get_parallel_info_from_slurm(self) -> tuple[int, int, int]:
         world_size = int(os.environ.get("SLURM_NTASKS", 1))  # Total number of processes

         return global_rank, local_rank, world_size
+
+    def _using_distributed_env(self) -> bool:
+        """Checks for distributed env vars like those in Azure ML."""
+        return 'RANK' in os.environ and 'WORLD_SIZE' in os.environ
+
+    def _is_mpi_env(self) -> bool:
+        """Detects common MPI implementations (optional, for generality)."""
+        return 'OMPI_COMM_WORLD_SIZE' in os.environ or 'PMI_SIZE' in os.environ
\ No newline at end of file
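Note on the bootstrap added in patch 1: the new elif branch only reads RANK, WORLD_SIZE, LOCAL_RANK, MASTER_ADDR and MASTER_PORT; the actual rendezvous happens later in _init_parallel via torch.distributed. A minimal standalone sketch of that env:// flow (illustrative only, not taken from the patch; the script name, the gloo backend and the 3-minute timeout are assumptions):

    # standalone_env_init.py -- minimal env:// rendezvous sketch (illustrative, not part of the patch)
    import datetime
    import os

    import torch.distributed as dist

    # The launcher (Azure ML, torchrun, ...) is expected to export all four variables.
    for var in ("RANK", "WORLD_SIZE", "MASTER_ADDR", "MASTER_PORT"):
        assert var in os.environ, f"{var} must be set for env:// initialisation"

    # env:// reads rank, world size and the rendezvous address from the environment,
    # so no explicit tcp://host:port init_method is needed.
    dist.init_process_group(
        backend="gloo",  # a GPU job would typically use "nccl"
        init_method="env://",
        timeout=datetime.timedelta(minutes=3),
    )
    print(f"rank {dist.get_rank()} of {dist.get_world_size()} initialised")
    dist.destroy_process_group()

Running it with torchrun --nproc_per_node=2 standalone_env_init.py exports all four variables automatically, which is the same contract the elif branch above relies on.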
From a07843406183bbca1e88ca41b4d4125452da522e Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Tue, 23 Sep 2025 09:32:22 +0000
Subject: [PATCH 2/3] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 src/anemoi/inference/runners/parallel.py | 26 +++++++++++++++---------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git a/src/anemoi/inference/runners/parallel.py b/src/anemoi/inference/runners/parallel.py
index 1715c75db..4c474dd3d 100644
--- a/src/anemoi/inference/runners/parallel.py
+++ b/src/anemoi/inference/runners/parallel.py
@@ -259,17 +259,23 @@ def _bootstrap_processes(self) -> None:
                 LOG.warning(
                     f"world size ({self.config.world_size}) set in the config is ignored because we are launching via srun, using 'SLURM_NTASKS' instead"
                 )
-        elif 'RANK' in os.environ and 'WORLD_SIZE' in os.environ:
+        elif "RANK" in os.environ and "WORLD_SIZE" in os.environ:
             # New branch for Azure ML / general distributed env (e.g., env:// mode)
-            self.global_rank = int(os.environ['RANK'])
-            self.local_rank = int(os.environ.get('LOCAL_RANK', self.global_rank))  # Fallback to global if LOCAL_RANK unset
-            self.world_size = int(os.environ['WORLD_SIZE'])
-            self.master_addr = os.environ.get('MASTER_ADDR')
-            self.master_port = os.environ.get('MASTER_PORT')
+            self.global_rank = int(os.environ["RANK"])
+            self.local_rank = int(
+                os.environ.get("LOCAL_RANK", self.global_rank)
+            )  # Fallback to global if LOCAL_RANK unset
+            self.world_size = int(os.environ["WORLD_SIZE"])
+            self.master_addr = os.environ.get("MASTER_ADDR")
+            self.master_port = os.environ.get("MASTER_PORT")
             if self.master_addr is None or self.master_port is None:
-                raise ValueError("MASTER_ADDR and MASTER_PORT must be set for distributed initialization (e.g., in Azure ML)")
+                raise ValueError(
+                    "MASTER_ADDR and MASTER_PORT must be set for distributed initialization (e.g., in Azure ML)"
+                )
             if self.config.world_size != 1 and self.config.world_size != self.world_size:
-                LOG.warning(f"Config world_size ({self.config.world_size}) ignored; using WORLD_SIZE from environment ({self.world_size})")
+                LOG.warning(
+                    f"Config world_size ({self.config.world_size}) ignored; using WORLD_SIZE from environment ({self.world_size})"
+                )
         else:

             # If srun is not available, spawn procs manually on a node
@@ -411,8 +417,8 @@ def _get_parallel_info_from_slurm(self) -> tuple[int, int, int]:

     def _using_distributed_env(self) -> bool:
         """Checks for distributed env vars like those in Azure ML."""
-        return 'RANK' in os.environ and 'WORLD_SIZE' in os.environ
+        return "RANK" in os.environ and "WORLD_SIZE" in os.environ

     def _is_mpi_env(self) -> bool:
         """Detects common MPI implementations (optional, for generality)."""
-        return 'OMPI_COMM_WORLD_SIZE' in os.environ or 'PMI_SIZE' in os.environ
\ No newline at end of file
+        return "OMPI_COMM_WORLD_SIZE" in os.environ or "PMI_SIZE" in os.environ
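Note on the branching that the next patch flags for refactoring: _init_parallel now has to pick both a backend and an init method from the environment. A condensed sketch of that decision, separated from ParallelRunner (illustrative; the function name is made up, and wiring _is_mpi_env into the backend choice is an assumption, since the diffs only add the helper):

    # Condensed sketch of the backend / init-method selection (illustrative, not library code).
    import os
    from typing import Optional, Tuple

    import torch

    def choose_backend_and_init_method(master_addr: Optional[str], master_port: Optional[str]) -> Tuple[str, Optional[str]]:
        """Return (backend, init_method); init_method is None for the MPI backend."""
        if "OMPI_COMM_WORLD_SIZE" in os.environ or "PMI_SIZE" in os.environ:
            return "mpi", None  # MPI rendezvous carries its own rank/size information
        backend = "nccl" if torch.cuda.is_available() else "gloo"
        if "RANK" in os.environ and "WORLD_SIZE" in os.environ:
            return backend, "env://"  # Azure ML / torchrun style launchers
        return backend, f"tcp://{master_addr}:{master_port}"  # manual or srun-spawned fallback

The three return paths correspond to the mpi branch, the _using_distributed_env() case and the original tcp:// default in _init_parallel above.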
From 88284932a76c8f6736479a67023606f3706bdb88 Mon Sep 17 00:00:00 2001
From: Saadat Ali
Date: Tue, 14 Oct 2025 19:30:38 +0100
Subject: [PATCH 3/3] Update parallel.py

add TODOs for env bootstrap and init delegation
---
 src/anemoi/inference/runners/parallel.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/src/anemoi/inference/runners/parallel.py b/src/anemoi/inference/runners/parallel.py
index 829c24366..c13ccc903 100644
--- a/src/anemoi/inference/runners/parallel.py
+++ b/src/anemoi/inference/runners/parallel.py
@@ -277,6 +277,9 @@ def _bootstrap_processes(self) -> None:
                     f"world size ({self.config.world_size}) set in the config is ignored because we are launching via srun, using 'SLURM_NTASKS' instead"
                 )
         elif "RANK" in os.environ and "WORLD_SIZE" in os.environ:
+            # TODO(refactor): Extract AzureML/general env bootstrap (RANK/WORLD_SIZE/LOCAL_RANK and
+            # MASTER_ADDR/MASTER_PORT wiring) into a delegated ClusterEnvironment
+            # (e.g., AzureMLEnvironment.bootstrap()) in a follow-up PR.
             # New branch for Azure ML / general distributed env (e.g., env:// mode)
             self.global_rank = int(os.environ["RANK"])
             self.local_rank = int(
@@ -400,6 +403,8 @@ def _init_parallel(self) -> Optional["torch.distributed.ProcessGroup"]:
                 model_comm_group_ranks = np.arange(self.world_size, dtype=int)
                 model_comm_group = dist.new_group(model_comm_group_ranks)
             else:
+                # TODO(refactor): Delegate backend + init_method selection and dist.init_process_group(...)
+                # to a ClusterEnvironment (e.g., env.init_parallel()) so ParallelRunner has no branching here.
                 if self._using_distributed_env():
                     init_method = "env://"  # Azure ML recommended
                 else:
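Note on the two TODOs: they describe a follow-up refactor, not code in this PR. One possible shape for the delegated ClusterEnvironment (purely hypothetical; ClusterEnvironment, AzureMLEnvironment and the method names are placeholders and do not exist in anemoi-inference today):

    # Hypothetical follow-up sketch only; none of these classes exist in anemoi-inference.
    import datetime
    import os
    from abc import ABC, abstractmethod

    import torch.distributed as dist

    class ClusterEnvironment(ABC):
        """Owns launcher-specific rank discovery and process-group setup."""

        @abstractmethod
        def bootstrap(self) -> tuple[int, int, int]:
            """Return (global_rank, local_rank, world_size)."""

        @abstractmethod
        def init_parallel(self, backend: str) -> dist.ProcessGroup:
            """Initialise torch.distributed and return the model communication group."""

    class AzureMLEnvironment(ClusterEnvironment):
        """Launchers that export RANK/WORLD_SIZE/LOCAL_RANK and MASTER_ADDR/MASTER_PORT."""

        def bootstrap(self) -> tuple[int, int, int]:
            global_rank = int(os.environ["RANK"])
            local_rank = int(os.environ.get("LOCAL_RANK", global_rank))
            world_size = int(os.environ["WORLD_SIZE"])
            return global_rank, local_rank, world_size

        def init_parallel(self, backend: str) -> dist.ProcessGroup:
            # env:// picks up RANK/WORLD_SIZE/MASTER_ADDR/MASTER_PORT from the environment.
            dist.init_process_group(
                backend=backend,
                init_method="env://",
                timeout=datetime.timedelta(minutes=3),
            )
            return dist.new_group(list(range(dist.get_world_size())))

With something like this in place, _bootstrap_processes and _init_parallel would select an environment once (Slurm, env-var launchers such as Azure ML, MPI, or local spawn) and delegate to it, which removes the per-launcher branching the TODOs point at.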