Skip to content

Commit c9ac3a0

Browse files
committed
[hyperactor] simplify supervision propagation; unhandled events always cause failure
Currently (local) supervision can propagate events without also killing an intermediate actor. This is 1) wrong; and 2) complicated. Instead, we treat an unhandled supervision event as an actor failure, and then reduce the propagation paths to one: that of an actor failing. In order to retain accurate attribution, we add a "caused_by" field to the actor supervision events. Differential Revision: [D79702385](https://our.internmc.facebook.com/intern/diff/D79702385/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D79702385/)! ghstack-source-id: 301209925 Pull Request resolved: #784
1 parent c222810 commit c9ac3a0

File tree

7 files changed

+256
-47
lines changed

7 files changed

+256
-47
lines changed

hyperactor/src/actor.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,8 +249,8 @@ where
249249
/// with the ID of the actor being served.
250250
#[derive(Debug)]
251251
pub struct ActorError {
252-
actor_id: ActorId,
253-
kind: ActorErrorKind,
252+
pub(crate) actor_id: ActorId,
253+
pub(crate) kind: ActorErrorKind,
254254
}
255255

256256
/// The kinds of actor serving errors.
@@ -289,6 +289,10 @@ pub enum ActorErrorKind {
289289
#[error("actor is in an indeterminate state")]
290290
IndeterminateState,
291291

292+
/// An actor supervision event was not handled.
293+
#[error("supervision: {0}")]
294+
UnhandledSupervisionEvent(#[from] ActorSupervisionEvent),
295+
292296
/// A special kind of error that allows us to clone errors: we can keep the
293297
/// error string, but we lose the error structure.
294298
#[error("{0}")]
@@ -338,6 +342,15 @@ impl From<MailboxSenderError> for ActorError {
338342
}
339343
}
340344

345+
impl From<ActorSupervisionEvent> for ActorError {
346+
fn from(inner: ActorSupervisionEvent) -> Self {
347+
Self::new(
348+
inner.actor_id.clone(),
349+
ActorErrorKind::UnhandledSupervisionEvent(inner),
350+
)
351+
}
352+
}
353+
341354
/// A collection of signals to control the behavior of the actor.
342355
/// Signals are internal runtime control plane messages and should not be
343356
/// sent outside of the runtime.

hyperactor/src/mailbox/undeliverable.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ pub fn supervise_undeliverable_messages(
164164
envelope
165165
)),
166166
message_headers: Some(envelope.headers().clone()),
167+
caused_by: None,
167168
})
168169
.is_err()
169170
{

hyperactor/src/proc.rs

Lines changed: 216 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -936,13 +936,30 @@ impl<A: Actor> Instance<A> {
936936
async fn serve(mut self, mut actor: A) {
937937
let result = self.run_actor_tree(&mut actor).await;
938938

939-
let actor_status = match result {
940-
Ok(_) => ActorStatus::Stopped,
941-
Err(err) => ActorStatus::Failed(err.to_string()),
939+
let (actor_status, event) = match result {
940+
Ok(_) => (ActorStatus::Stopped, None),
941+
Err(ActorError {
942+
kind: ActorErrorKind::UnhandledSupervisionEvent(event),
943+
..
944+
}) => (event.actor_status.clone(), Some(event)),
945+
Err(err) => (
946+
ActorStatus::Failed(err.to_string()),
947+
Some(ActorSupervisionEvent {
948+
actor_id: self.cell.actor_id().clone(),
949+
actor_status: ActorStatus::Failed(err.to_string()),
950+
message_headers: None,
951+
caused_by: None,
952+
}),
953+
),
942954
};
943955

944-
let result = self.cell.maybe_unlink_parent();
945-
if let Some(parent) = result {
956+
if let Some(parent) = self.cell.maybe_unlink_parent() {
957+
if let Some(event) = event {
958+
// Parent exists, failure should be propagated to the parent.
959+
parent.send_supervision_event_or_crash(event);
960+
}
961+
// TODO: we should get rid of this signal, and use *only* supervision events for
962+
// the purpose of conveying lifecycle changes
946963
if let Err(err) = parent.signal(Signal::ChildStopped(self.cell.pid())) {
947964
tracing::error!(
948965
"{}: failed to send stop message to parent pid {}: {:?}",
@@ -951,26 +968,14 @@ impl<A: Actor> Instance<A> {
951968
err
952969
);
953970
}
954-
if actor_status.is_failed() {
955-
// Parent exists, failure should be propagated to the parent.
956-
parent.send_supervision_event_or_crash(ActorSupervisionEvent {
957-
actor_id: self.cell.actor_id().clone(),
958-
actor_status: actor_status.clone(),
959-
message_headers: None,
960-
});
961-
}
962971
} else {
963972
// Failure happened to the root actor or orphaned child actors.
964973
// In either case, the failure should be propagated to proc.
965974
//
966975
// Note that orphaned actor is unexpected and would only happen if
967976
// there is a bug.
968-
if actor_status.is_failed() {
969-
self.proc.handle_supervision_event(ActorSupervisionEvent {
970-
actor_id: self.cell.actor_id().clone(),
971-
actor_status: actor_status.clone(),
972-
message_headers: None,
973-
})
977+
if let Some(event) = event {
978+
self.proc.handle_supervision_event(event);
974979
}
975980
}
976981
self.change_status(actor_status);
@@ -1061,7 +1066,7 @@ impl<A: Actor> Instance<A> {
10611066
let work = work.expect("inconsistent work queue state");
10621067
if let Err(err) = work.handle(actor, self).await {
10631068
for supervision_event in self.supervision_event_receiver.drain() {
1064-
self.handle_supervision_event(actor, supervision_event).await;
1069+
self.handle_supervision_event(actor, supervision_event).await?;
10651070
}
10661071
return Err(ActorError::new(self.self_id().clone(), ActorErrorKind::Processing(err)));
10671072
}
@@ -1080,7 +1085,7 @@ impl<A: Actor> Instance<A> {
10801085
}
10811086
}
10821087
Ok(supervision_event) = self.supervision_event_receiver.recv() => {
1083-
self.handle_supervision_event(actor, supervision_event).await;
1088+
self.handle_supervision_event(actor, supervision_event).await?;
10841089
}
10851090
}
10861091
self.cell
@@ -1112,24 +1117,41 @@ impl<A: Actor> Instance<A> {
11121117
&self,
11131118
actor: &mut A,
11141119
supervision_event: ActorSupervisionEvent,
1115-
) {
1120+
) -> Result<(), ActorError> {
11161121
// Handle the supervision event with the current actor.
1117-
if let Ok(false) = actor
1122+
match actor
11181123
.handle_supervision_event(self, &supervision_event)
11191124
.await
11201125
{
1121-
// The supervision event wasn't handled by this actor, try to bubble it up.
1122-
let result = self.cell.get_parent_cell();
1123-
if let Some(parent) = result {
1124-
parent.send_supervision_event_or_crash(supervision_event);
1125-
} else {
1126-
// Reaching here means the actor is either a root actor, or an orphaned
1127-
// child actor (i.e. the parent actor was dropped unexpectedly). In either
1128-
// case, the supervision event should be sent to proc.
1129-
//
1130-
// Note that orphaned actor is unexpected and would only happen if there
1131-
// is a bug.
1132-
self.proc.handle_supervision_event(supervision_event);
1126+
Ok(true) => {
1127+
// The supervision event was handled by this actor, nothing more to do.
1128+
Ok(())
1129+
}
1130+
Ok(false) => {
1131+
// The supervision event wasn't handled by this actor, chain it and bubble it up.
1132+
let supervision_event = ActorSupervisionEvent {
1133+
actor_id: self.self_id().clone(),
1134+
actor_status: ActorStatus::Failed(
1135+
"did not handle supervision event".to_string(),
1136+
),
1137+
message_headers: None,
1138+
caused_by: Some(Box::new(supervision_event)),
1139+
};
1140+
Err(supervision_event.into())
1141+
}
1142+
Err(err) => {
1143+
// The actor failed to handle the supervision event, it should die.
1144+
// Create a new supervision event for this failure and propagate it.
1145+
let supervision_event = ActorSupervisionEvent {
1146+
actor_id: self.self_id().clone(),
1147+
actor_status: ActorStatus::Failed(format!(
1148+
"failed to handle supervision event: {}",
1149+
err
1150+
)),
1151+
message_headers: None,
1152+
caused_by: Some(Box::new(supervision_event)),
1153+
};
1154+
Err(supervision_event.into())
11331155
}
11341156
}
11351157
}
@@ -2105,12 +2127,12 @@ mod tests {
21052127

21062128
// TODO: should we provide finer-grained stop reasons, e.g., to indicate it was
21072129
// stopped by a parent failure?
2108-
assert_matches!(root_2_1.await, ActorStatus::Stopped);
2109-
2110-
for actor in [root_1, root] {
2111-
// The other actors were unaffected.
2112-
assert_matches!(*actor.status().borrow(), ActorStatus::Idle);
2113-
}
2130+
assert_eq!(
2131+
root.await,
2132+
ActorStatus::Failed("did not handle supervision event".to_string())
2133+
);
2134+
assert_eq!(root_2_1.await, ActorStatus::Stopped);
2135+
assert_eq!(root_1.await, ActorStatus::Stopped);
21142136
}
21152137

21162138
#[tokio::test]
@@ -2533,10 +2555,162 @@ mod tests {
25332555
assert!(!root_2_1_state.load(Ordering::SeqCst));
25342556
assert_eq!(
25352557
reported_event.event().map(|e| e.actor_id.clone()),
2536-
Some(root_2_1.actor_id().clone())
2558+
Some(root.actor_id().clone())
25372559
);
25382560
}
25392561

2562+
#[tokio::test]
2563+
async fn test_instance() {
2564+
#[derive(Debug)]
2565+
struct TestActor;
2566+
2567+
#[async_trait]
2568+
impl Actor for TestActor {
2569+
type Params = ();
2570+
2571+
async fn new(_param: ()) -> Result<Self, anyhow::Error> {
2572+
Ok(Self)
2573+
}
2574+
}
2575+
2576+
#[async_trait]
2577+
impl Handler<(String, PortRef<String>)> for TestActor {
2578+
async fn handle(
2579+
&mut self,
2580+
cx: &crate::Context<Self>,
2581+
(message, port): (String, PortRef<String>),
2582+
) -> anyhow::Result<()> {
2583+
port.send(cx, message)?;
2584+
Ok(())
2585+
}
2586+
}
2587+
2588+
let proc = Proc::local();
2589+
2590+
let (instance, handle) = proc.instance("my_test_actor").unwrap();
2591+
2592+
let child_actor = TestActor::spawn(&instance, ()).await.unwrap();
2593+
2594+
let (port, mut receiver) = instance.open_port();
2595+
child_actor
2596+
.send(("hello".to_string(), port.bind()))
2597+
.unwrap();
2598+
2599+
let message = receiver.recv().await.unwrap();
2600+
assert_eq!(message, "hello");
2601+
2602+
child_actor.drain_and_stop().unwrap();
2603+
child_actor.await;
2604+
2605+
assert_eq!(*handle.status().borrow(), ActorStatus::Client);
2606+
drop(instance);
2607+
assert_eq!(*handle.status().borrow(), ActorStatus::Stopped);
2608+
handle.await;
2609+
}
2610+
2611+
#[tokio::test]
2612+
async fn test_supervision_event_handler_propagates() {
2613+
#[derive(Debug)]
2614+
struct FailingSupervisionActor;
2615+
2616+
#[async_trait]
2617+
impl Actor for FailingSupervisionActor {
2618+
type Params = ();
2619+
2620+
async fn new(_: ()) -> Result<Self, anyhow::Error> {
2621+
Ok(Self)
2622+
}
2623+
2624+
async fn handle_supervision_event(
2625+
&mut self,
2626+
_this: &Instance<Self>,
2627+
_event: &ActorSupervisionEvent,
2628+
) -> Result<bool, anyhow::Error> {
2629+
anyhow::bail!("failed to handle supervision event!")
2630+
}
2631+
}
2632+
2633+
#[async_trait]
2634+
impl Handler<String> for FailingSupervisionActor {
2635+
async fn handle(
2636+
&mut self,
2637+
_cx: &crate::Context<Self>,
2638+
message: String,
2639+
) -> anyhow::Result<()> {
2640+
Err(anyhow::anyhow!(message))
2641+
}
2642+
}
2643+
2644+
#[derive(Debug)]
2645+
struct ParentActor(tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>);
2646+
2647+
#[async_trait]
2648+
impl Actor for ParentActor {
2649+
type Params = tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>;
2650+
2651+
async fn new(
2652+
supervision_events: tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>,
2653+
) -> Result<Self, anyhow::Error> {
2654+
Ok(Self(supervision_events))
2655+
}
2656+
2657+
async fn handle_supervision_event(
2658+
&mut self,
2659+
_this: &Instance<Self>,
2660+
event: &ActorSupervisionEvent,
2661+
) -> Result<bool, anyhow::Error> {
2662+
self.0.send(event.clone()).unwrap();
2663+
Ok(true)
2664+
}
2665+
}
2666+
2667+
let proc = Proc::local();
2668+
2669+
let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel();
2670+
2671+
let parent = proc.spawn::<ParentActor>("parent", event_tx).await.unwrap();
2672+
let child = proc
2673+
.spawn_child::<FailingSupervisionActor>(parent.cell().clone(), ())
2674+
.await
2675+
.unwrap();
2676+
let grandchild = proc
2677+
.spawn_child::<FailingSupervisionActor>(child.cell().clone(), ())
2678+
.await
2679+
.unwrap();
2680+
2681+
let child_actor_id = child.actor_id().clone();
2682+
let grandchild_actor_id = grandchild.actor_id().clone();
2683+
2684+
// Grandchild fails, triggering failure up the tree, finally receiving
2685+
// the event at the root.
2686+
grandchild.send("trigger failure".to_string()).unwrap();
2687+
2688+
assert!(grandchild.await.is_failed());
2689+
assert!(child.await.is_failed());
2690+
2691+
assert_eq!(
2692+
event_rx.recv().await.unwrap(),
2693+
ActorSupervisionEvent {
2694+
actor_id: child_actor_id,
2695+
actor_status: ActorStatus::Failed(
2696+
"failed to handle supervision event: failed to handle supervision event!"
2697+
.to_string()
2698+
),
2699+
message_headers: None,
2700+
caused_by: Some(Box::new(ActorSupervisionEvent {
2701+
actor_id: grandchild_actor_id,
2702+
actor_status: ActorStatus::Failed(
2703+
"serving local[0].parent[2]: processing error: trigger failure".to_string()
2704+
),
2705+
message_headers: None,
2706+
caused_by: None,
2707+
})),
2708+
}
2709+
);
2710+
2711+
assert!(event_rx.try_recv().is_err());
2712+
}
2713+
25402714
#[tokio::test]
25412715
async fn test_proc_terminate_without_coordinator() {
25422716
if std::env::var("CARGO_TEST").is_ok() {

hyperactor/src/supervision.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,23 @@ pub struct ActorSupervisionEvent {
3131
/// If this event is associated with a message, the message headers.
3232
#[derivative(PartialEq = "ignore")]
3333
pub message_headers: Option<Attrs>,
34+
/// Optional supervision event that caused this event, for recursive propagation.
35+
pub caused_by: Option<Box<ActorSupervisionEvent>>,
36+
}
37+
38+
impl std::error::Error for ActorSupervisionEvent {}
39+
40+
impl fmt::Display for ActorSupervisionEvent {
41+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42+
write!(f, "{}: {}", self.actor_id, self.actor_status)?;
43+
if let Some(message_headers) = &self.message_headers {
44+
let headers = serde_json::to_string(&message_headers)
45+
.expect("could not serialize message headers");
46+
write!(f, " (headers: {})", headers)?;
47+
}
48+
if let Some(caused_by) = &self.caused_by {
49+
write!(f, ": caused by: {})", caused_by)?;
50+
}
51+
Ok(())
52+
}
3453
}

0 commit comments

Comments
 (0)