Skip to content

[hyperactor] simplify supervision propagation; unhandled events always cause failure #784

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions hyperactor/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,8 @@ where
/// with the ID of the actor being served.
#[derive(Debug)]
pub struct ActorError {
actor_id: ActorId,
kind: ActorErrorKind,
pub(crate) actor_id: ActorId,
pub(crate) kind: ActorErrorKind,
}

/// The kinds of actor serving errors.
Expand Down Expand Up @@ -300,6 +300,10 @@ pub enum ActorErrorKind {
#[error("actor is in an indeterminate state")]
IndeterminateState,

/// An actor supervision event was not handled.
#[error("supervision: {0}")]
UnhandledSupervisionEvent(#[from] ActorSupervisionEvent),

/// A special kind of error that allows us to clone errors: we can keep the
/// error string, but we lose the error structure.
#[error("{0}")]
Expand Down Expand Up @@ -349,6 +353,15 @@ impl From<MailboxSenderError> for ActorError {
}
}

impl From<ActorSupervisionEvent> for ActorError {
fn from(inner: ActorSupervisionEvent) -> Self {
Self::new(
inner.actor_id.clone(),
ActorErrorKind::UnhandledSupervisionEvent(inner),
)
}
}

/// A collection of signals to control the behavior of the actor.
/// Signals are internal runtime control plane messages and should not be
/// sent outside of the runtime.
Expand Down
1 change: 1 addition & 0 deletions hyperactor/src/mailbox/undeliverable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ pub fn supervise_undeliverable_messages(
envelope
)),
message_headers: Some(envelope.headers().clone()),
caused_by: None,
})
.is_err()
{
Expand Down
211 changes: 168 additions & 43 deletions hyperactor/src/proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -964,13 +964,30 @@ impl<A: Actor> Instance<A> {

let result = self.run_actor_tree(&mut actor, actor_loop_receivers).await;

let actor_status = match result {
Ok(_) => ActorStatus::Stopped,
Err(err) => ActorStatus::Failed(err.to_string()),
let (actor_status, event) = match result {
Ok(_) => (ActorStatus::Stopped, None),
Err(ActorError {
kind: ActorErrorKind::UnhandledSupervisionEvent(event),
..
}) => (event.actor_status.clone(), Some(event)),
Err(err) => (
ActorStatus::Failed(err.to_string()),
Some(ActorSupervisionEvent {
actor_id: self.cell.actor_id().clone(),
actor_status: ActorStatus::Failed(err.to_string()),
message_headers: None,
caused_by: None,
}),
),
};

let result = self.cell.maybe_unlink_parent();
if let Some(parent) = result {
if let Some(parent) = self.cell.maybe_unlink_parent() {
if let Some(event) = event {
// Parent exists, failure should be propagated to the parent.
parent.send_supervision_event_or_crash(event);
}
// TODO: we should get rid of this signal, and use *only* supervision events for
// the purpose of conveying lifecycle changes
if let Err(err) = parent.signal(Signal::ChildStopped(self.cell.pid())) {
tracing::error!(
"{}: failed to send stop message to parent pid {}: {:?}",
Expand All @@ -979,26 +996,14 @@ impl<A: Actor> Instance<A> {
err
);
}
if actor_status.is_failed() {
// Parent exists, failure should be propagated to the parent.
parent.send_supervision_event_or_crash(ActorSupervisionEvent {
actor_id: self.cell.actor_id().clone(),
actor_status: actor_status.clone(),
message_headers: None,
});
}
} else {
// Failure happened to the root actor or orphaned child actors.
// In either case, the failure should be propagated to proc.
//
// Note that orphaned actor is unexpected and would only happen if
// there is a bug.
if actor_status.is_failed() {
self.proc.handle_supervision_event(ActorSupervisionEvent {
actor_id: self.cell.actor_id().clone(),
actor_status: actor_status.clone(),
message_headers: None,
})
if let Some(event) = event {
self.proc.handle_supervision_event(event);
}
}
self.change_status(actor_status);
Expand Down Expand Up @@ -1103,7 +1108,7 @@ impl<A: Actor> Instance<A> {
let work = work.expect("inconsistent work queue state");
if let Err(err) = work.handle(actor, self).await {
for supervision_event in supervision_event_receiver.drain() {
self.handle_supervision_event(actor, supervision_event).await;
self.handle_supervision_event(actor, supervision_event).await?;
}
return Err(ActorError::new(self.self_id().clone(), ActorErrorKind::Processing(err)));
}
Expand All @@ -1122,7 +1127,7 @@ impl<A: Actor> Instance<A> {
}
}
Ok(supervision_event) = supervision_event_receiver.recv() => {
self.handle_supervision_event(actor, supervision_event).await;
self.handle_supervision_event(actor, supervision_event).await?;
}
}
self.cell
Expand Down Expand Up @@ -1154,24 +1159,41 @@ impl<A: Actor> Instance<A> {
&self,
actor: &mut A,
supervision_event: ActorSupervisionEvent,
) {
) -> Result<(), ActorError> {
// Handle the supervision event with the current actor.
if let Ok(false) = actor
match actor
.handle_supervision_event(self, &supervision_event)
.await
{
// The supervision event wasn't handled by this actor, try to bubble it up.
let result = self.cell.get_parent_cell();
if let Some(parent) = result {
parent.send_supervision_event_or_crash(supervision_event);
} else {
// Reaching here means the actor is either a root actor, or an orphaned
// child actor (i.e. the parent actor was dropped unexpectedly). In either
// case, the supervision event should be sent to proc.
//
// Note that orphaned actor is unexpected and would only happen if there
// is a bug.
self.proc.handle_supervision_event(supervision_event);
Ok(true) => {
// The supervision event was handled by this actor, nothing more to do.
Ok(())
}
Ok(false) => {
// The supervision event wasn't handled by this actor, chain it and bubble it up.
let supervision_event = ActorSupervisionEvent {
actor_id: self.self_id().clone(),
actor_status: ActorStatus::Failed(
"did not handle supervision event".to_string(),
),
message_headers: None,
caused_by: Some(Box::new(supervision_event)),
};
Err(supervision_event.into())
}
Err(err) => {
// The actor failed to handle the supervision event, it should die.
// Create a new supervision event for this failure and propagate it.
let supervision_event = ActorSupervisionEvent {
actor_id: self.self_id().clone(),
actor_status: ActorStatus::Failed(format!(
"failed to handle supervision event: {}",
err
)),
message_headers: None,
caused_by: Some(Box::new(supervision_event)),
};
Err(supervision_event.into())
}
}
}
Expand Down Expand Up @@ -2174,12 +2196,12 @@ mod tests {

// TODO: should we provide finer-grained stop reasons, e.g., to indicate it was
// stopped by a parent failure?
assert_matches!(root_2_1.await, ActorStatus::Stopped);

for actor in [root_1, root] {
// The other actors were unaffected.
assert_matches!(*actor.status().borrow(), ActorStatus::Idle);
}
assert_eq!(
root.await,
ActorStatus::Failed("did not handle supervision event".to_string())
);
assert_eq!(root_2_1.await, ActorStatus::Stopped);
assert_eq!(root_1.await, ActorStatus::Stopped);
}

#[tokio::test]
Expand Down Expand Up @@ -2602,8 +2624,111 @@ mod tests {
assert!(!root_2_1_state.load(Ordering::SeqCst));
assert_eq!(
reported_event.event().map(|e| e.actor_id.clone()),
Some(root_2_1.actor_id().clone())
Some(root.actor_id().clone())
);
}

#[tokio::test]
async fn test_supervision_event_handler_propagates() {
#[derive(Debug)]
struct FailingSupervisionActor;

#[async_trait]
impl Actor for FailingSupervisionActor {
type Params = ();

async fn new(_: ()) -> Result<Self, anyhow::Error> {
Ok(Self)
}

async fn handle_supervision_event(
&mut self,
_this: &Instance<Self>,
_event: &ActorSupervisionEvent,
) -> Result<bool, anyhow::Error> {
anyhow::bail!("failed to handle supervision event!")
}
}

#[async_trait]
impl Handler<String> for FailingSupervisionActor {
async fn handle(
&mut self,
_cx: &crate::Context<Self>,
message: String,
) -> anyhow::Result<()> {
Err(anyhow::anyhow!(message))
}
}

#[derive(Debug)]
struct ParentActor(tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>);

#[async_trait]
impl Actor for ParentActor {
type Params = tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>;

async fn new(
supervision_events: tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>,
) -> Result<Self, anyhow::Error> {
Ok(Self(supervision_events))
}

async fn handle_supervision_event(
&mut self,
_this: &Instance<Self>,
event: &ActorSupervisionEvent,
) -> Result<bool, anyhow::Error> {
self.0.send(event.clone()).unwrap();
Ok(true)
}
}

let proc = Proc::local();

let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel();

let parent = proc.spawn::<ParentActor>("parent", event_tx).await.unwrap();
let child = proc
.spawn_child::<FailingSupervisionActor>(parent.cell().clone(), ())
.await
.unwrap();
let grandchild = proc
.spawn_child::<FailingSupervisionActor>(child.cell().clone(), ())
.await
.unwrap();

let child_actor_id = child.actor_id().clone();
let grandchild_actor_id = grandchild.actor_id().clone();

// Grandchild fails, triggering failure up the tree, finally receiving
// the event at the root.
grandchild.send("trigger failure".to_string()).unwrap();

assert!(grandchild.await.is_failed());
assert!(child.await.is_failed());

assert_eq!(
event_rx.recv().await.unwrap(),
ActorSupervisionEvent {
actor_id: child_actor_id,
actor_status: ActorStatus::Failed(
"failed to handle supervision event: failed to handle supervision event!"
.to_string()
),
message_headers: None,
caused_by: Some(Box::new(ActorSupervisionEvent {
actor_id: grandchild_actor_id,
actor_status: ActorStatus::Failed(
"serving local[0].parent[2]: processing error: trigger failure".to_string()
),
message_headers: None,
caused_by: None,
})),
}
);

assert!(event_rx.try_recv().is_err());
}

#[tokio::test]
Expand All @@ -2615,7 +2740,7 @@ mod tests {
impl Actor for TestActor {
type Params = ();

async fn new(_param: ()) -> Result<Self, anyhow::Error> {
async fn new(_params: ()) -> Result<Self, anyhow::Error> {
Ok(Self)
}
}
Expand Down
23 changes: 22 additions & 1 deletion hyperactor/src/supervision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,36 @@ pub struct ActorSupervisionEvent {
/// If this event is associated with a message, the message headers.
#[derivative(PartialEq = "ignore")]
pub message_headers: Option<Attrs>,
/// Optional supervision event that caused this event, for recursive propagation.
pub caused_by: Option<Box<ActorSupervisionEvent>>,
}

impl ActorSupervisionEvent {
/// Compute an actor status from this event, ensuring that "caused-by"
/// events are included in failure states. This should be used as the
/// actor status when reporting events to users.
pub fn status(&self) -> ActorStatus {
match &self.actor_status {
ActorStatus::Failed(msg) => {
ActorStatus::Failed(format!("{}: {}", self.to_string(), msg))
}
status => status.clone(),
}
}
}

impl std::error::Error for ActorSupervisionEvent {}

impl fmt::Display for ActorSupervisionEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}: {}", self.actor_id, self.actor_status)?;
if let Some(message_headers) = &self.message_headers {
let headers = serde_json::to_string(&message_headers)
.expect("could not serialize message headers");
write!(f, " headers: {}", headers)?;
write!(f, " (headers: {})", headers)?;
}
if let Some(caused_by) = &self.caused_by {
write!(f, ": caused by: {})", caused_by)?;
}
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion hyperactor/src/test_utils/proc_supervison.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl ProcSupervisionCoordinator {
let coordinator = proc
.spawn::<ProcSupervisionCoordinator>("coordinator", state.clone())
.await?;
proc.set_supervision_coordinator(coordinator.port::<ActorSupervisionEvent>())?;
proc.set_supervision_coordinator(coordinator.port())?;
Ok(state)
}
}
Expand Down
4 changes: 3 additions & 1 deletion hyperactor_mesh/src/proc_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ impl ProcEvents {
actor_id: proc_id.actor_id("any", 0),
actor_status: ActorStatus::Failed(format!("proc {} is stopped", proc_id)),
message_headers: None,
caused_by: None,
};
if entry.value().send(event).is_err() {
tracing::warn!("unable to transmit supervision event to actor {}", entry.key());
Expand Down Expand Up @@ -617,6 +618,7 @@ impl ProcEvents {
};
let actor_id = event.actor_id.clone();
let actor_status = event.actor_status.clone();
let reason = event.to_string();
let Some(rank) = self.ranks.get(actor_id.proc_id()) else {
tracing::warn!("received supervision event for unmapped actor {}", actor_id);
continue;
Expand All @@ -641,7 +643,7 @@ impl ProcEvents {
);

// Send this event to Python proc mesh to keep its health status up to date.
break Some(ProcEvent::Crashed(*rank, actor_status.to_string()))
break Some(ProcEvent::Crashed(*rank, reason))
}
}
}
Expand Down
Loading