Fix some issues with UI getting out of sync. #7

Merged
drew merged 1 commit from drew/ui-sync into main 2026-03-11 06:53:50 +00:00
8 changed files with 199 additions and 84 deletions

View file

@ -3,6 +3,14 @@
- Keep UI and orchestrator in sync (i.e. messages display out of order if you queue up many.) - Keep UI and orchestrator in sync (i.e. messages display out of order if you queue up many.)
- `:clear` clears TUI state immediately but sends `ClearHistory` to orchestrator async. A mid-stream response can ghost back in after clear. Need synchronization (e.g. clear on `TurnComplete`, or have orchestrator ack the clear). - `:clear` clears TUI state immediately but sends `ClearHistory` to orchestrator async. A mid-stream response can ghost back in after clear. Need synchronization (e.g. clear on `TurnComplete`, or have orchestrator ack the clear).
# UX Improvements
- Start tool uses collapsed.
- Allow traversing blocks with [, ] short cuts.
- Allow expanding and closing blocks with o (expand all with Ctrl+o)
- Allow moving inside the text area with the arrows (and ctrl arrows)
# Scroll # Scroll
- `update_scroll` auto-follows in Insert mode, yanking viewport to bottom on mode switch. Only auto-follow when new content arrives (in `drain_ui_events`), not every frame. - `update_scroll` auto-follows in Insert mode, yanking viewport to bottom on mode switch. Only auto-follow when new content arrives (in `drain_ui_events`), not every frame.

View file

