From 769afa5a9278c71f390188a0ae0ced90740de84a Mon Sep 17 00:00:00 2001 From: Drew Galbraith Date: Wed, 25 Feb 2026 23:02:20 -0800 Subject: [PATCH] UI Sync --- TODO.md | 8 +++ src/app/mod.rs | 4 +- src/core/orchestrator.rs | 150 ++++++++++++++++++++++----------------- src/core/types.rs | 18 ++++- src/tui/events.rs | 76 +++++++++++++++++--- src/tui/input.rs | 1 + src/tui/mod.rs | 16 ++++- src/tui/render.rs | 10 ++- 8 files changed, 199 insertions(+), 84 deletions(-) diff --git a/TODO.md b/TODO.md index b1c9ab5..dd8bb47 100644 --- a/TODO.md +++ b/TODO.md @@ -3,6 +3,14 @@ - Keep UI and orchestrator in sync (i.e. messages display out of order if you queue up many.) - `:clear` clears TUI state immediately but sends `ClearHistory` to orchestrator async. A mid-stream response can ghost back in after clear. Need synchronization (e.g. clear on `TurnComplete`, or have orchestrator ack the clear). +# UX Improvements + +- Start tool uses collapsed. +- Allow traversing blocks with [, ] short cuts. +- Allow expanding and closing blocks with o (expand all with Ctrl+o) + +- Allow moving inside the text area with the arrows (and ctrl arrows) + # Scroll - `update_scroll` auto-follows in Insert mode, yanking viewport to bottom on mode switch. Only auto-follow when new content arrives (in `drain_ui_events`), not every frame. diff --git a/src/app/mod.rs b/src/app/mod.rs index 89ee428..c9858bd 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -25,7 +25,7 @@ use anyhow::Context; use tokio::sync::mpsc; use crate::core::orchestrator::Orchestrator; -use crate::core::types::{UIEvent, UserAction}; +use crate::core::types::{StampedEvent, UserAction}; use crate::provider::ClaudeProvider; use crate::sandbox::policy::SandboxPolicy; use crate::sandbox::{EnforcementMode, Sandbox}; @@ -85,7 +85,7 @@ pub async fn run(project_dir: &Path, yolo: bool) -> anyhow::Result<()> { // -- Channels ----------------------------------------------------------------- let (action_tx, action_rx) = mpsc::channel::(CHANNEL_CAP); - let (event_tx, event_rx) = mpsc::channel::(CHANNEL_CAP); + let (event_tx, event_rx) = mpsc::channel::(CHANNEL_CAP); // -- Tools & Orchestrator (background task) ------------------------------------ let tool_registry = ToolRegistry::default_tools(); diff --git a/src/core/orchestrator.rs b/src/core/orchestrator.rs index 146126d..319b657 100644 --- a/src/core/orchestrator.rs +++ b/src/core/orchestrator.rs @@ -4,7 +4,8 @@ use tracing::{debug, warn}; use crate::core::history::ConversationHistory; use crate::core::types::{ - ContentBlock, ConversationMessage, Role, StreamEvent, ToolDefinition, UIEvent, UserAction, + ContentBlock, ConversationMessage, Role, StampedEvent, StreamEvent, ToolDefinition, UIEvent, + UserAction, }; use crate::provider::ModelProvider; use crate::sandbox::Sandbox; @@ -109,10 +110,13 @@ pub struct Orchestrator

