diff --git a/codex-rs/codex-api/src/common.rs b/codex-rs/codex-api/src/common.rs index bd037deb5838..e954ee7fa800 100644 --- a/codex-rs/codex-api/src/common.rs +++ b/codex-rs/codex-api/src/common.rs @@ -101,6 +101,7 @@ pub enum ResponseEvent { delta: String, }, ReasoningSummaryDelta { + item_id: String, delta: String, summary_index: i64, }, @@ -109,6 +110,7 @@ pub enum ResponseEvent { content_index: i64, }, ReasoningSummaryPartAdded { + item_id: String, summary_index: i64, }, RateLimits(RateLimitSnapshot), diff --git a/codex-rs/codex-api/src/sse/responses.rs b/codex-rs/codex-api/src/sse/responses.rs index 71f73c0232cc..9dcc2d13654f 100644 --- a/codex-rs/codex-api/src/sse/responses.rs +++ b/codex-rs/codex-api/src/sse/responses.rs @@ -341,8 +341,11 @@ pub fn process_responses_event( } } "response.reasoning_summary_text.delta" => { - if let (Some(delta), Some(summary_index)) = (event.delta, event.summary_index) { + if let (Some(item_id), Some(delta), Some(summary_index)) = + (event.item_id, event.delta, event.summary_index) + { return Ok(Some(ResponseEvent::ReasoningSummaryDelta { + item_id, delta, summary_index, })); @@ -434,8 +437,9 @@ pub fn process_responses_event( } } "response.reasoning_summary_part.added" => { - if let Some(summary_index) = event.summary_index { + if let (Some(item_id), Some(summary_index)) = (event.item_id, event.summary_index) { return Ok(Some(ResponseEvent::ReasoningSummaryPartAdded { + item_id, summary_index, })); } @@ -788,6 +792,44 @@ mod tests { } } + #[tokio::test] + async fn preserves_reasoning_summary_item_ids() { + let events = run_sse(vec![ + json!({ + "type": "response.reasoning_summary_part.added", + "item_id": "reasoning-1", + "summary_index": 0 + }), + json!({ + "type": "response.reasoning_summary_text.delta", + "item_id": "reasoning-1", + "summary_index": 0, + "delta": "Checking" + }), + json!({ + "type": "response.completed", + "response": { "id": "resp1" } + }), + ]) + .await; + + assert_matches!( + &events[0], + ResponseEvent::ReasoningSummaryPartAdded { + item_id, + summary_index: 0, + } if item_id == "reasoning-1" + ); + assert_matches!( + &events[1], + ResponseEvent::ReasoningSummaryDelta { + item_id, + delta, + summary_index: 0, + } if item_id == "reasoning-1" && delta == "Checking" + ); + } + #[tokio::test] async fn error_when_missing_completed() { let item1 = json!({ diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index f16b525002fc..1031c4291916 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -1928,6 +1928,7 @@ async fn try_run_sampling_request( let mut needs_follow_up = false; let mut last_agent_message: Option = None; let mut active_item: Option = None; + let mut streamed_items = HashMap::::new(); let mut active_tool_argument_diff_consumer: Option<( String, Box, @@ -1986,18 +1987,44 @@ async fn try_run_sampling_request( match event { ResponseEvent::Created => {} ResponseEvent::OutputItemDone(item) => { - if let Some((_, mut consumer)) = active_tool_argument_diff_consumer.take() + let completes_active_tool_call = active_tool_argument_diff_consumer + .as_ref() + .is_some_and(|(active_call_id, _)| { + matches!( + &item, + ResponseItem::CustomToolCall { call_id, .. } + if call_id == active_call_id + ) + }); + if completes_active_tool_call + && let Some((_, mut consumer)) = active_tool_argument_diff_consumer.take() && let Ok(Some(event)) = consumer.finish() { sess.send_event(&turn_context, event).await; } - let previously_active_item = active_item.take(); - let previously_streamed_item = if active_item_is_streaming_to_client { - previously_active_item - } else { - None + let completed_item_id = item.id().map(str::to_owned); + let is_tool_call = matches!( + &item, + ResponseItem::LocalShellCall { .. } + | ResponseItem::FunctionCall { .. } + | ResponseItem::ToolSearchCall { .. } + | ResponseItem::CustomToolCall { .. } + ); + if completed_item_id.is_none() && !is_tool_call { + warn!("dropping non-tool output_item.done event without item ID"); + continue; + } + let completes_active_item = match (&completed_item_id, active_item.as_ref()) { + (Some(completed_item_id), Some(active)) => completed_item_id == &active.id(), + _ => false, }; - active_item_is_streaming_to_client = false; + let previously_streamed_item = completed_item_id + .as_deref() + .and_then(|item_id| streamed_items.remove(item_id)); + if completes_active_item { + active_item = None; + active_item_is_streaming_to_client = false; + } if let Some(previous) = previously_streamed_item.as_ref() && matches!(previous, TurnItem::AgentMessage(_)) { @@ -2152,6 +2179,7 @@ async fn try_run_sampling_request( ) .await; } + streamed_items.insert(turn_item.id(), turn_item.clone()); } active_item = Some(turn_item); active_item_is_streaming_to_client = stream_item_to_client; @@ -2288,39 +2316,47 @@ async fn try_run_sampling_request( } } ResponseEvent::ReasoningSummaryDelta { + item_id, delta, summary_index, } => { - if let Some(active) = active_item.as_ref() { - if !active_item_is_streaming_to_client { - continue; - } + if defer_streamed_turn_items_for_contributors { + continue; + } + if streamed_items.contains_key(&item_id) { let event = ReasoningContentDeltaEvent { thread_id: sess.thread_id.to_string(), turn_id: turn_context.sub_id.clone(), - item_id: active.id(), + item_id, delta, summary_index, }; sess.send_event(&turn_context, EventMsg::ReasoningContentDelta(event)) .await; } else { - error_or_panic("ReasoningSummaryDelta without active item".to_string()); + error_or_panic(format!( + "ReasoningSummaryDelta without streamed item {item_id}" + )); } } - ResponseEvent::ReasoningSummaryPartAdded { summary_index } => { - if let Some(active) = active_item.as_ref() { - if !active_item_is_streaming_to_client { - continue; - } + ResponseEvent::ReasoningSummaryPartAdded { + item_id, + summary_index, + } => { + if defer_streamed_turn_items_for_contributors { + continue; + } + if streamed_items.contains_key(&item_id) { let event = EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent { - item_id: active.id(), + item_id, summary_index, }); sess.send_event(&turn_context, event).await; } else { - error_or_panic("ReasoningSummaryPartAdded without active item".to_string()); + error_or_panic(format!( + "ReasoningSummaryPartAdded without streamed item {item_id}" + )); } } ResponseEvent::ReasoningContentDelta { diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index 9569df1c90ec..45d4cb3d99a5 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -784,11 +784,20 @@ pub fn ev_reasoning_item_added(id: &str, summary: &[&str]) -> Value { }) } -pub fn ev_reasoning_summary_text_delta(delta: &str) -> Value { +pub fn ev_reasoning_summary_part_added(item_id: &str, summary_index: i64) -> Value { + serde_json::json!({ + "type": "response.reasoning_summary_part.added", + "item_id": item_id, + "summary_index": summary_index, + }) +} + +pub fn ev_reasoning_summary_text_delta(item_id: &str, summary_index: i64, delta: &str) -> Value { serde_json::json!({ "type": "response.reasoning_summary_text.delta", + "item_id": item_id, "delta": delta, - "summary_index": 0, + "summary_index": summary_index, }) } diff --git a/codex-rs/core/tests/suite/apply_patch_cli.rs b/codex-rs/core/tests/suite/apply_patch_cli.rs index d8142723b495..92f583c47072 100644 --- a/codex-rs/core/tests/suite/apply_patch_cli.rs +++ b/codex-rs/core/tests/suite/apply_patch_cli.rs @@ -1198,6 +1198,7 @@ async fn apply_patch_custom_tool_streaming_emits_updated_changes() -> Result<()> "call_id": call_id, "delta": "*** Add File: streamed.txt\n+hello", }), + ev_assistant_message("unrelated-message", ""), json!({ "type": "response.custom_tool_call_input.delta", "call_id": call_id, diff --git a/codex-rs/core/tests/suite/codex_delegate.rs b/codex-rs/core/tests/suite/codex_delegate.rs index f5db9856df2f..7dab8a068841 100644 --- a/codex-rs/core/tests/suite/codex_delegate.rs +++ b/codex-rs/core/tests/suite/codex_delegate.rs @@ -204,7 +204,7 @@ async fn codex_delegate_ignores_legacy_deltas() { let sse_stream = sse(vec![ ev_response_created("resp-1"), ev_reasoning_item_added("reason-1", &["initial"]), - ev_reasoning_summary_text_delta("think-1"), + ev_reasoning_summary_text_delta("reason-1", /*summary_index*/ 0, "think-1"), ev_completed("resp-1"), ]); diff --git a/codex-rs/core/tests/suite/items.rs b/codex-rs/core/tests/suite/items.rs index 515d496b8b7a..b29696fdad4d 100644 --- a/codex-rs/core/tests/suite/items.rs +++ b/codex-rs/core/tests/suite/items.rs @@ -20,17 +20,20 @@ use codex_utils_absolute_path::AbsolutePathBuf; use core_test_support::PathBufExt; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; +use core_test_support::responses::ev_function_call; use core_test_support::responses::ev_image_generation_call; use core_test_support::responses::ev_message_item_added; use core_test_support::responses::ev_output_text_delta; use core_test_support::responses::ev_reasoning_item; use core_test_support::responses::ev_reasoning_item_added; +use core_test_support::responses::ev_reasoning_summary_part_added; use core_test_support::responses::ev_reasoning_summary_text_delta; use core_test_support::responses::ev_reasoning_text_delta; use core_test_support::responses::ev_response_created; use core_test_support::responses::ev_web_search_call_added_partial; use core_test_support::responses::ev_web_search_call_done; use core_test_support::responses::mount_sse_once; +use core_test_support::responses::mount_sse_sequence; use core_test_support::responses::sse; use core_test_support::responses::start_mock_server; use core_test_support::skip_if_no_network; @@ -1085,21 +1088,45 @@ async fn plan_mode_handles_missing_plan_close_tag() -> anyhow::Result<()> { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn reasoning_content_delta_has_item_metadata() -> anyhow::Result<()> { +async fn interleaved_reasoning_summary_events_keep_reasoning_item_metadata() -> anyhow::Result<()> { skip_if_no_network!(Ok(())); let server = start_mock_server().await; let TestCodex { codex, .. } = test_codex().build(&server).await?; - let stream = sse(vec![ - ev_response_created("resp-1"), - ev_reasoning_item_added("reasoning-1", &[""]), - ev_reasoning_summary_text_delta("step one"), - ev_reasoning_item("reasoning-1", &["step one"], &[]), - ev_completed("resp-1"), - ]); - mount_sse_once(&server, stream).await; + mount_sse_sequence( + &server, + vec![ + sse(vec![ + ev_response_created("resp-1"), + ev_reasoning_item_added("reasoning-1", &[""]), + ev_reasoning_summary_part_added("reasoning-1", /*summary_index*/ 0), + ev_reasoning_summary_text_delta( + "reasoning-1", + /*summary_index*/ 0, + "step one", + ), + ev_reasoning_item("", &["ignored"], &[]), + ev_function_call("call-1", "unknown_tool", "{}"), + ev_reasoning_summary_part_added("reasoning-1", /*summary_index*/ 1), + ev_reasoning_summary_text_delta( + "reasoning-1", + /*summary_index*/ 1, + "step two", + ), + ev_reasoning_item("reasoning-1", &["step one", "step two"], &[]), + ev_completed("resp-1"), + ]), + sse(vec![ + ev_message_item_added("message-1", ""), + ev_output_text_delta("Done"), + ev_assistant_message("message-1", "Done"), + ev_completed("resp-2"), + ]), + ], + ) + .await; codex .submit(Op::UserInput { @@ -1114,22 +1141,59 @@ async fn reasoning_content_delta_has_item_metadata() -> anyhow::Result<()> { }) .await?; - let reasoning_item = wait_for_event_match(&codex, |ev| match ev { - EventMsg::ItemStarted(ItemStartedEvent { - item: TurnItem::Reasoning(item), - .. - }) => Some(item.clone()), - _ => None, - }) - .await; + let mut reasoning_started = Vec::new(); + let mut reasoning_completed = Vec::new(); + let mut messages_started = Vec::new(); + let mut messages_completed = Vec::new(); + let mut summary_deltas = Vec::new(); + let mut summary_sections = Vec::new(); + loop { + match wait_for_event(&codex, |_| true).await { + EventMsg::ItemStarted(ItemStartedEvent { + item: TurnItem::Reasoning(item), + .. + }) => reasoning_started.push(item.id), + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::Reasoning(item), + .. + }) => reasoning_completed.push(item.id), + EventMsg::ItemStarted(ItemStartedEvent { + item: TurnItem::AgentMessage(item), + .. + }) => messages_started.push(item.id), + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::AgentMessage(item), + .. + }) => messages_completed.push(item.id), + EventMsg::ReasoningContentDelta(event) => { + summary_deltas.push((event.item_id, event.delta, event.summary_index)); + } + EventMsg::AgentReasoningSectionBreak(event) => { + summary_sections.push((event.item_id, event.summary_index)); + } + EventMsg::TurnComplete(_) => break, + _ => {} + } + } - let delta_event = wait_for_event_match(&codex, |ev| match ev { - EventMsg::ReasoningContentDelta(event) => Some(event.clone()), - _ => None, - }) - .await; - assert_eq!(delta_event.item_id, reasoning_item.id); - assert_eq!(delta_event.delta, "step one"); + assert_eq!(reasoning_started, vec!["reasoning-1"]); + assert_eq!(reasoning_completed, vec!["reasoning-1"]); + assert_eq!(messages_started, vec!["message-1"]); + assert_eq!(messages_completed, vec!["message-1"]); + assert_eq!( + summary_deltas, + vec![ + ("reasoning-1".to_string(), "step one".to_string(), 0), + ("reasoning-1".to_string(), "step two".to_string(), 1), + ] + ); + assert_eq!( + summary_sections, + vec![ + ("reasoning-1".to_string(), 0), + ("reasoning-1".to_string(), 1) + ] + ); Ok(()) } diff --git a/codex-rs/core/tests/suite/otel.rs b/codex-rs/core/tests/suite/otel.rs index 4a2b2993ad7e..060a9079a906 100644 --- a/codex-rs/core/tests/suite/otel.rs +++ b/codex-rs/core/tests/suite/otel.rs @@ -821,7 +821,7 @@ async fn record_responses_sets_span_fields_for_response_events() { ev_message_item_added("msg-added", "hi there"), ev_reasoning_item_added("reasoning-1", &["summary"]), ev_output_text_delta("delta"), - ev_reasoning_summary_text_delta("summary-delta"), + ev_reasoning_summary_text_delta("reasoning-1", /*summary_index*/ 0, "summary-delta"), ev_reasoning_text_delta("raw-delta"), ev_function_call("call-1", "fn", "{\"key\":\"value\"}"), ev_assistant_message("msg-1", "agent"), diff --git a/codex-rs/tui/src/chatwidget/tests/history_replay.rs b/codex-rs/tui/src/chatwidget/tests/history_replay.rs index f8626dfcc0ba..300978cf4d55 100644 --- a/codex-rs/tui/src/chatwidget/tests/history_replay.rs +++ b/codex-rs/tui/src/chatwidget/tests/history_replay.rs @@ -995,34 +995,30 @@ async fn live_reasoning_summary_is_not_rendered_twice_when_item_completes() { let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await; chat.show_welcome_banner = false; - chat.handle_server_notification( - ServerNotification::TurnStarted(TurnStartedNotification { - thread_id: "thread-1".to_string(), - turn: AppServerTurn { - id: "turn-1".to_string(), - items_view: codex_app_server_protocol::TurnItemsView::Full, - items: Vec::new(), - status: AppServerTurnStatus::InProgress, - error: None, - started_at: Some(0), - completed_at: None, - duration_ms: None, - }, - }), - /*replay_kind*/ None, - ); - let _ = drain_insert_history(&mut rx); + handle_turn_started(&mut chat, "turn-1"); + while rx.try_recv().is_ok() {} + handle_agent_reasoning_delta(&mut chat, "**Step one**\nFirst summary"); + handle_agent_message_delta(&mut chat, "Final answer"); + complete_assistant_message( + &mut chat, + "msg-1", + "Final answer", + Some(MessagePhase::FinalAnswer), + ); chat.handle_server_notification( - ServerNotification::ReasoningSummaryTextDelta(ReasoningSummaryTextDeltaNotification { - thread_id: "thread-1".to_string(), - turn_id: "turn-1".to_string(), - item_id: "reasoning-1".to_string(), - delta: "Summary only".to_string(), - summary_index: 0, - }), + ServerNotification::ReasoningSummaryPartAdded( + codex_app_server_protocol::ReasoningSummaryPartAddedNotification { + thread_id: chat.thread_id.map(|id| id.to_string()).unwrap_or_default(), + turn_id: "turn-1".to_string(), + item_id: "reasoning-1".to_string(), + summary_index: 1, + }, + ), /*replay_kind*/ None, ); + handle_agent_reasoning_delta(&mut chat, "**Step two**\nSecond summary"); + handle_agent_reasoning_final(&mut chat); chat.handle_server_notification( ServerNotification::ItemCompleted(ItemCompletedNotification { @@ -1031,20 +1027,35 @@ async fn live_reasoning_summary_is_not_rendered_twice_when_item_completes() { completed_at_ms: 0, item: AppServerThreadItem::Reasoning { id: "reasoning-1".to_string(), - summary: vec!["Summary only".to_string()], + summary: vec![ + "**Step one**\nFirst summary".to_string(), + "**Step two**\nSecond summary".to_string(), + ], content: Vec::new(), }, }), /*replay_kind*/ None, ); - let rendered = match rx.try_recv() { - Ok(AppEvent::InsertHistoryCell(cell)) => { - lines_to_single_string(&cell.transcript_lines(/*width*/ 80)) + let mut consolidated_answer = None; + let mut inserted_history = String::new(); + while let Ok(event) = rx.try_recv() { + match event { + AppEvent::ConsolidateAgentMessage { source, .. } => { + consolidated_answer = Some(source); + } + AppEvent::InsertHistoryCell(cell) => { + inserted_history.push_str(&lines_to_single_string( + &cell.transcript_lines(/*width*/ 80), + )); + } + _ => {} } - other => panic!("expected InsertHistoryCell, got {other:?}"), - }; - assert_eq!(rendered.matches("Summary only").count(), 1); + } + + assert_eq!(consolidated_answer.as_deref(), Some("Final answer")); + assert_eq!(inserted_history.matches("First summary").count(), 1); + assert_eq!(inserted_history.matches("Second summary").count(), 1); } #[tokio::test]