@ -25,7 +25,7 @@ use anyhow::Context;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use crate::core::orchestrator::Orchestrator; use crate::core::orchestrator::Orchestrator;
use crate::core::types::{UIEvent, UserAction}; use crate::core::types::{StampedEvent, UserAction};
use crate::provider::ClaudeProvider; use crate::provider::ClaudeProvider;
use crate::sandbox::policy::SandboxPolicy; use crate::sandbox::policy::SandboxPolicy;
use crate::sandbox::{EnforcementMode, Sandbox}; use crate::sandbox::{EnforcementMode, Sandbox};
@ -85,7 +85,7 @@ pub async fn run(project_dir: &Path, yolo: bool) -> anyhow::Result<()> {
// -- Channels ----------------------------------------------------------------- // -- Channels -----------------------------------------------------------------
let (action_tx, action_rx) = mpsc::channel::<UserAction>(CHANNEL_CAP); let (action_tx, action_rx) = mpsc::channel::<UserAction>(CHANNEL_CAP);
let (event_tx, event_rx) = mpsc::channel::<UIEvent>(CHANNEL_CAP); let (event_tx, event_rx) = mpsc::channel::<StampedEvent>(CHANNEL_CAP);
// -- Tools & Orchestrator (background task) ------------------------------------ // -- Tools & Orchestrator (background task) ------------------------------------
let tool_registry = ToolRegistry::default_tools(); let tool_registry = ToolRegistry::default_tools();

View file

@ -4,7 +4,8 @@ use tracing::{debug, warn};
use crate::core::history::ConversationHistory; use crate::core::history::ConversationHistory;
use crate::core::types::{ use crate::core::types::{
ContentBlock, ConversationMessage, Role, StreamEvent, ToolDefinition, UIEvent, UserAction, ContentBlock, ConversationMessage, Role, StampedEvent, StreamEvent, ToolDefinition, UIEvent,
UserAction,
}; };
use crate::provider::ModelProvider; use crate::provider::ModelProvider;
use crate::sandbox::Sandbox; use crate::sandbox::Sandbox;
@ -109,10 +110,13 @@ pub struct Orchestrator<P> {
tool_registry: ToolRegistry, tool_registry: ToolRegistry,
sandbox: Sandbox, sandbox: Sandbox,
action_rx: mpsc::Receiver<UserAction>, action_rx: mpsc::Receiver<UserAction>,
event_tx: mpsc::Sender<UIEvent>, event_tx: mpsc::Sender<StampedEvent>,
/// Messages typed by the user while an approval prompt is open. They are /// Messages typed by the user while an approval prompt is open. They are
/// queued here and replayed as new turns once the current turn completes. /// queued here and replayed as new turns once the current turn completes.
queued_messages: Vec<String>, queued_messages: Vec<String>,
/// Monotonic epoch incremented on each `:clear`. Events are tagged with
/// this value so the TUI can discard stale in-flight messages.
epoch: u64,
} }
impl<P: ModelProvider> Orchestrator<P> { impl<P: ModelProvider> Orchestrator<P> {
@ -122,7 +126,7 @@ impl<P: ModelProvider> Orchestrator<P> {
tool_registry: ToolRegistry, tool_registry: ToolRegistry,
sandbox: Sandbox, sandbox: Sandbox,
action_rx: mpsc::Receiver<UserAction>, action_rx: mpsc::Receiver<UserAction>,
event_tx: mpsc::Sender<UIEvent>, event_tx: mpsc::Sender<StampedEvent>,
) -> Self { ) -> Self {
Self { Self {
history: ConversationHistory::new(), history: ConversationHistory::new(),
@ -132,6 +136,25 @@ impl<P: ModelProvider> Orchestrator<P> {
action_rx, action_rx,
event_tx, event_tx,
queued_messages: Vec::new(), queued_messages: Vec::new(),
epoch: 0,
}
}
/// Send a [`UIEvent`] stamped with the current epoch.
///
/// Drops the event silently if the channel is full or closed -- this is
/// intentional (TUI death should not stall the orchestrator), but the
/// debug log below makes backpressure visible during development.
async fn send(&self, event: UIEvent) {
let result = self
.event_tx
.send(StampedEvent {
epoch: self.epoch,
event,
})
.await;
if result.is_err() {
debug!("ui event dropped -- channel closed or full");
} }
} }
@ -155,7 +178,7 @@ impl<P: ModelProvider> Orchestrator<P> {
match event { match event {
StreamEvent::TextDelta(chunk) => { StreamEvent::TextDelta(chunk) => {
text_buf.push_str(&chunk); text_buf.push_str(&chunk);
let _ = self.event_tx.send(UIEvent::StreamDelta(chunk)).await; self.send(UIEvent::StreamDelta(chunk)).await;
} }
StreamEvent::ToolUseStart { id, name } => { StreamEvent::ToolUseStart { id, name } => {
// Flush any accumulated text before starting tool block. // Flush any accumulated text before starting tool block.
@ -226,7 +249,7 @@ impl<P: ModelProvider> Orchestrator<P> {
match result { match result {
StreamResult::Error(msg) => { StreamResult::Error(msg) => {
let _ = self.event_tx.send(UIEvent::Error(msg)).await; self.send(UIEvent::Error(msg)).await;
return; return;
} }
StreamResult::Done(blocks) => { StreamResult::Done(blocks) => {
@ -241,7 +264,7 @@ impl<P: ModelProvider> Orchestrator<P> {
}); });
if !has_tool_use { if !has_tool_use {
let _ = self.event_tx.send(UIEvent::TurnComplete).await; self.send(UIEvent::TurnComplete).await;
return; return;
} }
@ -269,16 +292,15 @@ impl<P: ModelProvider> Orchestrator<P> {
role: Role::User, role: Role::User,
content: tool_results, content: tool_results,
}); });
} }
} }
} }
warn!("tool-use loop reached max iterations ({MAX_TOOL_ITERATIONS})"); warn!("tool-use loop reached max iterations ({MAX_TOOL_ITERATIONS})");
let _ = self self.send(UIEvent::Error(
.event_tx "tool-use loop reached maximum iterations".to_string(),
.send(UIEvent::Error( ))
"tool-use loop reached maximum iterations".to_string(), .await;
))
.await;
} }
/// Execute a single tool, handling approval if needed. /// Execute a single tool, handling approval if needed.
@ -308,24 +330,20 @@ impl<P: ModelProvider> Orchestrator<P> {
// Check approval. // Check approval.
let approved = match risk { let approved = match risk {
RiskLevel::AutoApprove => { RiskLevel::AutoApprove => {
let _ = self self.send(UIEvent::ToolExecuting {
.event_tx tool_name: tool_name.to_string(),
.send(UIEvent::ToolExecuting { input_summary: input_summary.clone(),
tool_name: tool_name.to_string(), })
input_summary: input_summary.clone(), .await;
})
.await;
true true
} }
RiskLevel::RequiresApproval => { RiskLevel::RequiresApproval => {
let _ = self self.send(UIEvent::ToolApprovalRequest {
.event_tx tool_use_id: tool_use_id.to_string(),
.send(UIEvent::ToolApprovalRequest { tool_name: tool_name.to_string(),
tool_use_id: tool_use_id.to_string(), input_summary: input_summary.clone(),
tool_name: tool_name.to_string(), })
input_summary: input_summary.clone(), .await;
})
.await;
// Wait for approval response from TUI. // Wait for approval response from TUI.
self.wait_for_approval(tool_use_id).await self.wait_for_approval(tool_use_id).await
@ -343,26 +361,22 @@ impl<P: ModelProvider> Orchestrator<P> {
let tool = self.tool_registry.get(tool_name).unwrap(); let tool = self.tool_registry.get(tool_name).unwrap();
match tool.execute(input, &self.sandbox).await { match tool.execute(input, &self.sandbox).await {
Ok(output) => { Ok(output) => {
let _ = self self.send(UIEvent::ToolResult {
.event_tx tool_name: tool_name.to_string(),
.send(UIEvent::ToolResult { output_summary: truncate(&output.content, 200),
tool_name: tool_name.to_string(), is_error: output.is_error,
output_summary: truncate(&output.content, 200), })
is_error: output.is_error, .await;
})
.await;
output output
} }
Err(e) => { Err(e) => {
let msg = e.to_string(); let msg = e.to_string();
let _ = self self.send(UIEvent::ToolResult {
.event_tx tool_name: tool_name.to_string(),
.send(UIEvent::ToolResult { output_summary: msg.clone(),
tool_name: tool_name.to_string(), is_error: true,
output_summary: msg.clone(), })
is_error: true, .await;
})
.await;
ToolOutput { ToolOutput {
content: msg, content: msg,
is_error: true, is_error: true,
@ -391,7 +405,15 @@ impl<P: ModelProvider> Orchestrator<P> {
UserAction::SendMessage(text) => { UserAction::SendMessage(text) => {
self.queued_messages.push(text); self.queued_messages.push(text);
} }
_ => {} // discard stale approvals / ClearHistory during wait UserAction::ClearHistory { epoch } => {
// Keep epoch in sync even while blocked on an approval
// prompt. Without this, events emitted after the approval
// resolves would carry the pre-clear epoch and be silently
// discarded by the TUI.
self.epoch = epoch;
self.history.clear();
}
_ => {} // discard stale approvals
} }
} }
false false
@ -402,7 +424,8 @@ impl<P: ModelProvider> Orchestrator<P> {
while let Some(action) = self.action_rx.recv().await { while let Some(action) = self.action_rx.recv().await {
match action { match action {
UserAction::Quit => break, UserAction::Quit => break,
UserAction::ClearHistory => { UserAction::ClearHistory { epoch } => {
self.epoch = epoch;
self.history.clear(); self.history.clear();
} }
@ -412,10 +435,7 @@ impl<P: ModelProvider> Orchestrator<P> {
UserAction::SetNetworkPolicy(allowed) => { UserAction::SetNetworkPolicy(allowed) => {
self.sandbox.set_network_allowed(allowed); self.sandbox.set_network_allowed(allowed);
let _ = self self.send(UIEvent::NetworkPolicyChanged(allowed)).await;
.event_tx
.send(UIEvent::NetworkPolicyChanged(allowed))
.await;
} }
UserAction::SendMessage(text) => { UserAction::SendMessage(text) => {
@ -474,7 +494,7 @@ mod tests {
fn test_orchestrator<P: ModelProvider>( fn test_orchestrator<P: ModelProvider>(
provider: P, provider: P,
action_rx: mpsc::Receiver<UserAction>, action_rx: mpsc::Receiver<UserAction>,
event_tx: mpsc::Sender<UIEvent>, event_tx: mpsc::Sender<StampedEvent>,
) -> Orchestrator<P> { ) -> Orchestrator<P> {
use crate::sandbox::policy::SandboxPolicy; use crate::sandbox::policy::SandboxPolicy;
use crate::sandbox::{EnforcementMode, Sandbox}; use crate::sandbox::{EnforcementMode, Sandbox};
@ -494,11 +514,11 @@ mod tests {
/// Collect all UIEvents that arrive within one orchestrator turn, stopping /// Collect all UIEvents that arrive within one orchestrator turn, stopping
/// when the channel is drained after a `TurnComplete` or `Error`. /// when the channel is drained after a `TurnComplete` or `Error`.
async fn collect_events(rx: &mut mpsc::Receiver<UIEvent>) -> Vec<UIEvent> { async fn collect_events(rx: &mut mpsc::Receiver<StampedEvent>) -> Vec<UIEvent> {
let mut out = Vec::new(); let mut out = Vec::new();
while let Ok(ev) = rx.try_recv() { while let Ok(stamped) = rx.try_recv() {
let done = matches!(ev, UIEvent::TurnComplete | UIEvent::Error(_)); let done = matches!(stamped.event, UIEvent::TurnComplete | UIEvent::Error(_));
out.push(ev); out.push(stamped.event);
if done { if done {
break; break;
} }
@ -525,7 +545,7 @@ mod tests {
]); ]);
let (action_tx, action_rx) = mpsc::channel::<UserAction>(8); let (action_tx, action_rx) = mpsc::channel::<UserAction>(8);
let (event_tx, mut event_rx) = mpsc::channel::<UIEvent>(16); let (event_tx, mut event_rx) = mpsc::channel::<StampedEvent>(16);
let orch = test_orchestrator(provider, action_rx, event_tx); let orch = test_orchestrator(provider, action_rx, event_tx);
let handle = tokio::spawn(orch.run()); let handle = tokio::spawn(orch.run());
@ -563,7 +583,7 @@ mod tests {
]); ]);
let (action_tx, action_rx) = mpsc::channel::<UserAction>(8); let (action_tx, action_rx) = mpsc::channel::<UserAction>(8);
let (event_tx, mut event_rx) = mpsc::channel::<UIEvent>(16); let (event_tx, mut event_rx) = mpsc::channel::<StampedEvent>(16);
let orch = test_orchestrator(provider, action_rx, event_tx); let orch = test_orchestrator(provider, action_rx, event_tx);
let handle = tokio::spawn(orch.run()); let handle = tokio::spawn(orch.run());
@ -605,7 +625,7 @@ mod tests {
} }
let (action_tx, action_rx) = mpsc::channel::<UserAction>(8); let (action_tx, action_rx) = mpsc::channel::<UserAction>(8);
let (event_tx, _event_rx) = mpsc::channel::<UIEvent>(8); let (event_tx, _event_rx) = mpsc::channel::<StampedEvent>(8);
let orch = test_orchestrator(NeverCalledProvider, action_rx, event_tx); let orch = test_orchestrator(NeverCalledProvider, action_rx, event_tx);
let handle = tokio::spawn(orch.run()); let handle = tokio::spawn(orch.run());
@ -658,7 +678,7 @@ mod tests {
let provider = MultiTurnMock { turns }; let provider = MultiTurnMock { turns };
let (action_tx, action_rx) = mpsc::channel::<UserAction>(8); let (action_tx, action_rx) = mpsc::channel::<UserAction>(8);
let (event_tx, mut event_rx) = mpsc::channel::<UIEvent>(32); let (event_tx, mut event_rx) = mpsc::channel::<StampedEvent>(32);
let orch = test_orchestrator(provider, action_rx, event_tx); let orch = test_orchestrator(provider, action_rx, event_tx);
let handle = tokio::spawn(orch.run()); let handle = tokio::spawn(orch.run());
@ -730,7 +750,7 @@ mod tests {
]))); ])));
let (action_tx, action_rx) = mpsc::channel::<UserAction>(8); let (action_tx, action_rx) = mpsc::channel::<UserAction>(8);
let (event_tx, mut event_rx) = mpsc::channel::<UIEvent>(32); let (event_tx, mut event_rx) = mpsc::channel::<StampedEvent>(32);
// Use a real ToolRegistry so read_file works. // Use a real ToolRegistry so read_file works.
use crate::sandbox::policy::SandboxPolicy; use crate::sandbox::policy::SandboxPolicy;
@ -817,7 +837,7 @@ mod tests {
]); ]);
let (action_tx, action_rx) = mpsc::channel::<UserAction>(8); let (action_tx, action_rx) = mpsc::channel::<UserAction>(8);
let (event_tx, mut event_rx) = mpsc::channel::<UIEvent>(16); let (event_tx, mut event_rx) = mpsc::channel::<StampedEvent>(16);
let orch = test_orchestrator(provider, action_rx, event_tx); let orch = test_orchestrator(provider, action_rx, event_tx);
let handle = tokio::spawn(orch.run()); let handle = tokio::spawn(orch.run());
@ -894,7 +914,7 @@ mod tests {
]))); ])));
let (action_tx, action_rx) = mpsc::channel::<UserAction>(16); let (action_tx, action_rx) = mpsc::channel::<UserAction>(16);
let (event_tx, mut event_rx) = mpsc::channel::<UIEvent>(64); let (event_tx, mut event_rx) = mpsc::channel::<StampedEvent>(64);
let orch = test_orchestrator(MultiCallMock { turns }, action_rx, event_tx); let orch = test_orchestrator(MultiCallMock { turns }, action_rx, event_tx);
let handle = tokio::spawn(orch.run()); let handle = tokio::spawn(orch.run());
@ -929,7 +949,7 @@ mod tests {
// Collect everything. // Collect everything.
let mut all_events = Vec::new(); let mut all_events = Vec::new();
while let Ok(ev) = event_rx.try_recv() { while let Ok(ev) = event_rx.try_recv() {
all_events.push(ev); all_events.push(ev.event);
} }
// The queued message must have produced "queued reply". // The queued message must have produced "queued reply".
@ -962,7 +982,7 @@ mod tests {
} }
let (action_tx, action_rx) = mpsc::channel::<UserAction>(8); let (action_tx, action_rx) = mpsc::channel::<UserAction>(8);
let (event_tx, mut event_rx) = mpsc::channel::<UIEvent>(8); let (event_tx, mut event_rx) = mpsc::channel::<StampedEvent>(8);
let orch = test_orchestrator(NeverCalledProvider, action_rx, event_tx); let orch = test_orchestrator(NeverCalledProvider, action_rx, event_tx);
let handle = tokio::spawn(orch.run()); let handle = tokio::spawn(orch.run());
@ -974,8 +994,8 @@ mod tests {
tokio::time::sleep(std::time::Duration::from_millis(10)).await; tokio::time::sleep(std::time::Duration::from_millis(10)).await;
let mut found = false; let mut found = false;
while let Ok(ev) = event_rx.try_recv() { while let Ok(stamped) = event_rx.try_recv() {
if matches!(ev, UIEvent::NetworkPolicyChanged(true)) { if matches!(stamped.event, UIEvent::NetworkPolicyChanged(true)) {
found = true; found = true;
} }
} }

View file

@ -31,7 +31,10 @@ pub enum UserAction {
/// The user has requested to quit. /// The user has requested to quit.
Quit, Quit,
/// The user has requested to clear conversation history. /// The user has requested to clear conversation history.
ClearHistory, ///
/// Carries the new epoch so the orchestrator can tag subsequent events,
/// allowing the TUI to discard stale in-flight events from before the clear.
ClearHistory { epoch: u64 },
/// The user has toggled the network policy via `:net on/off`. /// The user has toggled the network policy via `:net on/off`.
SetNetworkPolicy(bool), SetNetworkPolicy(bool),
} }
@ -66,6 +69,19 @@ pub enum UIEvent {
Error(String), Error(String),
} }
/// A [`UIEvent`] tagged with the orchestrator's current epoch.
///
/// The TUI discards any `StampedEvent` whose epoch is older than its own,
/// preventing stale in-flight events (e.g. stream deltas from before a
/// `:clear`) from repopulating the message list.
#[derive(Debug)]
pub struct StampedEvent {
/// The orchestrator epoch at the time this event was emitted.
pub epoch: u64,
/// The wrapped event.
pub event: UIEvent,
}
/// The role of a participant in a conversation. /// The role of a participant in a conversation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")] #[serde(rename_all = "lowercase")]