{ tool_registry: ToolRegistry, sandbox: Sandbox, action_rx: mpsc::Receiver, - event_tx: mpsc::Sender, + event_tx: mpsc::Sender, /// Messages typed by the user while an approval prompt is open. They are /// queued here and replayed as new turns once the current turn completes. queued_messages: Vec, + /// Monotonic epoch incremented on each `:clear`. Events are tagged with + /// this value so the TUI can discard stale in-flight messages. + epoch: u64, } impl Orchestrator

{ @@ -122,7 +126,7 @@ impl Orchestrator

{ tool_registry: ToolRegistry, sandbox: Sandbox, action_rx: mpsc::Receiver, - event_tx: mpsc::Sender, + event_tx: mpsc::Sender, ) -> Self { Self { history: ConversationHistory::new(), @@ -132,6 +136,25 @@ impl Orchestrator

{ action_rx, event_tx, queued_messages: Vec::new(), + epoch: 0, + } + } + + /// Send a [`UIEvent`] stamped with the current epoch. + /// + /// Drops the event silently if the channel is full or closed -- this is + /// intentional (TUI death should not stall the orchestrator), but the + /// debug log below makes backpressure visible during development. + async fn send(&self, event: UIEvent) { + let result = self + .event_tx + .send(StampedEvent { + epoch: self.epoch, + event, + }) + .await; + if result.is_err() { + debug!("ui event dropped -- channel closed or full"); } } @@ -155,7 +178,7 @@ impl Orchestrator

{ match event { StreamEvent::TextDelta(chunk) => { text_buf.push_str(&chunk); - let _ = self.event_tx.send(UIEvent::StreamDelta(chunk)).await; + self.send(UIEvent::StreamDelta(chunk)).await; } StreamEvent::ToolUseStart { id, name } => { // Flush any accumulated text before starting tool block. @@ -226,7 +249,7 @@ impl Orchestrator

{ match result { StreamResult::Error(msg) => { - let _ = self.event_tx.send(UIEvent::Error(msg)).await; + self.send(UIEvent::Error(msg)).await; return; } StreamResult::Done(blocks) => { @@ -241,7 +264,7 @@ impl Orchestrator

{ }); if !has_tool_use { - let _ = self.event_tx.send(UIEvent::TurnComplete).await; + self.send(UIEvent::TurnComplete).await; return; } @@ -269,16 +292,15 @@ impl Orchestrator

{ role: Role::User, content: tool_results, }); + } } } warn!("tool-use loop reached max iterations ({MAX_TOOL_ITERATIONS})"); - let _ = self - .event_tx - .send(UIEvent::Error( - "tool-use loop reached maximum iterations".to_string(), - )) - .await; + self.send(UIEvent::Error( + "tool-use loop reached maximum iterations".to_string(), + )) + .await; } /// Execute a single tool, handling approval if needed. @@ -308,24 +330,20 @@ impl Orchestrator

{ // Check approval. let approved = match risk { RiskLevel::AutoApprove => { - let _ = self - .event_tx - .send(UIEvent::ToolExecuting { - tool_name: tool_name.to_string(), - input_summary: input_summary.clone(), - }) - .await; + self.send(UIEvent::ToolExecuting { + tool_name: tool_name.to_string(), + input_summary: input_summary.clone(), + }) + .await; true } RiskLevel::RequiresApproval => { - let _ = self - .event_tx - .send(UIEvent::ToolApprovalRequest { - tool_use_id: tool_use_id.to_string(), - tool_name: tool_name.to_string(), - input_summary: input_summary.clone(), - }) - .await; + self.send(UIEvent::ToolApprovalRequest { + tool_use_id: tool_use_id.to_string(), + tool_name: tool_name.to_string(), + input_summary: input_summary.clone(), + }) + .await; // Wait for approval response from TUI. self.wait_for_approval(tool_use_id).await @@ -343,26 +361,22 @@ impl Orchestrator

{ let tool = self.tool_registry.get(tool_name).unwrap(); match tool.execute(input, &self.sandbox).await { Ok(output) => { - let _ = self - .event_tx - .send(UIEvent::ToolResult { - tool_name: tool_name.to_string(), - output_summary: truncate(&output.content, 200), - is_error: output.is_error, - }) - .await; + self.send(UIEvent::ToolResult { + tool_name: tool_name.to_string(), + output_summary: truncate(&output.content, 200), + is_error: output.is_error, + }) + .await; output } Err(e) => { let msg = e.to_string(); - let _ = self - .event_tx - .send(UIEvent::ToolResult { - tool_name: tool_name.to_string(), - output_summary: msg.clone(), - is_error: true, - }) - .await; + self.send(UIEvent::ToolResult { + tool_name: tool_name.to_string(), + output_summary: msg.clone(), + is_error: true, + }) + .await; ToolOutput { content: msg, is_error: true, @@ -391,7 +405,15 @@ impl Orchestrator

{ UserAction::SendMessage(text) => { self.queued_messages.push(text); } - _ => {} // discard stale approvals / ClearHistory during wait + UserAction::ClearHistory { epoch } => { + // Keep epoch in sync even while blocked on an approval + // prompt. Without this, events emitted after the approval + // resolves would carry the pre-clear epoch and be silently + // discarded by the TUI. + self.epoch = epoch; + self.history.clear(); + } + _ => {} // discard stale approvals } } false @@ -402,7 +424,8 @@ impl Orchestrator

{ while let Some(action) = self.action_rx.recv().await { match action { UserAction::Quit => break, - UserAction::ClearHistory => { + UserAction::ClearHistory { epoch } => { + self.epoch = epoch; self.history.clear(); } @@ -412,10 +435,7 @@ impl Orchestrator

{ UserAction::SetNetworkPolicy(allowed) => { self.sandbox.set_network_allowed(allowed); - let _ = self - .event_tx - .send(UIEvent::NetworkPolicyChanged(allowed)) - .await; + self.send(UIEvent::NetworkPolicyChanged(allowed)).await; } UserAction::SendMessage(text) => { @@ -474,7 +494,7 @@ mod tests { fn test_orchestrator( provider: P, action_rx: mpsc::Receiver, - event_tx: mpsc::Sender, + event_tx: mpsc::Sender, ) -> Orchestrator

{ use crate::sandbox::policy::SandboxPolicy; use crate::sandbox::{EnforcementMode, Sandbox}; @@ -494,11 +514,11 @@ mod tests { /// Collect all UIEvents that arrive within one orchestrator turn, stopping /// when the channel is drained after a `TurnComplete` or `Error`. - async fn collect_events(rx: &mut mpsc::Receiver) -> Vec { + async fn collect_events(rx: &mut mpsc::Receiver) -> Vec { let mut out = Vec::new(); - while let Ok(ev) = rx.try_recv() { - let done = matches!(ev, UIEvent::TurnComplete | UIEvent::Error(_)); - out.push(ev); + while let Ok(stamped) = rx.try_recv() { + let done = matches!(stamped.event, UIEvent::TurnComplete | UIEvent::Error(_)); + out.push(stamped.event); if done { break; } @@ -525,7 +545,7 @@ mod tests { ]); let (action_tx, action_rx) = mpsc::channel::(8); - let (event_tx, mut event_rx) = mpsc::channel::(16); + let (event_tx, mut event_rx) = mpsc::channel::(16); let orch = test_orchestrator(provider, action_rx, event_tx); let handle = tokio::spawn(orch.run()); @@ -563,7 +583,7 @@ mod tests { ]); let (action_tx, action_rx) = mpsc::channel::(8); - let (event_tx, mut event_rx) = mpsc::channel::(16); + let (event_tx, mut event_rx) = mpsc::channel::(16); let orch = test_orchestrator(provider, action_rx, event_tx); let handle = tokio::spawn(orch.run()); @@ -605,7 +625,7 @@ mod tests { } let (action_tx, action_rx) = mpsc::channel::(8); - let (event_tx, _event_rx) = mpsc::channel::(8); + let (event_tx, _event_rx) = mpsc::channel::(8); let orch = test_orchestrator(NeverCalledProvider, action_rx, event_tx); let handle = tokio::spawn(orch.run()); @@ -658,7 +678,7 @@ mod tests { let provider = MultiTurnMock { turns }; let (action_tx, action_rx) = mpsc::channel::(8); - let (event_tx, mut event_rx) = mpsc::channel::(32); + let (event_tx, mut event_rx) = mpsc::channel::(32); let orch = test_orchestrator(provider, action_rx, event_tx); let handle = tokio::spawn(orch.run()); @@ -730,7 +750,7 @@ mod tests { ]))); let (action_tx, action_rx) = mpsc::channel::(8); - let (event_tx, mut event_rx) = mpsc::channel::(32); + let (event_tx, mut event_rx) = mpsc::channel::(32); // Use a real ToolRegistry so read_file works. use crate::sandbox::policy::SandboxPolicy; @@ -817,7 +837,7 @@ mod tests { ]); let (action_tx, action_rx) = mpsc::channel::(8); - let (event_tx, mut event_rx) = mpsc::channel::(16); + let (event_tx, mut event_rx) = mpsc::channel::(16); let orch = test_orchestrator(provider, action_rx, event_tx); let handle = tokio::spawn(orch.run()); @@ -894,7 +914,7 @@ mod tests { ]))); let (action_tx, action_rx) = mpsc::channel::(16); - let (event_tx, mut event_rx) = mpsc::channel::(64); + let (event_tx, mut event_rx) = mpsc::channel::(64); let orch = test_orchestrator(MultiCallMock { turns }, action_rx, event_tx); let handle = tokio::spawn(orch.run()); @@ -929,7 +949,7 @@ mod tests { // Collect everything. let mut all_events = Vec::new(); while let Ok(ev) = event_rx.try_recv() { - all_events.push(ev); + all_events.push(ev.event); } // The queued message must have produced "queued reply". @@ -962,7 +982,7 @@ mod tests { } let (action_tx, action_rx) = mpsc::channel::(8); - let (event_tx, mut event_rx) = mpsc::channel::(8); + let (event_tx, mut event_rx) = mpsc::channel::(8); let orch = test_orchestrator(NeverCalledProvider, action_rx, event_tx); let handle = tokio::spawn(orch.run()); @@ -974,8 +994,8 @@ mod tests { tokio::time::sleep(std::time::Duration::from_millis(10)).await; let mut found = false; - while let Ok(ev) = event_rx.try_recv() { - if matches!(ev, UIEvent::NetworkPolicyChanged(true)) { + while let Ok(stamped) = event_rx.try_recv() { + if matches!(stamped.event, UIEvent::NetworkPolicyChanged(true)) { found = true; } } diff --git a/src/core/types.rs b/src/core/types.rs index c02d90a..212aa82 100644 --- a/src/core/types.rs +++ b/src/core/types.rs @@ -31,7 +31,10 @@ pub enum UserAction { /// The user has requested to quit. Quit, /// The user has requested to clear conversation history. - ClearHistory, + /// + /// Carries the new epoch so the orchestrator can tag subsequent events, + /// allowing the TUI to discard stale in-flight events from before the clear. + ClearHistory { epoch: u64 }, /// The user has toggled the network policy via `:net on/off`. SetNetworkPolicy(bool), } @@ -66,6 +69,19 @@ pub enum UIEvent { Error(String), } +/// A [`UIEvent`] tagged with the orchestrator's current epoch. +/// +/// The TUI discards any `StampedEvent` whose epoch is older than its own, +/// preventing stale in-flight events (e.g. stream deltas from before a +/// `:clear`) from repopulating the message list. +#[derive(Debug)] +pub struct StampedEvent { + /// The orchestrator epoch at the time this event was emitted. + pub epoch: u64, + /// The wrapped event. + pub event: UIEvent, +} + /// The role of a participant in a conversation. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] diff --git a/src/tui/events.rs b/src/tui/events.rs index e39fa58..064c8bb 100644 --- a/src/tui/events.rs +++ b/src/tui/events.rs @@ -4,7 +4,7 @@ use tokio::sync::mpsc; use tracing::debug; use super::AppState; -use crate::core::types::{Role, UIEvent}; +use crate::core::types::{Role, StampedEvent, UIEvent}; /// Drain all pending [`UIEvent`]s from `event_rx` and apply them to `state`. /// @@ -19,15 +19,25 @@ use crate::core::types::{Role, UIEvent}; /// | `ToolResult` | Display tool result | /// | `TurnComplete` | No structural change; logged at debug level | /// | `Error(msg)` | Push `(Assistant, "[error] {msg}")` | -pub(super) fn drain_ui_events(event_rx: &mut mpsc::Receiver, state: &mut AppState) { - while let Ok(event) = event_rx.try_recv() { - match event { +pub(super) fn drain_ui_events(event_rx: &mut mpsc::Receiver, state: &mut AppState) { + while let Ok(stamped) = event_rx.try_recv() { + // Discard events from before the most recent :clear. + if stamped.epoch < state.epoch { + debug!( + event_epoch = stamped.epoch, + state_epoch = state.epoch, + "dropping stale event" + ); + continue; + } + match stamped.event { UIEvent::StreamDelta(chunk) => { if let Some((Role::Assistant, content)) = state.messages.last_mut() { content.push_str(&chunk); } else { state.messages.push((Role::Assistant, chunk)); } + state.content_changed = true; } UIEvent::ToolApprovalRequest { tool_use_id, @@ -47,6 +57,7 @@ pub(super) fn drain_ui_events(event_rx: &mut mpsc::Receiver, state: &mu state .messages .push((Role::Assistant, format!("[{tool_name}] {input_summary}"))); + state.content_changed = true; } UIEvent::ToolResult { tool_name, @@ -58,6 +69,7 @@ pub(super) fn drain_ui_events(event_rx: &mut mpsc::Receiver, state: &mu Role::Assistant, format!("[{tool_name} {prefix}] {output_summary}"), )); + state.content_changed = true; } UIEvent::TurnComplete => { debug!("turn complete"); @@ -69,6 +81,7 @@ pub(super) fn drain_ui_events(event_rx: &mut mpsc::Receiver, state: &mu state .messages .push((Role::Assistant, format!("[error] {msg}"))); + state.content_changed = true; } } } @@ -86,12 +99,17 @@ pub struct PendingApproval { mod tests { use super::*; + /// Wrap a [`UIEvent`] in a [`StampedEvent`] at epoch 0 for tests. + fn stamp(event: UIEvent) -> StampedEvent { + StampedEvent { epoch: 0, event } + } + #[tokio::test] async fn drain_appends_to_existing_assistant() { let (tx, mut rx) = tokio::sync::mpsc::channel(8); let mut state = AppState::new(); state.messages.push((Role::Assistant, "hello".to_string())); - tx.send(UIEvent::StreamDelta(" world".to_string())) + tx.send(stamp(UIEvent::StreamDelta(" world".to_string()))) .await .unwrap(); drop(tx); @@ -104,7 +122,7 @@ mod tests { let (tx, mut rx) = tokio::sync::mpsc::channel(8); let mut state = AppState::new(); state.messages.push((Role::User, "hi".to_string())); - tx.send(UIEvent::StreamDelta("hello".to_string())) + tx.send(stamp(UIEvent::StreamDelta("hello".to_string()))) .await .unwrap(); drop(tx); @@ -118,11 +136,11 @@ mod tests { async fn drain_tool_approval_sets_pending() { let (tx, mut rx) = tokio::sync::mpsc::channel(8); let mut state = AppState::new(); - tx.send(UIEvent::ToolApprovalRequest { + tx.send(stamp(UIEvent::ToolApprovalRequest { tool_use_id: "t1".to_string(), tool_name: "write_file".to_string(), input_summary: "path: foo.txt".to_string(), - }) + })) .await .unwrap(); drop(tx); @@ -136,11 +154,11 @@ mod tests { async fn drain_tool_result_adds_message() { let (tx, mut rx) = tokio::sync::mpsc::channel(8); let mut state = AppState::new(); - tx.send(UIEvent::ToolResult { + tx.send(stamp(UIEvent::ToolResult { tool_name: "read_file".to_string(), output_summary: "file contents...".to_string(), is_error: false, - }) + })) .await .unwrap(); drop(tx); @@ -148,4 +166,42 @@ mod tests { assert_eq!(state.messages.len(), 1); assert!(state.messages[0].1.contains("read_file result")); } + + #[tokio::test] + async fn drain_discards_stale_epoch_events() { + let (tx, mut rx) = tokio::sync::mpsc::channel(8); + let mut state = AppState::new(); + state.epoch = 2; + // Event from epoch 1 should be discarded. + tx.send(StampedEvent { + epoch: 1, + event: UIEvent::StreamDelta("ghost".to_string()), + }) + .await + .unwrap(); + // Event from epoch 2 should be accepted. + tx.send(StampedEvent { + epoch: 2, + event: UIEvent::StreamDelta("real".to_string()), + }) + .await + .unwrap(); + drop(tx); + drain_ui_events(&mut rx, &mut state); + assert_eq!(state.messages.len(), 1); + assert_eq!(state.messages[0].1, "real"); + } + + #[tokio::test] + async fn drain_sets_content_changed() { + let (tx, mut rx) = tokio::sync::mpsc::channel(8); + let mut state = AppState::new(); + assert!(!state.content_changed); + tx.send(stamp(UIEvent::StreamDelta("hi".to_string()))) + .await + .unwrap(); + drop(tx); + drain_ui_events(&mut rx, &mut state); + assert!(state.content_changed); + } } diff --git a/src/tui/input.rs b/src/tui/input.rs index fb4e316..0d86d9e 100644 --- a/src/tui/input.rs +++ b/src/tui/input.rs @@ -198,6 +198,7 @@ fn execute_command(buf: &str, state: &mut AppState) -> Option { match buf.trim() { "quit" | "q" => Some(LoopControl::Quit), "clear" => { + state.epoch += 1; state.messages.clear(); state.scroll = 0; Some(LoopControl::ClearHistory) diff --git a/src/tui/mod.rs b/src/tui/mod.rs index dcb231c..98fd726 100644 --- a/src/tui/mod.rs +++ b/src/tui/mod.rs @@ -26,7 +26,7 @@ use ratatui::Terminal; use ratatui::backend::CrosstermBackend; use tokio::sync::mpsc; -use crate::core::types::{Role, UIEvent, UserAction}; +use crate::core::types::{Role, StampedEvent, UserAction}; /// Errors that can occur in the TUI layer. #[derive(Debug, thiserror::Error)] @@ -78,6 +78,12 @@ pub struct AppState { pub sandbox_yolo: bool, /// Whether network access is currently allowed. pub network_allowed: bool, + /// Monotonic epoch incremented on `:clear`. Events with an older epoch are + /// discarded by `drain_ui_events` to prevent ghost messages. + pub epoch: u64, + /// Set by `drain_ui_events` when message content changes; consumed by + /// `update_scroll` to auto-follow only when new content arrives. + pub content_changed: bool, } impl AppState { @@ -94,6 +100,8 @@ impl AppState { pending_approval: None, sandbox_yolo: false, network_allowed: false, + epoch: 0, + content_changed: false, } } } @@ -150,7 +158,7 @@ pub fn install_panic_hook() { /// returns `Ok(())`. pub async fn run( action_tx: mpsc::Sender, - mut event_rx: mpsc::Receiver, + mut event_rx: mpsc::Receiver, sandbox_yolo: bool, ) -> Result<(), TuiError> { install_panic_hook(); @@ -194,7 +202,9 @@ pub async fn run( break; } Some(input::LoopControl::ClearHistory) => { - let _ = action_tx.send(UserAction::ClearHistory).await; + let _ = action_tx + .send(UserAction::ClearHistory { epoch: state.epoch }) + .await; } Some(input::LoopControl::ToolApproval { tool_use_id, diff --git a/src/tui/render.rs b/src/tui/render.rs index ab221c7..bb32b57 100644 --- a/src/tui/render.rs +++ b/src/tui/render.rs @@ -38,15 +38,18 @@ pub(super) fn update_scroll(state: &mut AppState, area: Rect) { let max_scroll = total_lines.saturating_sub(viewport_height); match state.mode { - // In Insert mode, auto-follow the bottom of the conversation. - Mode::Insert => { + // In Insert mode, auto-follow only when new content has arrived. + Mode::Insert if state.content_changed => { state.scroll = max_scroll; } - // In Normal/Command mode, the user controls scroll -- just clamp to bounds. + // Otherwise, just clamp to bounds. _ => { state.scroll = state.scroll.min(max_scroll); } } + // Reset unconditionally -- this is the single place that consumes the flag, + // regardless of which scroll branch ran above. + state.content_changed = false; } /// Compute the overlay rectangle for the command palette, centered on `output_area`. @@ -298,6 +301,7 @@ mod tests { for i in 0..50 { state.messages.push((Role::User, format!("message {i}"))); } + state.content_changed = true; // simulate new content arriving let area = Rect::new(0, 0, 80, 24); update_scroll(&mut state, area); assert!(state.scroll > 0, "expected scroll > 0 with 50 messages"); -- 2.49.1