Skip to content

Commit b5ecd8f

Browse files
mariusaefacebook-github-bot
authored andcommitted
simplify supervision propagation; unhandled events always cause failure (#784)
Summary: Pull Request resolved: #784 Currently (local) supervision can propagate events without also killing an intermediate actor. This is 1) wrong; and 2) complicated. Instead, we treat an unhandled supervision event as an actor failure, and then reduce the propagation paths to one: that of an actor failing. In order to retain accurate attribution, we add a "caused_by" field to the actor supervision events. ghstack-source-id: 301725856 exported-using-ghexport Reviewed By: shayne-fletcher Differential Revision: D79702385 fbshipit-source-id: ccce9e04b206973b6a9b372180a967bbffda79a5
1 parent 3a651e2 commit b5ecd8f

File tree

8 files changed

+213
-50
lines changed

8 files changed

+213
-50
lines changed

hyperactor/src/actor.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,8 +260,8 @@ where
260260
/// with the ID of the actor being served.
261261
#[derive(Debug)]
262262
pub struct ActorError {
263-
actor_id: ActorId,
264-
kind: ActorErrorKind,
263+
pub(crate) actor_id: ActorId,
264+
pub(crate) kind: ActorErrorKind,
265265
}
266266

267267
/// The kinds of actor serving errors.
@@ -300,6 +300,10 @@ pub enum ActorErrorKind {
300300
#[error("actor is in an indeterminate state")]
301301
IndeterminateState,
302302

303+
/// An actor supervision event was not handled.
304+
#[error("supervision: {0}")]
305+
UnhandledSupervisionEvent(#[from] ActorSupervisionEvent),
306+
303307
/// A special kind of error that allows us to clone errors: we can keep the
304308
/// error string, but we lose the error structure.
305309
#[error("{0}")]
@@ -349,6 +353,15 @@ impl From<MailboxSenderError> for ActorError {
349353
}
350354
}
351355

356+
impl From<ActorSupervisionEvent> for ActorError {
357+
fn from(inner: ActorSupervisionEvent) -> Self {
358+
Self::new(
359+
inner.actor_id.clone(),
360+
ActorErrorKind::UnhandledSupervisionEvent(inner),
361+
)
362+
}
363+
}
364+
352365
/// A collection of signals to control the behavior of the actor.
353366
/// Signals are internal runtime control plane messages and should not be
354367
/// sent outside of the runtime.

hyperactor/src/mailbox/undeliverable.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ pub fn supervise_undeliverable_messages(
164164
envelope
165165
)),
166166
message_headers: Some(envelope.headers().clone()),
167+
caused_by: None,
167168
})
168169
.is_err()
169170
{

hyperactor/src/proc.rs

Lines changed: 168 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -964,13 +964,30 @@ impl<A: Actor> Instance<A> {
964964

965965
let result = self.run_actor_tree(&mut actor, actor_loop_receivers).await;
966966

967-
let actor_status = match result {
968-
Ok(_) => ActorStatus::Stopped,
969-
Err(err) => ActorStatus::Failed(err.to_string()),
967+
let (actor_status, event) = match result {
968+
Ok(_) => (ActorStatus::Stopped, None),
969+
Err(ActorError {
970+
kind: ActorErrorKind::UnhandledSupervisionEvent(event),
971+
..
972+
}) => (event.actor_status.clone(), Some(event)),
973+
Err(err) => (
974+
ActorStatus::Failed(err.to_string()),
975+
Some(ActorSupervisionEvent {
976+
actor_id: self.cell.actor_id().clone(),
977+
actor_status: ActorStatus::Failed(err.to_string()),
978+
message_headers: None,
979+
caused_by: None,
980+
}),
981+
),
970982
};
971983

972-
let result = self.cell.maybe_unlink_parent();
973-
if let Some(parent) = result {
984+
if let Some(parent) = self.cell.maybe_unlink_parent() {
985+
if let Some(event) = event {
986+
// Parent exists, failure should be propagated to the parent.
987+
parent.send_supervision_event_or_crash(event);
988+
}
989+
// TODO: we should get rid of this signal, and use *only* supervision events for
990+
// the purpose of conveying lifecycle changes
974991
if let Err(err) = parent.signal(Signal::ChildStopped(self.cell.pid())) {
975992
tracing::error!(
976993
"{}: failed to send stop message to parent pid {}: {:?}",
@@ -979,26 +996,14 @@ impl<A: Actor> Instance<A> {
979996
err
980997
);
981998
}
982-
if actor_status.is_failed() {
983-
// Parent exists, failure should be propagated to the parent.
984-
parent.send_supervision_event_or_crash(ActorSupervisionEvent {
985-
actor_id: self.cell.actor_id().clone(),
986-
actor_status: actor_status.clone(),
987-
message_headers: None,
988-
});
989-
}
990999
} else {
9911000
// Failure happened to the root actor or orphaned child actors.
9921001
// In either case, the failure should be propagated to proc.
9931002
//
9941003
// Note that orphaned actor is unexpected and would only happen if
9951004
// there is a bug.
996-
if actor_status.is_failed() {
997-
self.proc.handle_supervision_event(ActorSupervisionEvent {
998-
actor_id: self.cell.actor_id().clone(),
999-
actor_status: actor_status.clone(),
1000-
message_headers: None,
1001-
})
1005+
if let Some(event) = event {
1006+
self.proc.handle_supervision_event(event);
10021007
}
10031008
}
10041009
self.change_status(actor_status);
@@ -1103,7 +1108,7 @@ impl<A: Actor> Instance<A> {
11031108
let work = work.expect("inconsistent work queue state");
11041109
if let Err(err) = work.handle(actor, self).await {
11051110
for supervision_event in supervision_event_receiver.drain() {
1106-
self.handle_supervision_event(actor, supervision_event).await;
1111+
self.handle_supervision_event(actor, supervision_event).await?;
11071112
}
11081113
return Err(ActorError::new(self.self_id().clone(), ActorErrorKind::Processing(err)));
11091114
}
@@ -1122,7 +1127,7 @@ impl<A: Actor> Instance<A> {
11221127
}
11231128
}
11241129
Ok(supervision_event) = supervision_event_receiver.recv() => {
1125-
self.handle_supervision_event(actor, supervision_event).await;
1130+
self.handle_supervision_event(actor, supervision_event).await?;
11261131
}
11271132
}
11281133
self.cell
@@ -1154,24 +1159,41 @@ impl<A: Actor> Instance<A> {
11541159
&self,
11551160
actor: &mut A,
11561161
supervision_event: ActorSupervisionEvent,
1157-
) {
1162+
) -> Result<(), ActorError> {
11581163
// Handle the supervision event with the current actor.
1159-
if let Ok(false) = actor
1164+
match actor
11601165
.handle_supervision_event(self, &supervision_event)
11611166
.await
11621167
{
1163-
// The supervision event wasn't handled by this actor, try to bubble it up.
1164-
let result = self.cell.get_parent_cell();
1165-
if let Some(parent) = result {
1166-
parent.send_supervision_event_or_crash(supervision_event);
1167-
} else {
1168-
// Reaching here means the actor is either a root actor, or an orphaned
1169-
// child actor (i.e. the parent actor was dropped unexpectedly). In either
1170-
// case, the supervision event should be sent to proc.
1171-
//
1172-
// Note that orphaned actor is unexpected and would only happen if there
1173-
// is a bug.
1174-
self.proc.handle_supervision_event(supervision_event);
1168+
Ok(true) => {
1169+
// The supervision event was handled by this actor, nothing more to do.
1170+
Ok(())
1171+
}
1172+
Ok(false) => {
1173+
// The supervision event wasn't handled by this actor, chain it and bubble it up.
1174+
let supervision_event = ActorSupervisionEvent {
1175+
actor_id: self.self_id().clone(),
1176+
actor_status: ActorStatus::Failed(
1177+
"did not handle supervision event".to_string(),
1178+
),
1179+
message_headers: None,
1180+
caused_by: Some(Box::new(supervision_event)),
1181+
};
1182+
Err(supervision_event.into())
1183+
}
1184+
Err(err) => {
1185+
// The actor failed to handle the supervision event, it should die.
1186+
// Create a new supervision event for this failure and propagate it.
1187+
let supervision_event = ActorSupervisionEvent {
1188+
actor_id: self.self_id().clone(),
1189+
actor_status: ActorStatus::Failed(format!(
1190+
"failed to handle supervision event: {}",
1191+
err
1192+
)),
1193+
message_headers: None,
1194+
caused_by: Some(Box::new(supervision_event)),
1195+
};
1196+
Err(supervision_event.into())
11751197
}
11761198
}
11771199
}
@@ -2174,12 +2196,12 @@ mod tests {
21742196

21752197
// TODO: should we provide finer-grained stop reasons, e.g., to indicate it was
21762198
// stopped by a parent failure?
2177-
assert_matches!(root_2_1.await, ActorStatus::Stopped);
2178-
2179-
for actor in [root_1, root] {
2180-
// The other actors were unaffected.
2181-
assert_matches!(*actor.status().borrow(), ActorStatus::Idle);
2182-
}
2199+
assert_eq!(
2200+
root.await,
2201+
ActorStatus::Failed("did not handle supervision event".to_string())
2202+
);
2203+
assert_eq!(root_2_1.await, ActorStatus::Stopped);
2204+
assert_eq!(root_1.await, ActorStatus::Stopped);
21832205
}
21842206

21852207
#[tokio::test]
@@ -2602,8 +2624,111 @@ mod tests {
26022624
assert!(!root_2_1_state.load(Ordering::SeqCst));
26032625
assert_eq!(
26042626
reported_event.event().map(|e| e.actor_id.clone()),
2605-
Some(root_2_1.actor_id().clone())
2627+
Some(root.actor_id().clone())
2628+
);
2629+
}
2630+
2631+
#[tokio::test]
2632+
async fn test_supervision_event_handler_propagates() {
2633+
#[derive(Debug)]
2634+
struct FailingSupervisionActor;
2635+
2636+
#[async_trait]
2637+
impl Actor for FailingSupervisionActor {
2638+
type Params = ();
2639+
2640+
async fn new(_: ()) -> Result<Self, anyhow::Error> {
2641+
Ok(Self)
2642+
}
2643+
2644+
async fn handle_supervision_event(
2645+
&mut self,
2646+
_this: &Instance<Self>,
2647+
_event: &ActorSupervisionEvent,
2648+
) -> Result<bool, anyhow::Error> {
2649+
anyhow::bail!("failed to handle supervision event!")
2650+
}
2651+
}
2652+
2653+
#[async_trait]
2654+
impl Handler<String> for FailingSupervisionActor {
2655+
async fn handle(
2656+
&mut self,
2657+
_cx: &crate::Context<Self>,
2658+
message: String,
2659+
) -> anyhow::Result<()> {
2660+
Err(anyhow::anyhow!(message))
2661+
}
2662+
}
2663+
2664+
#[derive(Debug)]
2665+
struct ParentActor(tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>);
2666+
2667+
#[async_trait]
2668+
impl Actor for ParentActor {
2669+
type Params = tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>;
2670+
2671+
async fn new(
2672+
supervision_events: tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>,
2673+
) -> Result<Self, anyhow::Error> {
2674+
Ok(Self(supervision_events))
2675+
}
2676+
2677+
async fn handle_supervision_event(
2678+
&mut self,
2679+
_this: &Instance<Self>,
2680+
event: &ActorSupervisionEvent,
2681+
) -> Result<bool, anyhow::Error> {
2682+
self.0.send(event.clone()).unwrap();
2683+
Ok(true)
2684+
}
2685+
}
2686+
2687+
let proc = Proc::local();
2688+
2689+
let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel();
2690+
2691+
let parent = proc.spawn::<ParentActor>("parent", event_tx).await.unwrap();
2692+
let child = proc
2693+
.spawn_child::<FailingSupervisionActor>(parent.cell().clone(), ())
2694+
.await
2695+
.unwrap();
2696+
let grandchild = proc
2697+
.spawn_child::<FailingSupervisionActor>(child.cell().clone(), ())
2698+
.await
2699+
.unwrap();
2700+
2701+
let child_actor_id = child.actor_id().clone();
2702+
let grandchild_actor_id = grandchild.actor_id().clone();
2703+
2704+
// Grandchild fails, triggering failure up the tree, finally receiving
2705+
// the event at the root.
2706+
grandchild.send("trigger failure".to_string()).unwrap();
2707+
2708+
assert!(grandchild.await.is_failed());
2709+
assert!(child.await.is_failed());
2710+
2711+
assert_eq!(
2712+
event_rx.recv().await.unwrap(),
2713+
ActorSupervisionEvent {
2714+
actor_id: child_actor_id,
2715+
actor_status: ActorStatus::Failed(
2716+
"failed to handle supervision event: failed to handle supervision event!"
2717+
.to_string()
2718+
),
2719+
message_headers: None,
2720+
caused_by: Some(Box::new(ActorSupervisionEvent {
2721+
actor_id: grandchild_actor_id,
2722+
actor_status: ActorStatus::Failed(
2723+
"serving local[0].parent[2]: processing error: trigger failure".to_string()
2724+
),
2725+
message_headers: None,
2726+
caused_by: None,
2727+
})),
2728+
}
26062729
);
2730+
2731+
assert!(event_rx.try_recv().is_err());
26072732
}
26082733

26092734
#[tokio::test]
@@ -2615,7 +2740,7 @@ mod tests {
26152740
impl Actor for TestActor {
26162741
type Params = ();
26172742

2618-
async fn new(_param: ()) -> Result<Self, anyhow::Error> {
2743+
async fn new(_params: ()) -> Result<Self, anyhow::Error> {
26192744
Ok(Self)
26202745
}
26212746
}

hyperactor/src/supervision.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,36 @@ pub struct ActorSupervisionEvent {
3232
/// If this event is associated with a message, the message headers.
3333
#[derivative(PartialEq = "ignore")]
3434
pub message_headers: Option<Attrs>,
35+
/// Optional supervision event that caused this event, for recursive propagation.
36+
pub caused_by: Option<Box<ActorSupervisionEvent>>,
3537
}
3638

39+
impl ActorSupervisionEvent {
40+
/// Compute an actor status from this event, ensuring that "caused-by"
41+
/// events are included in failure states. This should be used as the
42+
/// actor status when reporting events to users.
43+
pub fn status(&self) -> ActorStatus {
44+
match &self.actor_status {
45+
ActorStatus::Failed(msg) => {
46+
ActorStatus::Failed(format!("{}: {}", self.to_string(), msg))
47+
}
48+
status => status.clone(),
49+
}
50+
}
51+
}
52+
53+
impl std::error::Error for ActorSupervisionEvent {}
54+
3755
impl fmt::Display for ActorSupervisionEvent {
3856
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3957
write!(f, "{}: {}", self.actor_id, self.actor_status)?;
4058
if let Some(message_headers) = &self.message_headers {
4159
let headers = serde_json::to_string(&message_headers)
4260
.expect("could not serialize message headers");
43-
write!(f, " headers: {}", headers)?;
61+
write!(f, " (headers: {})", headers)?;
62+
}
63+
if let Some(caused_by) = &self.caused_by {
64+
write!(f, ": caused by: {})", caused_by)?;
4465
}
4566
Ok(())
4667
}

hyperactor/src/test_utils/proc_supervison.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ impl ProcSupervisionCoordinator {
4545
let coordinator = proc
4646
.spawn::<ProcSupervisionCoordinator>("coordinator", state.clone())
4747
.await?;
48-
proc.set_supervision_coordinator(coordinator.port::<ActorSupervisionEvent>())?;
48+
proc.set_supervision_coordinator(coordinator.port())?;
4949
Ok(state)
5050
}
5151
}

hyperactor_mesh/src/proc_mesh.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,7 @@ impl ProcEvents {
588588
actor_id: proc_id.actor_id("any", 0),
589589
actor_status: ActorStatus::Failed(format!("proc {} is stopped", proc_id)),
590590
message_headers: None,
591+
caused_by: None,
591592
};
592593
if entry.value().send(event).is_err() {
593594
tracing::warn!("unable to transmit supervision event to actor {}", entry.key());
@@ -617,6 +618,7 @@ impl ProcEvents {
617618
};
618619
let actor_id = event.actor_id.clone();
619620
let actor_status = event.actor_status.clone();
621+
let reason = event.to_string();
620622
let Some(rank) = self.ranks.get(actor_id.proc_id()) else {
621623
tracing::warn!("received supervision event for unmapped actor {}", actor_id);
622624
continue;
@@ -641,7 +643,7 @@ impl ProcEvents {
641643
);
642644

643645
// Send this event to Python proc mesh to keep its health status up to date.
644-
break Some(ProcEvent::Crashed(*rank, actor_status.to_string()))
646+
break Some(ProcEvent::Crashed(*rank, reason))
645647
}
646648
}
647649
}

0 commit comments

Comments
 (0)