//! Server event draining: applies incoming [`UIEvent`]s to the TUI state. use tokio::sync::mpsc; use tracing::debug; use super::{AppState, DisplayMessage}; use crate::core::types::{Role, StampedEvent, UIEvent}; use crate::tui::tool_display; /// Drain all pending [`UIEvent`]s from `event_rx` and apply them to `state`. /// /// This is non-blocking: it processes all currently-available events and returns /// immediately when the channel is empty. /// /// Tool events use in-place replacement: when a `ToolExecuting` or `ToolResult` /// arrives, the handler searches `state.messages` for an existing entry with the /// same `tool_use_id` and replaces its content rather than appending a new row. /// /// | Event | Effect | /// |-------------------|------------------------------------------------------------| /// | `StreamDelta(s)` | Append `s` to last message if it's `Assistant`; else push | /// | `ToolExecuting` | Push new executing message (or replace in-place) | /// | `ToolResult` | Replace executing message in-place (or push new) | /// | `TurnComplete` | No structural change; logged at debug level | /// | `Error(msg)` | Push `(Assistant, "[error] {msg}")` | pub(super) fn drain_ui_events(event_rx: &mut mpsc::Receiver, state: &mut AppState) { while let Ok(stamped) = event_rx.try_recv() { // Discard events from before the most recent :clear. if stamped.epoch < state.epoch { debug!( event_epoch = stamped.epoch, state_epoch = state.epoch, "dropping stale event" ); continue; } match stamped.event { UIEvent::StreamDelta(chunk) => { if let Some(msg) = state.messages.last_mut() { if msg.role == Role::Assistant && msg.tool_use_id.is_none() { msg.content.push_str(&chunk); } else { state.messages.push(DisplayMessage { role: Role::Assistant, content: chunk, tool_use_id: None, }); } } else { state.messages.push(DisplayMessage { role: Role::Assistant, content: chunk, tool_use_id: None, }); } state.content_changed = true; } UIEvent::ToolExecuting { tool_use_id, tool_name, display, } => { let content = tool_display::format_executing(&tool_name, &display); replace_or_push(state, &tool_use_id, content); state.content_changed = true; } UIEvent::ToolResult { tool_use_id, tool_name, display, is_error, } => { let content = tool_display::format_result(&tool_name, &display, is_error); replace_or_push(state, &tool_use_id, content); state.content_changed = true; } UIEvent::TurnComplete => { debug!("turn complete"); } UIEvent::NetworkPolicyChanged(allowed) => { state.network_allowed = allowed; } UIEvent::Error(msg) => { state.messages.push(DisplayMessage { role: Role::Assistant, content: format!("[error] {msg}"), tool_use_id: None, }); state.content_changed = true; } } } } /// Find the message with the given `tool_use_id` and replace its content, /// or push a new message if not found. fn replace_or_push(state: &mut AppState, tool_use_id: &str, content: String) { if let Some(msg) = state .messages .iter_mut() .rev() .find(|m| m.tool_use_id.as_deref() == Some(tool_use_id)) { msg.content = content; } else { state.messages.push(DisplayMessage { role: Role::Assistant, content, tool_use_id: Some(tool_use_id.to_string()), }); } } #[cfg(test)] mod tests { use super::*; use crate::core::types::ToolDisplay; /// Wrap a [`UIEvent`] in a [`StampedEvent`] at epoch 0 for tests. fn stamp(event: UIEvent) -> StampedEvent { StampedEvent { epoch: 0, event } } #[tokio::test] async fn drain_appends_to_existing_assistant() { let (tx, mut rx) = tokio::sync::mpsc::channel(8); let mut state = AppState::new(); state.messages.push(DisplayMessage { role: Role::Assistant, content: "hello".to_string(), tool_use_id: None, }); tx.send(stamp(UIEvent::StreamDelta(" world".to_string()))) .await .unwrap(); drop(tx); drain_ui_events(&mut rx, &mut state); assert_eq!(state.messages.last().unwrap().content, "hello world"); } #[tokio::test] async fn drain_creates_assistant_on_user_last() { let (tx, mut rx) = tokio::sync::mpsc::channel(8); let mut state = AppState::new(); state.messages.push(DisplayMessage { role: Role::User, content: "hi".to_string(), tool_use_id: None, }); tx.send(stamp(UIEvent::StreamDelta("hello".to_string()))) .await .unwrap(); drop(tx); drain_ui_events(&mut rx, &mut state); assert_eq!(state.messages.len(), 2); assert_eq!(state.messages[1].role, Role::Assistant); assert_eq!(state.messages[1].content, "hello"); } #[tokio::test] async fn drain_tool_result_replaces_existing_message() { let (tx, mut rx) = tokio::sync::mpsc::channel(8); let mut state = AppState::new(); // Simulate an existing executing message. state.messages.push(DisplayMessage { role: Role::Assistant, content: "$ cargo test".to_string(), tool_use_id: Some("t1".to_string()), }); tx.send(stamp(UIEvent::ToolResult { tool_use_id: "t1".to_string(), tool_name: "shell_exec".to_string(), display: ToolDisplay::ShellExec { command: "cargo test\nok".to_string(), }, is_error: false, })) .await .unwrap(); drop(tx); drain_ui_events(&mut rx, &mut state); // Should replace, not append. assert_eq!(state.messages.len(), 1); assert!(state.messages[0].content.contains("cargo test")); } #[tokio::test] async fn drain_discards_stale_epoch_events() { let (tx, mut rx) = tokio::sync::mpsc::channel(8); let mut state = AppState::new(); state.epoch = 2; tx.send(StampedEvent { epoch: 1, event: UIEvent::StreamDelta("ghost".to_string()), }) .await .unwrap(); tx.send(StampedEvent { epoch: 2, event: UIEvent::StreamDelta("real".to_string()), }) .await .unwrap(); drop(tx); drain_ui_events(&mut rx, &mut state); assert_eq!(state.messages.len(), 1); assert_eq!(state.messages[0].content, "real"); } #[tokio::test] async fn drain_sets_content_changed() { let (tx, mut rx) = tokio::sync::mpsc::channel(8); let mut state = AppState::new(); assert!(!state.content_changed); tx.send(stamp(UIEvent::StreamDelta("hi".to_string()))) .await .unwrap(); drop(tx); drain_ui_events(&mut rx, &mut state); assert!(state.content_changed); } }