Commit 8c306ce

add comm test
1 parent c679690 commit 8c306ce

19 files changed: +485 -454 lines

test/distributed/test_backends.py

Lines changed: 3 additions & 1 deletion
@@ -23,6 +23,8 @@ def test_device_to_backend_mapping(self, device) -> None:
             assert dist.get_default_backend_for_device(device) == "gloo"
         elif "hpu" in device:
             assert dist.get_default_backend_for_device(device) == "hccl"
+        elif "xpu" in device:
+            assert dist.get_default_backend_for_device(device) == "xccl"
         else:
             with self.assertRaises(ValueError):
                 dist.get_default_backend_for_device(device)
@@ -45,7 +47,7 @@ def test_create_pg(self, device) -> None:
 
 
 devices = ["cpu", "cuda", "hpu", "xpu"]
-instantiate_device_type_tests(TestMiscCollectiveUtils, globals(), only_for=devices)
+instantiate_device_type_tests(TestMiscCollectiveUtils, globals(), only_for=devices, allow_xpu=True)
 
 if __name__ == "__main__":
     run_tests()
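As a quick illustration of what the new branch covers, the sketch below (not part of the commit) checks the device-to-default-backend pairs this test asserts on. The "xpu" -> "xccl" entry assumes a build with the XCCL backend registered; "cuda" -> "nccl" is the long-standing default for CUDA builds.

import torch.distributed as dist

# Hedged sketch of the mapping exercised by test_device_to_backend_mapping:
# each device string resolves to its default distributed backend.
expected = {"cpu": "gloo", "cuda": "nccl", "hpu": "hccl", "xpu": "xccl"}
for device, backend in expected.items():
    assert dist.get_default_backend_for_device(device) == backend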

test/distributed/test_c10d_common.py

Lines changed: 46 additions & 46 deletions
@@ -57,7 +57,7 @@
 else:
     LOOPBACK = "lo"
 
-torch.backends.cuda.matmul.allow_tf32 = False
+torch.backends.xpu.matmul.allow_tf32 = False
 
 
 def gpus_for_rank(world_size):
@@ -66,8 +66,8 @@ def gpus_for_rank(world_size):
     On a single node, all visible GPUs are evenly
     divided to subsets, each process only uses a subset.
     """
-    visible_devices = list(range(torch.cuda.device_count()))
-    gpus_per_process = torch.cuda.device_count() // world_size
+    visible_devices = list(range(torch.xpu.device_count()))
+    gpus_per_process = torch.xpu.device_count() // world_size
     gpus_for_rank = []
     for rank in range(world_size):
         gpus_for_rank.append(
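The split that gpus_for_rank performs is unchanged by this commit; only the device-count query moves from torch.cuda to torch.xpu. A standalone illustration of the split (device counts are made up for the example):

# Hedged illustration of the even device split used by gpus_for_rank:
# with 4 visible XPUs and world_size=2, rank 0 gets [0, 1] and rank 1 gets [2, 3].
def split_devices(device_count, world_size):
    visible = list(range(device_count))
    per_rank = device_count // world_size
    return [visible[r * per_rank : (r + 1) * per_rank] for r in range(world_size)]

assert split_devices(4, 2) == [[0, 1], [2, 3]]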
@@ -339,7 +339,7 @@ def _prepare_single_device_module(
         gradient_as_bucket_view=False,
     ):
         model = Net()
-        device = devices[0] if devices else torch.device(f"cuda:{self.rank:d}")
+        device = devices[0] if devices else torch.device(f"xpu:{self.rank:d}")
         ddp_model = DistributedDataParallel(
             copy.deepcopy(model).to(device),
             device_ids=device_ids,
@@ -380,7 +380,7 @@ def _prepare_multi_device_module(
             gradient_as_bucket_view=gradient_as_bucket_view,
         )
 
-        input = torch.randn(global_batch_size, 2).cuda(devices[0])
+        input = torch.randn(global_batch_size, 2).xpu(devices[0])
         target = torch.randn(global_batch_size, 4)
 
         return model, ddp_model, input, target
@@ -414,10 +414,10 @@ def _test_ddp_checkpointing(
         allow_none_grads=False,
     ):
         # to reproduce the same training results
-        torch.cuda.set_device(self.rank)
+        torch.xpu.set_device(self.rank)
         torch.manual_seed(31415)
-        model = copy.deepcopy(input_model).cuda()
-        ddp_model = copy.deepcopy(input_model).cuda()
+        model = copy.deepcopy(input_model).xpu()
+        ddp_model = copy.deepcopy(input_model).xpu()
         ddp_model = nn.parallel.DistributedDataParallel(
             ddp_model,
             bucket_cap_mb=1,
@@ -533,8 +533,8 @@ def __init__(self, use_reentrant=True):
     def _prepare_dummy_data(self):
         ddp_bs = 16
         bs = ddp_bs * self.world_size
-        input = torch.rand((bs, 20), device="cuda", requires_grad=True)
-        target = torch.randn((bs, 20), device="cuda")
+        input = torch.rand((bs, 20), device="xpu", requires_grad=True)
+        target = torch.randn((bs, 20), device="xpu")
         offset = self.rank * ddp_bs
         ddp_input = input[offset : offset + ddp_bs]
         ddp_target = target[offset : offset + ddp_bs]
@@ -694,7 +694,7 @@ def test_ddp_checkpointing_weight_sharing(self, use_reentrant):
         Test that checkpointing with weight sharing works.
         """
         process_group = self._get_process_group()
-        torch.cuda.set_device(self.rank)
+        torch.xpu.set_device(self.rank)
         for use_bucket_view, static_graph in product((False, True), (False, True)):
             torch.manual_seed(31415)
             l1 = nn.Linear(20, 20)
@@ -717,7 +717,7 @@ def test_ddp_checkpointing_twice_weight_sharing(self):
         same layer twice and having weights shared across layers.
         """
         process_group = self._get_process_group()
-        torch.cuda.set_device(self.rank)
+        torch.xpu.set_device(self.rank)
         for use_bucket_view in (True, False):
             self._test_ddp_checkpointing(
                 self.CheckpointTwiceModuleWeightSharing(),
@@ -1141,7 +1141,7 @@ def _test_sequence_num_incremented(self, process_group, ranks):
 
         # Verify sequence numbers are appropriately incremented
         for i in range(10):
-            t = torch.ones(1, device=torch.cuda.current_device())
+            t = torch.ones(1, device=torch.xpu.current_device())
             dist.all_reduce(t, group=process_group)
             if not c10d._rank_not_in_group(process_group):
                 seq_num = self._verify_sequence_number_across_pg(
@@ -1172,7 +1172,7 @@ def _test_sequence_num_incremented(self, process_group, ranks):
         self.assertEqual(rank_to_seq_num[0] + 1, rank_to_seq_num[1])
 
     def _test_sequence_num_incremented_default_group(self, backend_name):
-        torch.cuda.set_device(self.rank)
+        torch.xpu.set_device(self.rank)
         store = dist.FileStore(self.file_name, self.world_size)
         dist.init_process_group(
             backend_name,
@@ -1186,7 +1186,7 @@ def _test_sequence_num_incremented_default_group(self, backend_name):
         )
 
     def _test_sequence_num_incremented_subgroup(self, backend_name):
-        torch.cuda.set_device(self.rank)
+        torch.xpu.set_device(self.rank)
         store = dist.FileStore(self.file_name, self.world_size)
         dist.init_process_group(
             backend_name,
@@ -1241,8 +1241,8 @@ def _test_warn_not_in_group(self, backend):
         in_group_ranks = list(filter(lambda x: x % 2 == 0, range(self.world_size)))
         group = dist.new_group(in_group_ranks)
 
-        x = torch.zeros(2, 2).cuda(self.rank)
-        xs = [torch.zeros(2, 2).cuda(self.rank) for _ in range(len(in_group_ranks))]
+        x = torch.zeros(2, 2).xpu(self.rank)
+        xs = [torch.zeros(2, 2).xpu(self.rank) for _ in range(len(in_group_ranks))]
         if self.rank not in in_group_ranks:
             msg = ".*{}.*does not belong to.*"
             with self.assertWarnsOnceRegex(UserWarning, msg.format("all_gather")):
@@ -1371,7 +1371,7 @@ def _test_bool_tensors(self, backend):
             rank=self.rank,
             store=store,
         )
-        device = "cuda" if backend == "nccl" else "cpu"
+        device = "xpu" if backend == "xccl" else "cpu"
         # test alltoall_base
         tensor = torch.tensor([1, 0, 0, 1], dtype=torch.bool, device=device)
         zeros = torch.tensor([0, 0, 0, 0], dtype=torch.bool, device=device)
@@ -1553,8 +1553,8 @@ def test_debug_level(self):
 
 class DummyWork(dist._Work):
     def wait(self, timeout=5.0):
-        if torch.cuda.is_available():
-            torch.cuda.current_stream().synchronize()
+        if torch.xpu.is_available():
+            torch.xpu.current_stream().synchronize()
         return True
 
 
@@ -1664,17 +1664,17 @@ def test_backend_config(self):
 
         # Ensure backend config can be created with the following arguments
         backend_config_strings_and_expected_values = [
-            (dist.Backend.GLOO, "cpu:gloo,cuda:gloo"),
-            (dist.Backend.NCCL, "cuda:nccl"),
-            (dist.Backend.MPI, "cpu:mpi,cuda:mpi"),
-            (dist.Backend.UCC, "cpu:ucc,cuda:ucc"),
-            (dist.Backend.DUMMY, "cpu:dummy,cuda:dummy"),
-            ("DUMMY", "cpu:dummy,cuda:dummy"),
-            ("dummy", "cpu:dummy,cuda:dummy"),
-            ("cpu:dummy,cuda:dummy", "cpu:dummy,cuda:dummy"),
-            ("cpu:dummy,cuda:nccl", "cpu:dummy,cuda:nccl"),
-            ("cpu:gloo,cuda:dummy", "cpu:gloo,cuda:dummy"),
-            ("cpu:gloo,cuda:nccl", "cpu:gloo,cuda:nccl"),
+            (dist.Backend.GLOO, "cpu:gloo,xpu:gloo"),
+            (dist.Backend.XCCL, "xpu:xccl"),
+            (dist.Backend.MPI, "cpu:mpi,xpu:mpi"),
+            (dist.Backend.UCC, "cpu:ucc,xpu:ucc"),
+            (dist.Backend.DUMMY, "cpu:dummy,xpu:dummy"),
+            ("DUMMY", "cpu:dummy,xpu:dummy"),
+            ("dummy", "cpu:dummy,xpu:dummy"),
+            ("cpu:dummy,xpu:dummy", "cpu:dummy,xpu:dummy"),
+            ("cpu:dummy,xpu:xccl", "cpu:dummy,xpu:xccl"),
+            ("cpu:gloo,xpu:dummy", "cpu:gloo,xpu:dummy"),
+            ("cpu:gloo,xpu:xccl", "cpu:gloo,xpu:xccl"),
         ]
 
         for config_str, expected_value in backend_config_strings_and_expected_values:
@@ -1685,8 +1685,8 @@ def test_backend_config(self):
 
         # Ensure backend config will raise ValueError with the following arguments
         invalid_backend_config_strings = [
-            "cpu:gloo,cuda:nccl,",  # trailing comma
-            "cpu:gloo,cuda:nccl,cpu:dummy",  # duplicate device
+            "cpu:gloo,xpu:xccl,",  # trailing comma
+            "cpu:gloo,xpu:xccl,cpu:dummy",  # duplicate device
         ]
         for config_str in invalid_backend_config_strings:
             with self.subTest(config_str):
@@ -1701,7 +1701,7 @@ def test_init_process_group_with_multiple_backends(self):
         os.environ["MASTER_ADDR"] = "localhost"
         os.environ["MASTER_PORT"] = "6789"
         dist.init_process_group(
-            "cpu:dummy,cuda:dummy", rank=self.rank, world_size=self.world_size
+            "cpu:dummy,xpu:dummy", rank=self.rank, world_size=self.world_size
         )
 
         # test all_gather
@@ -1816,8 +1816,8 @@ def tearDown(self):
 
     def test_init_process_group_optional_backend(self):
         store = dist.FileStore(self.file_name, self.world_size)
-        # creates both gloo and nccl backend
-        if dist.is_gloo_available() and dist.is_nccl_available():
+        # creates both gloo and xccl backend
+        if dist.is_gloo_available() and dist.is_xccl_available():
             dist.init_process_group(
                 store=store,
                 rank=self.rank,
@@ -1834,8 +1834,8 @@ def test_init_process_group_for_all_backends(self):
             elif backend == dist.Backend.MPI:
                 if not dist.is_mpi_available():
                     continue
-            elif backend == dist.Backend.NCCL:
-                if not dist.is_nccl_available() or not torch.cuda.is_available():
+            elif backend == dist.Backend.XCCL:
+                if not dist.is_xccl_available() or not torch.xpu.is_available():
                     continue
             elif backend == dist.Backend.GLOO:
                 if not dist.is_gloo_available():
@@ -1871,8 +1871,8 @@ def _call_collective_with_varying_tensors(self, backend, collective, *args):
         # correctly dispatched
 
         # TODO: this will be updated in the future to not be backend specific
-        device = "cuda" if backend == "nccl" else "cpu"
-        # ensure supported devices (cpu, cuda) succeeds during dispatch call
+        device = "xpu" if backend == "xccl" else "cpu"
+        # ensure supported devices (cpu, xpu) succeeds during dispatch call
         tensor = torch.zeros(2, 2, device=torch.device(device))
         # multi tensor collectives
         if collective == dist.barrier:
@@ -1923,7 +1923,7 @@ def _test_allreduce_coalesced(self, backend):
             store=store,
         )
         # TODO: this will be updated in the future to not be backend specific
-        device = "cuda" if backend == "nccl" else "cpu"
+        device = "xpu" if backend == "xccl" else "cpu"
         tensors = [torch.ones(10, 10, device=torch.device(device))]
         dist.all_reduce_coalesced(tensors, dist.ReduceOp.SUM)
         for tensor in tensors:
@@ -1937,7 +1937,7 @@ def _test_all_to_all_single(self, backend):
             rank=self.rank,
             store=store,
         )
-        device = "cuda" if backend == "nccl" else "cpu"
+        device = "xpu" if backend == "xccl" else "cpu"
         # test alltoall_base
         input_tensor = torch.ones(2, 2, device=torch.device(device))
         output_tensor = torch.zeros(2, 2, device=torch.device(device))
@@ -1964,7 +1964,7 @@ def test_op_isinstance_of_reduceop(self):
             self.assertTrue(isinstance(reduce_op, c10d.ReduceOp))
         for scale in (torch.tensor(1.0), 2.0):
             self.assertTrue(
-                isinstance(dist._make_nccl_premul_sum(scale), c10d.ReduceOp)
+                isinstance(dist._make_xccl_premul_sum(scale), c10d.ReduceOp)
             )
 
     # Ref: https://github.com/pytorch/pytorch/pull/87303#discussion_r1002879700
@@ -1985,7 +1985,7 @@ def test_reduceop_copyable(self):
             self.assertEqual(copy.deepcopy(c10d.ReduceOp(reduce_op)), reduce_op)
 
         for scale in (torch.tensor(1.0), 2.0):
-            reduce_op = dist._make_nccl_premul_sum(scale)
+            reduce_op = dist._make_xccl_premul_sum(scale)
             self.assertEqual(copy.copy(reduce_op), reduce_op)
             self.assertEqual(copy.deepcopy(reduce_op), reduce_op)
 
@@ -2004,7 +2004,7 @@ def test_reduceop_pickle(self):
             orig = c10d.ReduceOp(reduce_op)
             self.assertEqual(pickle.loads(pickle.dumps(orig)), orig)
         for scale in (torch.tensor(1.0), 2.0):
-            reduce_op = dist._make_nccl_premul_sum(scale)
+            reduce_op = dist._make_xccl_premul_sum(scale)
             self.assertEqual(pickle.loads(pickle.dumps(reduce_op)), reduce_op)
 
     # Ref: https://github.com/pytorch/pytorch/issues/90072
@@ -2070,7 +2070,7 @@ def testNodeLocalRank(self):
 
 if __name__ == "__main__":
     assert (
-        not torch.cuda._initialized
+        not torch.xpu._initialized
     ), "test_distributed must not have initialized CUDA context on main process"
 
     run_tests()
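Taken together, the converted tests follow the same sequence as the CUDA/NCCL originals, only on XPU with the xccl backend: bind the rank to its device, initialize the process group, run a collective. A minimal standalone sketch of that sequence (assumes an XCCL-enabled build and a torchrun-style launch that sets RANK, WORLD_SIZE, MASTER_ADDR and MASTER_PORT):

import os

import torch
import torch.distributed as dist

# Sketch only: mirrors the pattern the converted tests rely on.
rank = int(os.environ["RANK"])
world_size = int(os.environ["WORLD_SIZE"])

torch.xpu.set_device(rank)  # bind this rank to its XPU
dist.init_process_group("xccl", rank=rank, world_size=world_size)

t = torch.ones(1, device=f"xpu:{rank}")
dist.all_reduce(t)  # sums the ones across all ranks
assert t.item() == world_size

dist.destroy_process_group()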
