Skip to content

Back out "fix broken tests" #751

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 7 additions & 47 deletions python/tests/test_python_actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@ async def test_choose():
assert_type(result, int)
assert result2 == result3

await proc.stop()


@pytest.mark.timeout(60)
async def test_stream():
Expand All @@ -96,8 +94,6 @@ async def test_stream():

assert 8 == sum([await x for x in v.value.stream()])

await proc.stop()


class To(Actor):
@endpoint
Expand All @@ -120,8 +116,6 @@ async def test_mesh_passed_to_mesh():
assert len(all) == 4
assert all[0] != all[1]

await proc.stop()


@pytest.mark.timeout(60)
async def test_mesh_passed_to_mesh_on_different_proc_mesh():
Expand All @@ -133,9 +127,6 @@ async def test_mesh_passed_to_mesh_on_different_proc_mesh():
assert len(all) == 4
assert all[0] != all[1]

await proc.stop()
await proc2.stop()


@pytest.mark.timeout(60)
async def test_actor_slicing():
Expand All @@ -152,9 +143,6 @@ async def test_actor_slicing():

assert result[0] == result[1]

await proc.stop()
await proc2.stop()


@pytest.mark.timeout(60)
async def test_aggregate():
Expand All @@ -165,8 +153,6 @@ async def test_aggregate():
r = await acc.accumulate()
assert r == 4

await proc.stop()


class RunIt(Actor):
@endpoint
Expand All @@ -184,8 +170,6 @@ async def test_rank_size():
assert 1 == await acc.accumulate(lambda: current_rank()["gpus"])
assert 4 == await acc.accumulate(lambda: current_size()["gpus"])

await proc.stop()


class SyncActor(Actor):
@endpoint
Expand All @@ -201,51 +185,41 @@ async def test_sync_actor():
r = await a.sync_endpoint.choose(c)
assert r == 5

await proc.stop()


@pytest.mark.timeout(60)
async def test_sync_actor_sync_client() -> None:
def test_sync_actor_sync_client() -> None:
proc = local_proc_mesh(gpus=2)
a = proc.spawn("actor", SyncActor).get()
c = proc.spawn("counter", Counter, 5).get()
r = a.sync_endpoint.choose(c).get()
assert r == 5

await proc.stop()


@pytest.mark.timeout(60)
async def test_proc_mesh_size() -> None:
def test_proc_mesh_size() -> None:
proc = local_proc_mesh(gpus=2)
assert 2 == proc.size("gpus")
# proc.initialized.get()
# await proc.stop()


@pytest.mark.timeout(60)
async def test_rank_size_sync() -> None:
def test_rank_size_sync() -> None:
proc = local_proc_mesh(gpus=2)
r = proc.spawn("runit", RunIt).get()

acc = Accumulator(r.run, 0, operator.add)
assert 1 == acc.accumulate(lambda: current_rank()["gpus"]).get()
assert 4 == acc.accumulate(lambda: current_size()["gpus"]).get()

await proc.stop()


@pytest.mark.timeout(60)
async def test_accumulate_sync() -> None:
def test_accumulate_sync() -> None:
proc = local_proc_mesh(gpus=2)
counter = proc.spawn("counter", Counter, 1).get()
counter.incr.broadcast()
acc = Accumulator(counter.value, 0, operator.add)
r = acc.accumulate().get()
assert r == 4

await proc.stop()


class CastToCounter(Actor):
@endpoint
Expand All @@ -254,7 +228,7 @@ def doit(self, c: Counter):


@pytest.mark.timeout(60)
async def test_value_mesh() -> None:
def test_value_mesh() -> None:
proc = local_proc_mesh(gpus=2)
counter = proc.spawn("counter", Counter, 0).get()
counter.slice(hosts=0, gpus=1).incr.broadcast()
Expand All @@ -265,8 +239,6 @@ async def test_value_mesh() -> None:
n = proc.spawn("ctc", CastToCounter).get()
assert list(x) == n.slice(gpus=0).doit.call_one(counter).get()

await proc.stop()


@pytest.mark.timeout(60)
def test_rust_binding_modules_correct() -> None:
Expand Down Expand Up @@ -333,8 +305,6 @@ async def test_actor_tls() -> None:
assert 4 == await am.get.call_one()
assert 4 == await am.get_async.call_one()

await pm.stop()


class TLSActorFullSync(Actor):
"""An actor that manages thread-local state."""
Expand Down Expand Up @@ -364,8 +334,6 @@ async def test_actor_tls_full_sync() -> None:

assert 4 == await am.get.call_one()

await pm.stop()


class AsyncActor(Actor):
def __init__(self):
Expand All @@ -392,8 +360,6 @@ async def test_async_concurrency():
await am.no_more.call()
await fut

await pm.stop()


async def awaitit(f):
return await f
Expand Down Expand Up @@ -750,7 +716,7 @@ async def send(self, port: Port[int]):


@pytest.mark.timeout(60)
async def test_port_as_argument() -> None:
def test_port_as_argument() -> None:
proc_mesh = local_proc_mesh(gpus=1)
s = proc_mesh.spawn("send_alot", SendAlot).get()
send, recv = Channel[int].open()
Expand All @@ -760,8 +726,6 @@ async def test_port_as_argument() -> None:
for i in range(100):
assert i == recv.recv().get()

await proc_mesh.stop()


@pytest.mark.timeout(15)
async def test_same_actor_twice() -> None:
Expand All @@ -778,8 +742,6 @@ async def test_same_actor_twice() -> None:
"gspawn failed: an actor with name 'dup' has already been spawned" in error_msg
), f"Expected error message about duplicate actor name, got: {error_msg}"

await pm.stop()


class TestActorMeshStop(unittest.IsolatedAsyncioTestCase):
async def test_actor_mesh_stop(self) -> None:
Expand All @@ -806,13 +768,11 @@ def add(self, port: "Port[int]", b: int) -> None:


@pytest.mark.timeout(60)
async def test_ported_actor():
def test_ported_actor():
proc_mesh = local_proc_mesh(gpus=1).get()
a = proc_mesh.spawn("port_actor", PortedActor).get()
assert 5 == a.add.call_one(2).get()

await proc_mesh.stop()


async def _recv():
return (7, 2, 3)
Expand Down