Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/chat-cli-ui/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ tokio.workspace = true
eyre.workspace = true
tokio-util.workspace = true
futures.workspace = true
rustyline.workspace = true
ratatui = "0.29.0"

[target.'cfg(unix)'.dependencies]
Expand Down
265 changes: 241 additions & 24 deletions crates/chat-cli-ui/src/conduit.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::future;
use std::io::Write as _;
use std::marker::PhantomData;
use std::path::PathBuf;
use std::pin::Pin;

use crossterm::style::{
self,
Expand All @@ -10,11 +13,20 @@ use crossterm::{
execute,
queue,
};

use crate::legacy_ui_util::ThemeSource;
use rustyline::EditMode;
use tokio::signal::ctrl_c;
use tracing::error;

use crate::legacy_ui_util::{
ThemeSource,
generate_prompt,
rl,
};
use crate::protocol::{
Event,
InputEvent,
LegacyPassThroughOutput,
MetaEvent,
ToolCallRejection,
ToolCallStart,
};
Expand All @@ -25,7 +37,7 @@ const CONTINUATION_LINE: &str = " ⋮ ";
#[derive(thiserror::Error, Debug)]
pub enum ConduitError {
#[error(transparent)]
Send(#[from] Box<std::sync::mpsc::SendError<Event>>),
Send(#[from] Box<tokio::sync::mpsc::error::SendError<Event>>),
#[error(transparent)]
Utf8(#[from] std::string::FromUtf8Error),
#[error("No event set")]
Expand All @@ -40,23 +52,92 @@ pub enum ConduitError {
/// - To deliver state changes from the control layer to the view layer
pub struct ViewEnd {
/// Used by the view to send input to the control
// TODO: later on we will need replace this byte array with an actual event type from ACP
pub sender: tokio::sync::mpsc::Sender<Vec<u8>>,
pub sender: tokio::sync::mpsc::Sender<InputEvent>,
/// To receive messages from control about state changes
pub receiver: std::sync::mpsc::Receiver<Event>,
pub receiver: tokio::sync::mpsc::UnboundedReceiver<Event>,
}

impl ViewEnd {
/// Method to facilitate in the interim
/// It takes possible messages from the old even loop and queues write to the output provided
/// This blocks the current thread and consumes the [ViewEnd]
/// Converts the ViewEnd into legacy mode operation. This mainly serves a purpose in the
/// following circumstances:
/// - To preserve the UX of the current event loop while abstracting away the impl Write it
/// writes to
/// - To serve as an interim UI for the new event loop while preserving the UX of the current
/// product while the new UI is being worked out
///
/// # Parameters
///
/// * `ui_managed_input` - When true, the UI layer will manage user input through readline. When
/// false, input handling is delegated to the event loop (via InputSource).
/// * `ui_managed_ctrl_c` - When true, the UI layer will handle Ctrl+C interrupts. When false,
/// interrupt handling is delegated to the event loop (via its own ctrl c handler).
/// * `theme_source` - Provider for terminal styling and theming information.
/// * `stderr` - Standard error stream for error output.
/// * `stdout` - Standard output stream for normal output.
///
/// # Returns
///
/// Returns `Ok(())` on successful initialization, or a `ConduitError` if setup fails.
pub fn into_legacy_mode(
self,
mut self,
ui_managed_input: bool,
ui_managed_ctrl_c: bool,
theme_source: impl ThemeSource,
mut stderr: std::io::Stderr,
mut stdout: std::io::Stdout,
) -> Result<(), ConduitError> {
while let Ok(event) = self.receiver.recv() {
#[derive(Debug)]
enum IncomingEvent {
Input(String),
Interrupt,
}

#[derive(Clone, Debug)]
struct PromptSignal {
active_agent: Option<String>,
trust_all: bool,
is_in_tangent_mode: bool,
available_prompts: Vec<String>,
history_path: PathBuf,
available_commands: Vec<String>,
edit_mode: EditMode,
history_hints_enabled: bool,
usage_percentage: Option<f32>,
}

impl Default for PromptSignal {
fn default() -> Self {
Self {
active_agent: Default::default(),
trust_all: Default::default(),
available_prompts: Default::default(),
is_in_tangent_mode: Default::default(),
history_path: Default::default(),
available_commands: Default::default(),
history_hints_enabled: Default::default(),
usage_percentage: Default::default(),
edit_mode: EditMode::Emacs,
}
}
}

#[derive(Default, Debug)]
enum DisplayState {
Prompting,
UserInsertingText,
StreamingOutput,
#[default]
Hidden,
}

#[inline]
fn handle_session_event_legacy_mode(
event: Event,
stderr: &mut std::io::Stderr,
stdout: &mut std::io::Stdout,
theme_source: &impl ThemeSource,
display_state: Option<&mut DisplayState>,
) -> Result<(), ConduitError> {
match event {
Event::LegacyPassThrough(content) => match content {
LegacyPassThroughOutput::Stderr(content) => {
Expand All @@ -74,9 +155,17 @@ impl ViewEnd {
Event::StepStarted(_step_started) => {},
Event::StepFinished(_step_finished) => {},
Event::TextMessageStart(_text_message_start) => {
if let Some(display_state) = display_state {
*display_state = DisplayState::StreamingOutput;
}

queue!(stdout, theme_source.success_fg(), Print("> "), theme_source.reset(),)?;
},
Event::TextMessageContent(text_message_content) => {
if let Some(display_state) = display_state {
*display_state = DisplayState::StreamingOutput;
}

stdout.write_all(&text_message_content.delta)?;
stdout.flush()?;
},
Expand All @@ -86,6 +175,10 @@ impl ViewEnd {
},
Event::TextMessageChunk(_text_message_chunk) => {},
Event::ToolCallStart(tool_call_start) => {
if let Some(display_state) = display_state {
*display_state = DisplayState::StreamingOutput;
}

let ToolCallStart {
tool_call_name,
is_trusted,
Expand Down Expand Up @@ -134,12 +227,8 @@ impl ViewEnd {
execute!(stdout, style::Print(tool_call_args.delta))?;
}
},
Event::ToolCallEnd(_tool_call_end) => {
// noop for now
},
Event::ToolCallResult(_tool_call_result) => {
// noop for now (currently we don't show the tool call results to users)
},
Event::ToolCallEnd(_tool_call_end) => {},
Event::ToolCallResult(_tool_call_result) => {},
Event::StateSnapshot(_state_snapshot) => {},
Event::StateDelta(_state_delta) => {},
Event::MessagesSnapshot(_messages_snapshot) => {},
Expand All @@ -153,7 +242,17 @@ impl ViewEnd {
Event::ReasoningMessageEnd(_reasoning_message_end) => {},
Event::ReasoningMessageChunk(_reasoning_message_chunk) => {},
Event::ReasoningEnd(_reasoning_end) => {},
Event::MetaEvent(_meta_event) => {},
Event::MetaEvent(MetaEvent { meta_type, payload }) => {
if meta_type.as_str() == "timing" {
if let serde_json::Value::String(s) = payload {
if s.as_str() == "prompt_user" {
if let Some(display_state) = display_state {
*display_state = DisplayState::Prompting;
}
}
}
}
},
Event::ToolCallRejection(tool_call_rejection) => {
let ToolCallRejection { reason, name, .. } = tool_call_rejection;

Expand All @@ -171,6 +270,124 @@ impl ViewEnd {
)?;
},
}

Ok::<(), ConduitError>(())
}

if ui_managed_input {
let (incoming_events_tx, mut incoming_events_rx) = tokio::sync::mpsc::unbounded_channel::<IncomingEvent>();
let (prompt_signal_tx, prompt_signal_rx) = std::sync::mpsc::channel::<PromptSignal>();

tokio::task::spawn_blocking(move || {
while let Ok(prompt_signal) = prompt_signal_rx.recv() {
let PromptSignal {
active_agent,
trust_all,
available_prompts,
history_path,
available_commands,
edit_mode,
history_hints_enabled,
is_in_tangent_mode,
usage_percentage,
} = prompt_signal;

let mut rl = rl(
history_hints_enabled,
edit_mode,
history_path,
available_commands,
available_prompts,
)
.expect("Failed to spawn readline");

let prompt =
generate_prompt(active_agent.as_deref(), trust_all, is_in_tangent_mode, usage_percentage);

match rl.readline(&prompt) {
Ok(input) => {
_ = incoming_events_tx.send(IncomingEvent::Input(input));
},
Err(rustyline::error::ReadlineError::Interrupted) => {
_ = incoming_events_tx.send(IncomingEvent::Interrupt);
},
Err(e) => panic!("Failed to spawn readline: {:?}", e),
};

drop(rl);
}
});

tokio::spawn(async move {
let mut display_state = DisplayState::default();
let prompt_signal = PromptSignal::default();

loop {
let ctrl_c_handler: Pin<
Box<dyn Future<Output = Result<(), std::io::Error>> + Send + Sync + 'static>,
>;

if matches!(display_state, DisplayState::Prompting) {
if let Err(e) = prompt_signal_tx.send(prompt_signal.clone()) {
error!("Error sending prompt signal: {:?}", e);
}
display_state = DisplayState::UserInsertingText;

ctrl_c_handler = Box::pin(future::pending());
} else if ui_managed_ctrl_c {
ctrl_c_handler = Box::pin(ctrl_c());
} else {
ctrl_c_handler = Box::pin(future::pending());
}

tokio::select! {
_ = ctrl_c_handler => {
_ = self.sender.send(InputEvent::Interrupt).await;
},
Some(incoming_event) = incoming_events_rx.recv() => {
match display_state {
DisplayState::UserInsertingText => {
match incoming_event {
IncomingEvent::Input(content) => {
if let Err(e) = self.sender.send(InputEvent::Text(content)).await {
error!("Error sending input event: {:?}", e);
}
display_state = DisplayState::StreamingOutput;
},
IncomingEvent::Interrupt => {
display_state = DisplayState::default();
_ = self.sender.send(InputEvent::Interrupt).await;
},
}
},
DisplayState::StreamingOutput if matches!(incoming_event, IncomingEvent::Interrupt)=> {
_ = self.sender.send(InputEvent::Interrupt).await;
},
DisplayState::Hidden | DisplayState::StreamingOutput | DisplayState::Prompting => {
// We ignore everything that's not a sigint here
}
}
},
session_event = self.receiver.recv() => {
if let Some(event) = session_event {
handle_session_event_legacy_mode(event, &mut stderr, &mut stdout, &theme_source, Some(&mut display_state))?;
} else {
break;
}
}
}
}

Ok::<(), ConduitError>(())
});
} else {
tokio::spawn(async move {
while let Some(event) = self.receiver.recv().await {
handle_session_event_legacy_mode(event, &mut stderr, &mut stdout, &theme_source, None)?;
}

Ok::<(), ConduitError>(())
});
}

Ok(())
Expand All @@ -184,15 +401,15 @@ pub struct DestinationStderr;
#[derive(Clone, Debug)]
pub struct DestinationStructuredOutput;

pub type InputReceiver = tokio::sync::mpsc::Receiver<Vec<u8>>;
pub type InputReceiver = tokio::sync::mpsc::Receiver<InputEvent>;

/// This compliments the [ViewEnd]. It can be thought of as the "other end" of a pipe.
/// The control would own this.
#[derive(Debug)]
pub struct ControlEnd<T> {
pub current_event: Option<Event>,
/// Used by the control to send state changes to the view
pub sender: std::sync::mpsc::Sender<Event>,
pub sender: tokio::sync::mpsc::UnboundedSender<Event>,
/// Flag indicating whether structured events should be sent through the conduit.
/// When true, the control end will send structured event data in addition to
/// raw pass-through content, enabling richer communication between layers.
Expand Down Expand Up @@ -381,15 +598,15 @@ pub fn get_legacy_conduits(
ControlEnd<DestinationStderr>,
ControlEnd<DestinationStdout>,
) {
let (state_tx, state_rx) = std::sync::mpsc::channel::<Event>();
let (byte_tx, byte_rx) = tokio::sync::mpsc::channel::<Vec<u8>>(10);
let (state_tx, state_rx) = tokio::sync::mpsc::unbounded_channel::<Event>();
let (input_tx, input_rx) = tokio::sync::mpsc::channel::<InputEvent>(10);

(
ViewEnd {
sender: byte_tx,
sender: input_tx,
receiver: state_rx,
},
byte_rx,
input_rx,
ControlEnd {
current_event: None,
should_send_structured_event,
Expand Down
Loading