Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions codex-rs/codex-api/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ pub enum ResponseEvent {
delta: String,
},
ReasoningSummaryDelta {
item_id: String,
delta: String,
summary_index: i64,
},
Expand All @@ -109,6 +110,7 @@ pub enum ResponseEvent {
content_index: i64,
},
ReasoningSummaryPartAdded {
item_id: String,
summary_index: i64,
},
RateLimits(RateLimitSnapshot),
Expand Down
46 changes: 44 additions & 2 deletions codex-rs/codex-api/src/sse/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,11 @@ pub fn process_responses_event(
}
}
"response.reasoning_summary_text.delta" => {
if let (Some(delta), Some(summary_index)) = (event.delta, event.summary_index) {
if let (Some(item_id), Some(delta), Some(summary_index)) =
Comment thread
alexi-openai marked this conversation as resolved.
(event.item_id, event.delta, event.summary_index)
Comment on lines +344 to +345

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep fallback for summary deltas without item IDs

For streams that emit response.reasoning_summary_text.delta (and the matching part-added event) while a reasoning item is active but do not include item_id, this new parser branch now returns Ok(None) and silently drops the live reasoning update. The previous code and existing test helper shape supported those deltas by associating them with the active reasoning item, so older/legacy response streams lose all streaming reasoning summaries even though the final reasoning item can still arrive; preserve a fallback path instead of requiring item_id unconditionally.

Useful? React with 👍 / 👎.

{
return Ok(Some(ResponseEvent::ReasoningSummaryDelta {
item_id,
delta,
summary_index,
}));
Expand Down Expand Up @@ -434,8 +437,9 @@ pub fn process_responses_event(
}
}
"response.reasoning_summary_part.added" => {
if let Some(summary_index) = event.summary_index {
if let (Some(item_id), Some(summary_index)) = (event.item_id, event.summary_index) {
return Ok(Some(ResponseEvent::ReasoningSummaryPartAdded {
item_id,
summary_index,
}));
}
Expand Down Expand Up @@ -788,6 +792,44 @@ mod tests {
}
}

#[tokio::test]
async fn preserves_reasoning_summary_item_ids() {
let events = run_sse(vec![
json!({
"type": "response.reasoning_summary_part.added",
"item_id": "reasoning-1",
"summary_index": 0
}),
json!({
"type": "response.reasoning_summary_text.delta",
"item_id": "reasoning-1",
"summary_index": 0,
"delta": "Checking"
}),
json!({
"type": "response.completed",
"response": { "id": "resp1" }
}),
])
.await;

assert_matches!(
&events[0],
ResponseEvent::ReasoningSummaryPartAdded {
item_id,
summary_index: 0,
} if item_id == "reasoning-1"
);
assert_matches!(
&events[1],
ResponseEvent::ReasoningSummaryDelta {
item_id,
delta,
summary_index: 0,
} if item_id == "reasoning-1" && delta == "Checking"
);
}

#[tokio::test]
async fn error_when_missing_completed() {
let item1 = json!({
Expand Down
76 changes: 56 additions & 20 deletions codex-rs/core/src/session/turn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1928,6 +1928,7 @@ async fn try_run_sampling_request(
let mut needs_follow_up = false;
let mut last_agent_message: Option<String> = None;
let mut active_item: Option<TurnItem> = None;
let mut streamed_items = HashMap::<String, TurnItem>::new();
let mut active_tool_argument_diff_consumer: Option<(
String,
Box<dyn ToolArgumentDiffConsumer>,
Expand Down Expand Up @@ -1986,18 +1987,44 @@ async fn try_run_sampling_request(
match event {
ResponseEvent::Created => {}
ResponseEvent::OutputItemDone(item) => {
if let Some((_, mut consumer)) = active_tool_argument_diff_consumer.take()
let completes_active_tool_call = active_tool_argument_diff_consumer
.as_ref()
.is_some_and(|(active_call_id, _)| {
matches!(
&item,
ResponseItem::CustomToolCall { call_id, .. }
if call_id == active_call_id
)
});
if completes_active_tool_call
&& let Some((_, mut consumer)) = active_tool_argument_diff_consumer.take()
&& let Ok(Some(event)) = consumer.finish()
{
sess.send_event(&turn_context, event).await;
}
let previously_active_item = active_item.take();
let previously_streamed_item = if active_item_is_streaming_to_client {
previously_active_item
} else {
None
let completed_item_id = item.id().map(str::to_owned);
let is_tool_call = matches!(
&item,
ResponseItem::LocalShellCall { .. }
| ResponseItem::FunctionCall { .. }
| ResponseItem::ToolSearchCall { .. }
| ResponseItem::CustomToolCall { .. }
);
if completed_item_id.is_none() && !is_tool_call {
warn!("dropping non-tool output_item.done event without item ID");
continue;
}
let completes_active_item = match (&completed_item_id, active_item.as_ref()) {
(Some(completed_item_id), Some(active)) => completed_item_id == &active.id(),
_ => false,
};
Comment on lines +2005 to 2020

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve streaming tool diffs across unrelated item completions

When a custom tool call is streaming input deltas and another response item completes before that tool call's own output_item.done, this new ID-based path can process the unrelated completion while keeping the active item alive, but the handler has already take()n and finish()ed active_tool_argument_diff_consumer at the top of the arm. In interleaved streams such as custom_tool_call.added → patch deltas → reasoning.done → more patch deltas, the later deltas are ignored, so live apply-patch/tool-diff updates disappear even though the tool call is still in progress; the consumer should only be finished when the completed item is the active tool call/call_id.

Useful? React with 👍 / 👎.

active_item_is_streaming_to_client = false;
let previously_streamed_item = completed_item_id
.as_deref()
.and_then(|item_id| streamed_items.remove(item_id));
if completes_active_item {
active_item = None;
active_item_is_streaming_to_client = false;
}
if let Some(previous) = previously_streamed_item.as_ref()
&& matches!(previous, TurnItem::AgentMessage(_))
{
Expand Down Expand Up @@ -2152,6 +2179,7 @@ async fn try_run_sampling_request(
)
.await;
}
streamed_items.insert(turn_item.id(), turn_item.clone());
}
active_item = Some(turn_item);
active_item_is_streaming_to_client = stream_item_to_client;
Expand Down Expand Up @@ -2288,39 +2316,47 @@ async fn try_run_sampling_request(
}
}
ResponseEvent::ReasoningSummaryDelta {
item_id,
delta,
summary_index,
} => {
if let Some(active) = active_item.as_ref() {
if !active_item_is_streaming_to_client {
continue;
}
if defer_streamed_turn_items_for_contributors {
continue;
}
if streamed_items.contains_key(&item_id) {
let event = ReasoningContentDeltaEvent {
thread_id: sess.thread_id.to_string(),
turn_id: turn_context.sub_id.clone(),
item_id: active.id(),
item_id,
delta,
summary_index,
};
sess.send_event(&turn_context, EventMsg::ReasoningContentDelta(event))
.await;
} else {
error_or_panic("ReasoningSummaryDelta without active item".to_string());
error_or_panic(format!(
"ReasoningSummaryDelta without streamed item {item_id}"
));
}
}
ResponseEvent::ReasoningSummaryPartAdded { summary_index } => {
if let Some(active) = active_item.as_ref() {
if !active_item_is_streaming_to_client {
continue;
}
ResponseEvent::ReasoningSummaryPartAdded {
item_id,
summary_index,
} => {
if defer_streamed_turn_items_for_contributors {
continue;
}
if streamed_items.contains_key(&item_id) {
let event =
EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent {
item_id: active.id(),
item_id,
summary_index,
});
sess.send_event(&turn_context, event).await;
} else {
error_or_panic("ReasoningSummaryPartAdded without active item".to_string());
error_or_panic(format!(
"ReasoningSummaryPartAdded without streamed item {item_id}"
));
}
}
ResponseEvent::ReasoningContentDelta {
Expand Down
13 changes: 11 additions & 2 deletions codex-rs/core/tests/common/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -784,11 +784,20 @@ pub fn ev_reasoning_item_added(id: &str, summary: &[&str]) -> Value {
})
}

pub fn ev_reasoning_summary_text_delta(delta: &str) -> Value {
pub fn ev_reasoning_summary_part_added(item_id: &str, summary_index: i64) -> Value {
serde_json::json!({
"type": "response.reasoning_summary_part.added",
"item_id": item_id,
"summary_index": summary_index,
})
}

pub fn ev_reasoning_summary_text_delta(item_id: &str, summary_index: i64, delta: &str) -> Value {
serde_json::json!({
"type": "response.reasoning_summary_text.delta",
"item_id": item_id,
"delta": delta,
"summary_index": 0,
"summary_index": summary_index,
})
}

Expand Down
1 change: 1 addition & 0 deletions codex-rs/core/tests/suite/apply_patch_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1198,6 +1198,7 @@ async fn apply_patch_custom_tool_streaming_emits_updated_changes() -> Result<()>
"call_id": call_id,
"delta": "*** Add File: streamed.txt\n+hello",
}),
ev_assistant_message("unrelated-message", ""),
json!({
"type": "response.custom_tool_call_input.delta",
"call_id": call_id,
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/core/tests/suite/codex_delegate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ async fn codex_delegate_ignores_legacy_deltas() {
let sse_stream = sse(vec![
ev_response_created("resp-1"),
ev_reasoning_item_added("reason-1", &["initial"]),
ev_reasoning_summary_text_delta("think-1"),
ev_reasoning_summary_text_delta("reason-1", /*summary_index*/ 0, "think-1"),
ev_completed("resp-1"),
]);

Expand Down
Loading
Loading