2 changes: 1 addition & 1 deletion deepspeed/ops/adam/__init__.py
@@ -6,4 +6,4 @@
from .cpu_adam import DeepSpeedCPUAdam
from .fused_adam import FusedAdam
from .zenflow_cpu_adam import ZenFlowCPUAdam
from .zenflow_torch_adam import ZenFlowSelectiveAdamW
from .zenflow_torch_adam import ZenFlowSelectiveAdamW, ZenFlowSelectiveAdamW_stage3
284 changes: 251 additions & 33 deletions deepspeed/ops/adam/zenflow_torch_adam.py

Large diffs are not rendered by default.
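The full diff of zenflow_torch_adam.py is collapsed, so the snippet below is only a rough sketch of how the newly exported optimizer could be picked up by downstream code. The import path follows the updated __init__.py above; the constructor arguments are an assumption that mirrors torch.optim.AdamW, not the actual ZenFlow signature.

import torch
# Hypothetical usage sketch: only the import path is taken from this PR;
# the constructor arguments are assumed to mirror torch.optim.AdamW.
from deepspeed.ops.adam import ZenFlowSelectiveAdamW_stage3

model = torch.nn.Linear(16, 16)
opt = ZenFlowSelectiveAdamW_stage3(model.parameters(), lr=1e-4, betas=(0.9, 0.999))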

1 change: 1 addition & 0 deletions deepspeed/runtime/engine.py
@@ -1868,6 +1868,7 @@ def _configure_zero_optimizer(self, optimizer):
overlap_comm=self.zero_overlap_comm(),
offload_optimizer_config=self.zero_offload_optimizer(),
offload_param_config=self.zero_offload_param(),
zenflow_config=self.zenflow_config(),
sub_group_size=self.zero_sub_group_size(),
offload_ratio=self.zero_partial_offload(),
mpu=self.mpu,
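The only functional change here threads the engine's zenflow_config() into the ZeRO stage 3 optimizer constructor. Below is a minimal sketch of the user-facing configuration this implies; offload, full_warm_up_rounds, and pt_reserved_cores_perc are fields read elsewhere in this PR, while the placement of the zenflow block under zero_optimization and the concrete values are assumptions.

# Sketch of a DeepSpeed config enabling ZenFlow together with ZeRO stage 3.
# Field names offload / full_warm_up_rounds / pt_reserved_cores_perc appear in this PR;
# the block's location and the values shown are assumptions.
ds_config = {
    "train_micro_batch_size_per_gpu": 1,
    "zero_optimization": {
        "stage": 3,
        "zenflow": {
            "offload": True,
            "full_warm_up_rounds": 0,
            "pt_reserved_cores_perc": 0.5,
        },
    },
}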
641 changes: 641 additions & 0 deletions deepspeed/runtime/zenflow/engine_stage3.py

Large diffs are not rendered by default.
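engine_stage3.py is new in this PR and its diff is collapsed. Based on the stage-1/2 hunk further down (which wires in start_optimizer_process and a zf_stage3 flag), a purely speculative sketch of the equivalent stage-3 wiring might look like the following; apart from start_optimizer_process, zf_stage3, and the zenflow_config fields shown, every name here is an assumption.

# Speculative sketch only, mirroring the stage-1/2 wiring added below in zenflow_stage_1_and_2.py.
from deepspeed.runtime.zenflow.zenflow_utils import start_optimizer_process

class ZenFlowStage3Wiring:
    def _init_zenflow(self, zenflow_config):
        self.offload_selective_optimizer = zenflow_config.offload
        self.pt_reserved_cores_perc = zenflow_config.pt_reserved_cores_perc
        self.start_optimizer_process = lambda: start_optimizer_process(self)
        # zf_stage3=True makes start_optimizer_process iterate fp32_partitioned_groups_flat
        # (see zenflow_utils.py below) instead of optimizer.param_groups.
        self.zf_stage3 = True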

140 changes: 3 additions & 137 deletions deepspeed/runtime/zenflow/zenflow_stage_1_and_2.py
@@ -3,14 +3,11 @@

# DeepSpeed Team

import os
import math
import psutil
import torch
from deepspeed import comm as dist
import torch.multiprocessing as mp

from deepspeed.runtime.zero.stage_1_and_2 import DeepSpeedZeroOptimizer
from deepspeed.runtime.zenflow.zenflow_utils import start_optimizer_process
from deepspeed.runtime.utils import (see_memory_usage)
from deepspeed.ops.adam import ZenFlowSelectiveAdamW

@@ -97,6 +94,8 @@ def __init__(self,
self.full_warm_up_rounds = zenflow_config.full_warm_up_rounds
self.offload_selective_optimizer = zenflow_config.offload
self.pt_reserved_cores_perc = zenflow_config.pt_reserved_cores_perc
self.start_optimizer_process = lambda: start_optimizer_process(self)
self.zf_stage3 = False

if self.offload_selective_optimizer:
assert overlap_comm, "offload selective optimizer should be used with overlap_comm"
@@ -636,64 +635,10 @@ def zenflow_cpu_optimizer_step(self, group_no):
self.optimizer.step(step_id=self.micro_step + 1)


def disable_accelerator():
accelerator = get_accelerator()
accelerator.is_available = lambda: False
accelerator.device_count = lambda: 0
accelerator.current_device = lambda: -1
# Optionally mark it as initialized if needed
if hasattr(accelerator, "_initialized"):
accelerator._initialized = True


def zenflow_optimizer_process(pipe, curr_rank, total_rank, param_groups, shared_overlap_grad_map,
shared_stale_param_map, zf_affinity):
disable_accelerator()

current_process = psutil.Process()
current_process.cpu_affinity(zf_affinity)
os.environ['OMP_NUM_THREADS'] = str(len(zf_affinity))

from deepspeed.ops.adam import ZenFlowCPUAdam
optimizer = ZenFlowCPUAdam(param_groups, overlap_step=True)

pipe.send({"type": "ready"})

# TODO: replace this with rpc

while True:
cmd = pipe.recv()
if cmd["type"] == "step":
now_state = cmd["now_state"]
micro_step = cmd["micro_step"]
group_infos = cmd["group_infos"]

for group_no, group_info in enumerate(group_infos):
original_param_groups = optimizer.param_groups
optimizer.param_groups = [original_param_groups[group_no]]
group = optimizer.param_groups[0]

for param_idx, param in enumerate(group["params"]):
key = (group_no, param_idx)
if key in shared_overlap_grad_map:
param.overlap_grad = shared_overlap_grad_map[key]
if key in shared_stale_param_map:
param.stale_param = shared_stale_param_map[key]

optimizer.step(step_id=micro_step + 1, now_state=now_state, group_info=group_info)

optimizer.param_groups = original_param_groups

pipe.send({"type": "done"})
elif cmd["type"] == "exit":
break


class ZenFlowZeroOptimizerParallel(ZenFlowZeroOptimizer):

def __init__(self, *args, **kwargs):
super(ZenFlowZeroOptimizerParallel, self).__init__(*args, **kwargs)
self.process_pool = mp.Pool(1)
self.process_optimizer_established = False
self.first_update_round_after_warmup = True

@@ -759,85 +704,6 @@ def async_inplace_copy_grad_to_fp32_buffer_from_gpu(self, param):
dest_tensor.copy_(src_tensor, non_blocking=True)
param.grad = None #offload only

# check if all tensors in the list are equal to each other
def all_tensors_equal(self, tensor_list):
first_tensor = tensor_list[0]
for tensor in tensor_list[1:]:
if not torch.equal(first_tensor, tensor):
return False
return True

def start_optimizer_process(self):
from multiprocessing import Pipe, get_context, Manager

ctx = get_context("spawn")
self.parent_conn, self.child_conn = Pipe()

manager = Manager()
self.shared_overlap_grad_map = manager.dict()
self.shared_stale_param_map = manager.dict()

for group_no, group in enumerate(self.optimizer.param_groups):
for param_idx, param in enumerate(group['params']):
param.data.share_memory_()
if not hasattr(param, 'stale_param'):
param.stale_param = torch.zeros_like(param.data, dtype=param.dtype, device=param.device)
param.stale_param.data.share_memory_()
key = (group_no, param_idx)
self.shared_stale_param_map[key] = param.stale_param
if param.overlap_grad is not None:
param.overlap_grad[0].data.share_memory_()
param.overlap_grad[1].data.share_memory_()
key = (group_no, param_idx)
self.shared_overlap_grad_map[key] = param.overlap_grad

param_groups_data = self.optimizer.param_groups
curr_rank = dist.get_rank()
total_rank = dist.get_world_size()

current_process = psutil.Process()
current_affinity = current_process.cpu_affinity()
all_affinities = [
torch.zeros(len(current_affinity),
dtype=type(current_affinity[0]),
device=get_accelerator().current_device_name()) for _ in range(total_rank)
]
dist.all_gather(
all_affinities,
torch.tensor(current_affinity,
dtype=type(current_affinity[0]),
device=get_accelerator().current_device_name()))
# When the affinity is the same across all ranks, the workers are not bound. Do a soft bind here
if self.all_tensors_equal(all_affinities):
num_phy_cores = psutil.cpu_count(logical=False)
available_phy_cores = [i for i in current_affinity if i < num_phy_cores]
num_available_phy_cores = len(available_phy_cores)
my_rank = curr_rank
my_size = total_rank
cores_per_rank = num_available_phy_cores // my_size
current_affinity = available_phy_cores[my_rank * cores_per_rank:(my_rank + 1) * cores_per_rank]
pt_num_cores = math.ceil(self.pt_reserved_cores_perc * len(current_affinity))
if pt_num_cores > 0 and pt_num_cores < len(current_affinity):
zf_affinity = current_affinity[pt_num_cores:]
pt_affinity = current_affinity[:pt_num_cores]
else:
zf_affinity = current_affinity
pt_affinity = current_affinity
self.process = ctx.Process(
target=zenflow_optimizer_process,
args=(self.child_conn, curr_rank, total_rank, param_groups_data, self.shared_overlap_grad_map,
self.shared_stale_param_map, zf_affinity),
)
self.process.daemon = True
self.process.start()
current_process.cpu_affinity(pt_affinity)
os.environ['OMP_NUM_THREADS'] = str(len(pt_affinity))

msg = self.parent_conn.recv()
assert msg["type"] == "ready", "Optimizer process did not initialize correctly."

self.process_optimizer_established = True

def wait_last_update_and_copy(self):

if not hasattr(self, 'parent_conn'):
149 changes: 149 additions & 0 deletions deepspeed/runtime/zenflow/zenflow_utils.py
@@ -3,7 +3,12 @@

# DeepSpeed Team

import os
import math
import torch
import psutil
from deepspeed import comm as dist
from deepspeed.accelerator import get_accelerator


def _flatten_dense_tensors(tensors):
@@ -40,3 +45,147 @@ def _unflatten_dense_tensors(flat, tensors):
transposed_tensors = [t.transpose(0, 1) if t.dim() == 2 else t for t in tensors]
unflat = torch._C._nn.unflatten_dense_tensors(flat, transposed_tensors)
return [t.transpose(0, 1) if t.dim() == 2 else t for t in unflat]
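A quick round-trip sketch of how these two helpers fit together, assuming _flatten_dense_tensors (whose body is collapsed above) applies the matching transpose before flattening:

# Round-trip sketch; assumes _flatten_dense_tensors transposes 2D tensors
# symmetrically to the unflatten path shown above.
import torch

tensors = [torch.randn(4, 3), torch.randn(2, 5)]
flat = _flatten_dense_tensors(tensors)
restored = _unflatten_dense_tensors(flat, tensors)
assert all(torch.equal(a, b) for a, b in zip(tensors, restored))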


def disable_accelerator():
accelerator = get_accelerator()
accelerator.is_available = lambda: False
accelerator.device_count = lambda: 0
accelerator.current_device = lambda: -1
# Optionally mark it as initialized if needed
if hasattr(accelerator, "_initialized"):
accelerator._initialized = True


def zenflow_optimizer_process(pipe, param_groups, shared_overlap_grad_map, shared_stale_param_map, zf_affinity):
disable_accelerator()

current_process = psutil.Process()
current_process.cpu_affinity(zf_affinity)
os.environ['OMP_NUM_THREADS'] = str(len(zf_affinity))

from deepspeed.ops.adam import ZenFlowCPUAdam
optimizer = ZenFlowCPUAdam(param_groups, overlap_step=True)

pipe.send({"type": "ready"})

# TODO: replace this with rpc

while True:
cmd = pipe.recv()
if cmd["type"] == "step":
now_state = cmd["now_state"]
micro_step = cmd["micro_step"]
group_infos = cmd["group_infos"]

for group_no, group_info in enumerate(group_infos):
original_param_groups = optimizer.param_groups
optimizer.param_groups = [original_param_groups[group_no]]
group = optimizer.param_groups[0]

for param_idx, param in enumerate(group["params"]):
key = (group_no, param_idx)
if key in shared_overlap_grad_map:
param.overlap_grad = shared_overlap_grad_map[key]
if key in shared_stale_param_map:
param.stale_param = shared_stale_param_map[key]

optimizer.step(step_id=micro_step + 1, now_state=now_state, group_info=group_info)

optimizer.param_groups = original_param_groups

pipe.send({"type": "done"})
elif cmd["type"] == "exit":
break
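The sending side of this pipe protocol lives in the optimizer's wait/copy path (wait_last_update_and_copy, not shown in full here). A minimal sketch of a compatible driver follows; the payload values are illustrative.

# Parent-side sketch of the protocol handled above: one "ready" at startup,
# then repeated "step"/"done" exchanges, and finally an "exit".
def drive_one_step(parent_conn, now_state, micro_step, group_infos):
    parent_conn.send({
        "type": "step",
        "now_state": now_state,
        "micro_step": micro_step,
        "group_infos": group_infos,
    })
    reply = parent_conn.recv()  # blocks until the worker finishes its optimizer step
    assert reply["type"] == "done"

def shutdown(parent_conn):
    parent_conn.send({"type": "exit"})  # the worker loop breaks and the process exits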


def all_tensors_equal(tensor_list):
first_tensor = tensor_list[0]
for tensor in tensor_list[1:]:
if not torch.equal(first_tensor, tensor):
return False
return True


def start_optimizer_process(zf_optimizer):
from multiprocessing import Pipe, get_context, Manager

ctx = get_context("spawn")
zf_optimizer.parent_conn, zf_optimizer.child_conn = Pipe()

manager = Manager()
zf_optimizer.shared_overlap_grad_map = manager.dict()
zf_optimizer.shared_stale_param_map = manager.dict()

if zf_optimizer.zf_stage3:
params_iter = [((group_no, 0), param)
for group_no, param in enumerate(zf_optimizer.fp32_partitioned_groups_flat)]
else:
params_iter = [((group_no, param_idx), param)
for group_no, group in enumerate(zf_optimizer.optimizer.param_groups)
for param_idx, param in enumerate(group["params"])]

for key, param in params_iter:
param.data.share_memory_()

if not hasattr(param, "stale_param"):
param.stale_param = torch.zeros_like(param.data, dtype=param.dtype, device=param.device)
param.stale_param.data.share_memory_()
zf_optimizer.shared_stale_param_map[key] = param.stale_param

if getattr(param, "overlap_grad", None) is not None:
param.overlap_grad[0].data.share_memory_()
param.overlap_grad[1].data.share_memory_()
zf_optimizer.shared_overlap_grad_map[key] = param.overlap_grad

param_groups_data = ([{
"params": [param]
} for param in zf_optimizer.fp32_partitioned_groups_flat]
if zf_optimizer.zf_stage3 else zf_optimizer.optimizer.param_groups)

curr_rank = dist.get_rank()
total_rank = dist.get_world_size()

current_process = psutil.Process()
current_affinity = current_process.cpu_affinity()
all_affinities = [
torch.zeros(len(current_affinity),
dtype=type(current_affinity[0]),
device=get_accelerator().current_device_name()) for _ in range(total_rank)
]
dist.all_gather(
all_affinities,
torch.tensor(current_affinity, dtype=type(current_affinity[0]),
device=get_accelerator().current_device_name()))
# When the affinity is the same across all ranks, the workers are not bound. Do a soft bind here
if all_tensors_equal(all_affinities):
num_phy_cores = psutil.cpu_count(logical=False)
available_phy_cores = [i for i in current_affinity if i < num_phy_cores]
num_available_phy_cores = len(available_phy_cores)
my_rank = curr_rank
my_size = total_rank
cores_per_rank = num_available_phy_cores // my_size
current_affinity = available_phy_cores[my_rank * cores_per_rank:(my_rank + 1) * cores_per_rank]
pt_num_cores = math.ceil(zf_optimizer.pt_reserved_cores_perc * len(current_affinity))
if pt_num_cores > 0 and pt_num_cores < len(current_affinity):
zf_affinity = current_affinity[pt_num_cores:]
pt_affinity = current_affinity[:pt_num_cores]
else:
zf_affinity = current_affinity
pt_affinity = current_affinity

zf_optimizer.process = ctx.Process(
target=zenflow_optimizer_process,
args=(zf_optimizer.child_conn, param_groups_data, zf_optimizer.shared_overlap_grad_map,
zf_optimizer.shared_stale_param_map, zf_affinity),
)
zf_optimizer.process.daemon = True
zf_optimizer.process.start()

current_process.cpu_affinity(pt_affinity)
os.environ['OMP_NUM_THREADS'] = str(len(pt_affinity))

msg = zf_optimizer.parent_conn.recv()
assert msg["type"] == "ready", "Optimizer process did not initialize correctly."

zf_optimizer.process_optimizer_established = True
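For concreteness, a small worked example of the soft-bind arithmetic above, using assumed values (32 physical cores, 4 ranks, pt_reserved_cores_perc = 0.25) rather than anything taken from the PR:

# Worked example of the core split above; all numbers here are assumed.
import math

available_phy_cores = list(range(32))  # cores 0..31, all below the physical-core count
my_rank, my_size = 1, 4
cores_per_rank = len(available_phy_cores) // my_size  # 8 cores per rank
mine = available_phy_cores[my_rank * cores_per_rank:(my_rank + 1) * cores_per_rank]  # cores 8..15
pt_num_cores = math.ceil(0.25 * len(mine))  # 2 cores stay with the PyTorch process
pt_affinity = mine[:pt_num_cores]   # [8, 9]
zf_affinity = mine[pt_num_cores:]   # [10, 11, 12, 13, 14, 15]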