View file

@ -4,7 +4,7 @@ use tokio::sync::mpsc;
use tracing::debug; use tracing::debug;
use super::AppState; use super::AppState;
use crate::core::types::{Role, UIEvent}; use crate::core::types::{Role, StampedEvent, UIEvent};
/// Drain all pending [`UIEvent`]s from `event_rx` and apply them to `state`. /// Drain all pending [`UIEvent`]s from `event_rx` and apply them to `state`.
/// ///
@ -19,15 +19,25 @@ use crate::core::types::{Role, UIEvent};
/// | `ToolResult` | Display tool result | /// | `ToolResult` | Display tool result |
/// | `TurnComplete` | No structural change; logged at debug level | /// | `TurnComplete` | No structural change; logged at debug level |
/// | `Error(msg)` | Push `(Assistant, "[error] {msg}")` | /// | `Error(msg)` | Push `(Assistant, "[error] {msg}")` |
pub(super) fn drain_ui_events(event_rx: &mut mpsc::Receiver<UIEvent>, state: &mut AppState) { pub(super) fn drain_ui_events(event_rx: &mut mpsc::Receiver<StampedEvent>, state: &mut AppState) {
while let Ok(event) = event_rx.try_recv() { while let Ok(stamped) = event_rx.try_recv() {
match event { // Discard events from before the most recent :clear.
if stamped.epoch < state.epoch {
debug!(
event_epoch = stamped.epoch,
state_epoch = state.epoch,
"dropping stale event"
);
continue;
}
match stamped.event {
UIEvent::StreamDelta(chunk) => { UIEvent::StreamDelta(chunk) => {
if let Some((Role::Assistant, content)) = state.messages.last_mut() { if let Some((Role::Assistant, content)) = state.messages.last_mut() {
content.push_str(&chunk); content.push_str(&chunk);
} else { } else {
state.messages.push((Role::Assistant, chunk)); state.messages.push((Role::Assistant, chunk));
} }
state.content_changed = true;
} }
UIEvent::ToolApprovalRequest { UIEvent::ToolApprovalRequest {
tool_use_id, tool_use_id,
@ -47,6 +57,7 @@ pub(super) fn drain_ui_events(event_rx: &mut mpsc::Receiver<UIEvent>, state: &mu
state state
.messages .messages
.push((Role::Assistant, format!("[{tool_name}] {input_summary}"))); .push((Role::Assistant, format!("[{tool_name}] {input_summary}")));
state.content_changed = true;
} }
UIEvent::ToolResult { UIEvent::ToolResult {
tool_name, tool_name,
@ -58,6 +69,7 @@ pub(super) fn drain_ui_events(event_rx: &mut mpsc::Receiver<UIEvent>, state: &mu
Role::Assistant, Role::Assistant,
format!("[{tool_name} {prefix}] {output_summary}"), format!("[{tool_name} {prefix}] {output_summary}"),
)); ));
state.content_changed = true;
} }
UIEvent::TurnComplete => { UIEvent::TurnComplete => {
debug!("turn complete"); debug!("turn complete");
@ -69,6 +81,7 @@ pub(super) fn drain_ui_events(event_rx: &mut mpsc::Receiver<UIEvent>, state: &mu
state state
.messages .messages
.push((Role::Assistant, format!("[error] {msg}"))); .push((Role::Assistant, format!("[error] {msg}")));
state.content_changed = true;
} }
} }
} }
@ -86,12 +99,17 @@ pub struct PendingApproval {
mod tests { mod tests {
use super::*; use super::*;
/// Wrap a [`UIEvent`] in a [`StampedEvent`] at epoch 0 for tests.
fn stamp(event: UIEvent) -> StampedEvent {
StampedEvent { epoch: 0, event }
}
#[tokio::test] #[tokio::test]
async fn drain_appends_to_existing_assistant() { async fn drain_appends_to_existing_assistant() {
let (tx, mut rx) = tokio::sync::mpsc::channel(8); let (tx, mut rx) = tokio::sync::mpsc::channel(8);
let mut state = AppState::new(); let mut state = AppState::new();
state.messages.push((Role::Assistant, "hello".to_string())); state.messages.push((Role::Assistant, "hello".to_string()));
tx.send(UIEvent::StreamDelta(" world".to_string())) tx.send(stamp(UIEvent::StreamDelta(" world".to_string())))
.await .await
.unwrap(); .unwrap();
drop(tx); drop(tx);
@ -104,7 +122,7 @@ mod tests {
let (tx, mut rx) = tokio::sync::mpsc::channel(8); let (tx, mut rx) = tokio::sync::mpsc::channel(8);
let mut state = AppState::new(); let mut state = AppState::new();
state.messages.push((Role::User, "hi".to_string())); state.messages.push((Role::User, "hi".to_string()));
tx.send(UIEvent::StreamDelta("hello".to_string())) tx.send(stamp(UIEvent::StreamDelta("hello".to_string())))
.await .await
.unwrap(); .unwrap();
drop(tx); drop(tx);
@ -118,11 +136,11 @@ mod tests {
async fn drain_tool_approval_sets_pending() { async fn drain_tool_approval_sets_pending() {
let (tx, mut rx) = tokio::sync::mpsc::channel(8); let (tx, mut rx) = tokio::sync::mpsc::channel(8);
let mut state = AppState::new(); let mut state = AppState::new();
tx.send(UIEvent::ToolApprovalRequest { tx.send(stamp(UIEvent::ToolApprovalRequest {
tool_use_id: "t1".to_string(), tool_use_id: "t1".to_string(),
tool_name: "write_file".to_string(), tool_name: "write_file".to_string(),
input_summary: "path: foo.txt".to_string(), input_summary: "path: foo.txt".to_string(),
}) }))
.await .await
.unwrap(); .unwrap();
drop(tx); drop(tx);
@ -136,11 +154,11 @@ mod tests {
async fn drain_tool_result_adds_message() { async fn drain_tool_result_adds_message() {
let (tx, mut rx) = tokio::sync::mpsc::channel(8); let (tx, mut rx) = tokio::sync::mpsc::channel(8);
let mut state = AppState::new(); let mut state = AppState::new();
tx.send(UIEvent::ToolResult { tx.send(stamp(UIEvent::ToolResult {
tool_name: "read_file".to_string(), tool_name: "read_file".to_string(),
output_summary: "file contents...".to_string(), output_summary: "file contents...".to_string(),
is_error: false, is_error: false,
}) }))
.await .await
.unwrap(); .unwrap();
drop(tx); drop(tx);
@ -148,4 +166,42 @@ mod tests {
assert_eq!(state.messages.len(), 1); assert_eq!(state.messages.len(), 1);
assert!(state.messages[0].1.contains("read_file result")); assert!(state.messages[0].1.contains("read_file result"));
} }
#[tokio::test]
async fn drain_discards_stale_epoch_events() {
let (tx, mut rx) = tokio::sync::mpsc::channel(8);
let mut state = AppState::new();
state.epoch = 2;
// Event from epoch 1 should be discarded.
tx.send(StampedEvent {
epoch: 1,
event: UIEvent::StreamDelta("ghost".to_string()),
})
.await
.unwrap();
// Event from epoch 2 should be accepted.
tx.send(StampedEvent {
epoch: 2,
event: UIEvent::StreamDelta("real".to_string()),
})
.await
.unwrap();
drop(tx);
drain_ui_events(&mut rx, &mut state);
assert_eq!(state.messages.len(), 1);
assert_eq!(state.messages[0].1, "real");
}
#[tokio::test]
async fn drain_sets_content_changed() {
let (tx, mut rx) = tokio::sync::mpsc::channel(8);
let mut state = AppState::new();
assert!(!state.content_changed);
tx.send(stamp(UIEvent::StreamDelta("hi".to_string())))
.await
.unwrap();
drop(tx);
drain_ui_events(&mut rx, &mut state);
assert!(state.content_changed);
}
} }

View file

@ -198,6 +198,7 @@ fn execute_command(buf: &str, state: &mut AppState) -> Option<LoopControl> {
match buf.trim() { match buf.trim() {
"quit" | "q" => Some(LoopControl::Quit), "quit" | "q" => Some(LoopControl::Quit),
"clear" => { "clear" => {
state.epoch += 1;
state.messages.clear(); state.messages.clear();
state.scroll = 0; state.scroll = 0;
Some(LoopControl::ClearHistory) Some(LoopControl::ClearHistory)

View file

@ -26,7 +26,7 @@ use ratatui::Terminal;
use ratatui::backend::CrosstermBackend; use ratatui::backend::CrosstermBackend;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use crate::core::types::{Role, UIEvent, UserAction}; use crate::core::types::{Role, StampedEvent, UserAction};
/// Errors that can occur in the TUI layer. /// Errors that can occur in the TUI layer.
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
@ -78,6 +78,12 @@ pub struct AppState {
pub sandbox_yolo: bool, pub sandbox_yolo: bool,
/// Whether network access is currently allowed. /// Whether network access is currently allowed.
pub network_allowed: bool, pub network_allowed: bool,
/// Monotonic epoch incremented on `:clear`. Events with an older epoch are
/// discarded by `drain_ui_events` to prevent ghost messages.
pub epoch: u64,
/// Set by `drain_ui_events` when message content changes; consumed by
/// `update_scroll` to auto-follow only when new content arrives.
pub content_changed: bool,
} }
impl AppState { impl AppState {
@ -94,6 +100,8 @@ impl AppState {
pending_approval: None, pending_approval: None,
sandbox_yolo: false, sandbox_yolo: false,
network_allowed: false, network_allowed: false,
epoch: 0,
content_changed: false,
} }
} }
} }
@ -150,7 +158,7 @@ pub fn install_panic_hook() {
/// returns `Ok(())`. /// returns `Ok(())`.
pub async fn run( pub async fn run(
action_tx: mpsc::Sender<UserAction>, action_tx: mpsc::Sender<UserAction>,
mut event_rx: mpsc::Receiver<UIEvent>, mut event_rx: mpsc::Receiver<StampedEvent>,
sandbox_yolo: bool, sandbox_yolo: bool,
) -> Result<(), TuiError> { ) -> Result<(), TuiError> {
install_panic_hook(); install_panic_hook();
@ -194,7 +202,9 @@ pub async fn run(
break; break;
} }
Some(input::LoopControl::ClearHistory) => { Some(input::LoopControl::ClearHistory) => {
let _ = action_tx.send(UserAction::ClearHistory).await; let _ = action_tx
.send(UserAction::ClearHistory { epoch: state.epoch })
.await;
} }
Some(input::LoopControl::ToolApproval { Some(input::LoopControl::ToolApproval {
tool_use_id, tool_use_id,

View file

@ -38,15 +38,18 @@ pub(super) fn update_scroll(state: &mut AppState, area: Rect) {
let max_scroll = total_lines.saturating_sub(viewport_height); let max_scroll = total_lines.saturating_sub(viewport_height);
match state.mode { match state.mode {
// In Insert mode, auto-follow the bottom of the conversation. // In Insert mode, auto-follow only when new content has arrived.
Mode::Insert => { Mode::Insert if state.content_changed => {
state.scroll = max_scroll; state.scroll = max_scroll;
} }
// In Normal/Command mode, the user controls scroll -- just clamp to bounds. // Otherwise, just clamp to bounds.
_ => { _ => {
state.scroll = state.scroll.min(max_scroll); state.scroll = state.scroll.min(max_scroll);
} }
} }
// Reset unconditionally -- this is the single place that consumes the flag,
// regardless of which scroll branch ran above.
state.content_changed = false;
} }
/// Compute the overlay rectangle for the command palette, centered on `output_area`. /// Compute the overlay rectangle for the command palette, centered on `output_area`.
@ -298,6 +301,7 @@ mod tests {
for i in 0..50 { for i in 0..50 {
state.messages.push((Role::User, format!("message {i}"))); state.messages.push((Role::User, format!("message {i}")));
} }
state.content_changed = true; // simulate new content arriving
let area = Rect::new(0, 0, 80, 24); let area = Rect::new(0, 0, 80, 24);
update_scroll(&mut state, area); update_scroll(&mut state, area);
assert!(state.scroll > 0, "expected scroll > 0 with 50 messages"); assert!(state.scroll > 0, "expected scroll > 0 with 50 messages");