diff --git a/backend/app/api/dingtalk.py b/backend/app/api/dingtalk.py index f18c81aa3..c7c0ac9e7 100644 --- a/backend/app/api/dingtalk.py +++ b/backend/app/api/dingtalk.py @@ -218,7 +218,8 @@ async def process_dingtalk_message( .order_by(ChatMessage.created_at.desc()) .limit(ctx_size) ) - history = [{"role": m.role, "content": m.content} for m in reversed(history_r.scalars().all())] + from app.services.llm.utils import convert_chat_messages_to_llm_format as _conv + history = _conv(reversed(history_r.scalars().all())) # Build saved_content for DB (no base64 blobs, keep it display-friendly) import re as _re_dt diff --git a/backend/app/api/discord_bot.py b/backend/app/api/discord_bot.py index b50b63b3a..28c879195 100644 --- a/backend/app/api/discord_bot.py +++ b/backend/app/api/discord_bot.py @@ -335,7 +335,8 @@ async def handle_in_background(): .order_by(ChatMessage.created_at.desc()) .limit(ctx_size) ) - history = [{"role": m.role, "content": m.content} for m in reversed(history_r.scalars().all())] + from app.services.llm.utils import convert_chat_messages_to_llm_format as _conv + history = _conv(reversed(history_r.scalars().all())) # Save user message bg_db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="user", content=user_text, conversation_id=session_conv_id)) diff --git a/backend/app/api/feishu.py b/backend/app/api/feishu.py index 29961b82c..f789a2b78 100644 --- a/backend/app/api/feishu.py +++ b/backend/app/api/feishu.py @@ -6,9 +6,10 @@ from collections.abc import Awaitable, Callable from datetime import datetime -from fastapi import APIRouter, Depends, HTTPException, Request, Response, status +from fastapi import APIRouter, Depends, HTTPException, Request, status +from fastapi.responses import HTMLResponse from loguru import logger -from sqlalchemy import select, or_ +from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.core.permissions import check_agent_access, is_agent_creator, is_agent_expired @@ -16,9 +17,9 @@ from app.database import async_session as _async_session, get_db from app.models.channel_config import ChannelConfig from app.models.user import User -from app.models.identity import IdentityProvider from app.schemas.schemas import ChannelConfigCreate, ChannelConfigOut, TokenResponse, UserOut from app.services.feishu_service import feishu_service +from app.services.llm.utils import convert_chat_messages_to_llm_format, truncate_messages_with_pair_integrity from app.services.storage import agent_upload_key, get_storage_backend, store_agent_upload router = APIRouter(tags=["feishu"]) @@ -265,8 +266,6 @@ async def _save_feishu_tool_call( # ─── OAuth ────────────────────────────────────────────── -from fastapi.responses import HTMLResponse, Response - @router.get("/auth/feishu/callback") @router.post("/auth/feishu/callback", response_model=TokenResponse) async def feishu_oauth_callback( @@ -673,10 +672,9 @@ async def process_feishu_event(agent_id: uuid.UUID, body: dict): .limit(ctx_size) ) history_msgs = history_result.scalars().all() - history = _build_llm_history_from_chat_messages(list(reversed(history_msgs))) + history = convert_chat_messages_to_llm_format(reversed(history_msgs)) # --- Resolve Feishu sender identity & find/create platform user --- - import uuid as _uuid import httpx as _httpx sender_name = "" @@ -732,7 +730,9 @@ async def process_feishu_event(agent_id: uuid.UUID, body: dict): # Cache sender info so feishu_user_search can find them by name if sender_name and sender_open_id: try: - import pathlib as _pl, json as _cj, time as _ct + import pathlib as _pl + import json as _cj + import time as _ct _safe_id = str(agent_id).replace("..", "").replace("/", "") _cache = _pl.Path(f"/data/workspaces/{_safe_id}/feishu_contacts_cache.json") _cache.parent.mkdir(parents=True, exist_ok=True) @@ -1221,15 +1221,14 @@ async def _handle_feishu_file( chat_id, ): """Handle incoming file or image messages from Feishu (runs as a background task).""" - import asyncio, random, json + import asyncio + import random + import json from app.models.audit import ChatMessage from app.models.agent import Agent as AgentModel - from app.models.user import User as UserModel from app.services.channel_session import find_or_create_channel_session - from app.core.security import hash_password from app.database import async_session as _async_session from datetime import datetime as _dt, timezone as _tz - import uuid as _uuid from sqlalchemy import select as _select msg_type = message.get("message_type", "file") @@ -1404,7 +1403,7 @@ async def _handle_feishu_file( .order_by(ChatMessage.created_at.desc()) .limit(ctx_size) ) - _history = _build_llm_history_from_chat_messages(list(reversed(_hist_r.scalars().all()))) + _history = convert_chat_messages_to_llm_format(reversed(_hist_r.scalars().all())) await db.commit() @@ -1666,7 +1665,7 @@ async def _call_llm_with_config( from app.models.agent import DEFAULT_CONTEXT_WINDOW_SIZE ctx_size = agent.context_window_size or DEFAULT_CONTEXT_WINDOW_SIZE if history: - messages.extend(_normalize_history_messages(history)[-ctx_size:]) + messages.extend(truncate_messages_with_pair_integrity(history, ctx_size)) messages.append({"role": "user", "content": user_text}) effective_user_id = user_id or agent_id diff --git a/backend/app/api/gateway.py b/backend/app/api/gateway.py index d52e8dfb7..501d72fcb 100644 --- a/backend/app/api/gateway.py +++ b/backend/app/api/gateway.py @@ -392,9 +392,8 @@ async def _send_to_agent_background( ) hist_msgs = list(reversed(hist_result.scalars().all())) - messages = [] - for h in hist_msgs: - messages.append({"role": h.role, "content": h.content or ""}) + from app.services.llm.utils import convert_chat_messages_to_llm_format as _conv + messages = _conv(hist_msgs) # Add the new message with agent communication context user_msg = f"{agent_comm_alert}\n\n[Message from agent: {source_agent_name}]\n{content}" diff --git a/backend/app/api/slack.py b/backend/app/api/slack.py index 95849c6db..0b9d46595 100644 --- a/backend/app/api/slack.py +++ b/backend/app/api/slack.py @@ -306,7 +306,8 @@ async def slack_event_webhook( .order_by(ChatMessage.created_at.desc()) .limit(ctx_size) ) - history = [{"role": m.role, "content": m.content} for m in reversed(history_r.scalars().all())] + from app.services.llm.utils import convert_chat_messages_to_llm_format as _conv + history = _conv(reversed(history_r.scalars().all())) # Handle file attachments: save to workspace/uploads/ and send ack import asyncio as _asyncio diff --git a/backend/app/api/teams.py b/backend/app/api/teams.py index 003de0019..ce7bc1b7e 100644 --- a/backend/app/api/teams.py +++ b/backend/app/api/teams.py @@ -477,7 +477,8 @@ async def teams_event_webhook( .order_by(ChatMessage.created_at.desc()) .limit(ctx_size) ) - history = [{"role": m.role, "content": m.content} for m in reversed(history_r.scalars().all())] + from app.services.llm.utils import convert_chat_messages_to_llm_format as _conv + history = _conv(reversed(history_r.scalars().all())) # Save user message db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="user", content=user_text, conversation_id=session_conv_id)) diff --git a/backend/app/api/websocket.py b/backend/app/api/websocket.py index e259005b8..68330687b 100644 --- a/backend/app/api/websocket.py +++ b/backend/app/api/websocket.py @@ -8,15 +8,15 @@ from time import perf_counter -from fastapi import APIRouter, Depends, Query, WebSocket, WebSocketDisconnect +from fastapi import APIRouter, Query, WebSocket, WebSocketDisconnect from loguru import logger from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.core.logging_config import set_trace_id from app.core.permissions import check_agent_access, is_agent_expired -from app.core.security import decode_access_token, get_current_user -from app.database import async_session, get_db +from app.core.security import decode_access_token +from app.database import async_session from app.models.agent import Agent from app.models.audit import ChatMessage from app.models.chat_session import ChatSession @@ -27,6 +27,7 @@ from app.services.agentbay_live import detect_agentbay_env, get_browser_snapshot, get_desktop_screenshot from app.services.chat_session_service import ensure_primary_platform_session from app.services.llm import call_llm_with_failover +from app.services.llm.utils import convert_chat_messages_to_llm_format, truncate_messages_with_pair_integrity from app.services.onboarding import is_onboarded, mark_onboarding_phase, resolve_onboarding_prompt from app.services.quota_guard import ( AgentExpired, @@ -38,7 +39,6 @@ ) from app.services.realtime import realtime_router from app.services.task_executor import execute_task -from app.services.vision_inject import sanitize_history_tool_result router = APIRouter(tags=["websocket"]) @@ -451,46 +451,7 @@ async def _load_history(self, db: AsyncSession): def _build_conversation_context(self) -> list[dict]: """Translates historical ChatMessages to LLM inputs.""" - conversation = [] - for msg in self.history_messages: - if msg.role == "tool_call": - try: - tc_data = json.loads(msg.content) - tc_name = tc_data.get("name") or tc_data.get("tool_name") or "unknown" - tc_args = tc_data.get("args") or tc_data.get("arguments") or {} - tc_result = tc_data.get("result", "") - tc_id = f"call_{msg.id}" - asst_msg = { - "role": "assistant", - "content": None, - "tool_calls": [ - { - "id": tc_id, - "type": "function", - "function": {"name": tc_name, "arguments": json.dumps(tc_args, ensure_ascii=False)}, - } - ], - } - if tc_data.get("reasoning_content"): - asst_msg["reasoning_content"] = tc_data["reasoning_content"] - conversation.append(asst_msg) - - sanitized_result = sanitize_history_tool_result(str(tc_result)) - conversation.append( - { - "role": "tool", - "tool_call_id": tc_id, - "content": sanitized_result[:500], - } - ) - except Exception: - continue - else: - entry = {"role": msg.role, "content": msg.content} - if hasattr(msg, "thinking") and msg.thinking: - entry["thinking"] = msg.thinking - conversation.append(entry) - return conversation + return convert_chat_messages_to_llm_format(self.history_messages) async def message_loop(self): """Core message processing loop.""" @@ -845,9 +806,7 @@ async def _call_with_failover(): async def _on_failover(reason: str): await self.websocket.send_json({"type": "info", "content": f"Primary model error, {reason}"}) - _truncated = self.conversation[-self.ctx_size :] - while _truncated and _truncated[0].get("role") == "tool": - _truncated.pop(0) + _truncated = truncate_messages_with_pair_integrity(self.conversation, self.ctx_size) # Resolve onboarding prompt skip_tools_for_greeting = False diff --git a/backend/app/api/wecom.py b/backend/app/api/wecom.py index 4cda4800f..ed15ed477 100644 --- a/backend/app/api/wecom.py +++ b/backend/app/api/wecom.py @@ -579,7 +579,8 @@ async def _process_wecom_text( .order_by(ChatMessage.created_at.desc()) .limit(ctx_size) ) - history = [{"role": m.role, "content": m.content} for m in reversed(history_r.scalars().all())] + from app.services.llm.utils import convert_chat_messages_to_llm_format as _conv + history = _conv(reversed(history_r.scalars().all())) # Save user message db.add(ChatMessage( diff --git a/backend/app/services/discord_gateway.py b/backend/app/services/discord_gateway.py index fa8e7971f..c8384c777 100644 --- a/backend/app/services/discord_gateway.py +++ b/backend/app/services/discord_gateway.py @@ -215,10 +215,8 @@ async def _handle_message( .order_by(ChatMessage.created_at.desc()) .limit(ctx_size) ) - history = [ - {"role": m.role, "content": m.content} - for m in reversed(history_r.scalars().all()) - ] + from app.services.llm.utils import convert_chat_messages_to_llm_format as _conv + history = _conv(reversed(history_r.scalars().all())) # Save user message db.add(ChatMessage( diff --git a/backend/app/services/llm/client.py b/backend/app/services/llm/client.py index 1f9af784b..5354835a8 100644 --- a/backend/app/services/llm/client.py +++ b/backend/app/services/llm/client.py @@ -743,10 +743,20 @@ def _messages_to_input(self, messages: list[LLMMessage]) -> list[dict[str, Any]] input_items: list[dict[str, Any]] = [] for msg in messages: - if msg.role in {"system", "user", "assistant"} and msg.content is not None: - item: dict[str, Any] = {"role": msg.role} - item["content"] = self._format_content_for_input(msg.content) - input_items.append(item) + # Handle system messages with dynamic_content + if msg.role == "system" and msg.content is not None: + content = msg.content + if msg.dynamic_content: + content = f"{content}\n\n{msg.dynamic_content}" + input_items.append({ + "role": msg.role, + "content": self._format_content_for_input(content), + }) + elif msg.role in {"user", "assistant"} and msg.content is not None: + input_items.append({ + "role": msg.role, + "content": self._format_content_for_input(msg.content), + }) if msg.role == "assistant" and msg.tool_calls: for tc in msg.tool_calls: @@ -768,8 +778,67 @@ def _messages_to_input(self, messages: list[LLMMessage]) -> list[dict[str, Any]] "output": msg.content or "", }) + # Sanitize: ensure every function_call_output has a matching function_call. + # This prevents "No tool call found for function call output" API errors + # caused by context window truncation breaking assistant+tool pairs. + input_items = self._sanitize_input_items(input_items) + return input_items + @staticmethod + def _sanitize_input_items(items: list[dict[str, Any]]) -> list[dict[str, Any]]: + """Remove orphaned function_call_output items that have no matching function_call. + + Also removes function_call items whose function_call_output is missing, + since the Responses API requires complete pairs. + """ + # Collect all call_ids from function_call items + call_ids_with_fc: set[str] = set() + for item in items: + if item.get("type") == "function_call": + call_id = item.get("call_id", "") + if call_id: + call_ids_with_fc.add(call_id) + + # Collect all call_ids from function_call_output items + call_ids_with_fco: set[str] = set() + for item in items: + if item.get("type") == "function_call_output": + call_id = item.get("call_id", "") + if call_id: + call_ids_with_fco.add(call_id) + + # Determine which call_ids are orphaned (output without call, or call without output) + orphaned_fco = call_ids_with_fco - call_ids_with_fc + orphaned_fc = call_ids_with_fc - call_ids_with_fco + + if not orphaned_fco and not orphaned_fc: + return items + + if orphaned_fco: + logger.warning( + "[OpenAIResponses] Removing %d orphaned function_call_output item(s) " + "with no matching function_call: %s", + len(orphaned_fco), + orphaned_fco, + ) + if orphaned_fc: + logger.warning( + "[OpenAIResponses] Removing %d orphaned function_call item(s) " + "with no matching function_call_output: %s", + len(orphaned_fc), + orphaned_fc, + ) + + # Filter out orphaned items + return [ + item for item in items + if not ( + (item.get("type") == "function_call_output" and item.get("call_id", "") in orphaned_fco) + or (item.get("type") == "function_call" and item.get("call_id", "") in orphaned_fc) + ) + ] + def _convert_tools(self, tools: list[dict] | None) -> list[dict] | None: """Convert OpenAI tool schema to Responses API function tool schema.""" if not tools: diff --git a/backend/app/services/llm/utils.py b/backend/app/services/llm/utils.py index fbb3637b2..27f2b5b47 100644 --- a/backend/app/services/llm/utils.py +++ b/backend/app/services/llm/utils.py @@ -28,8 +28,6 @@ ProviderSpec, PROVIDER_URLS, TOOL_CHOICE_PROVIDERS, - MAX_TOKENS_BY_PROVIDER as _MAX_TOKENS_BY_PROVIDER, - MAX_TOKENS_BY_MODEL as _MAX_TOKENS_BY_MODEL, chat_complete, chat_stream, create_llm_client, @@ -75,6 +73,143 @@ def get_tool_params(provider: str) -> dict: return {} +def convert_chat_messages_to_llm_format(messages) -> list[dict]: + """Convert ChatMessage DB records to LLM-compatible message dicts. + + Properly handles ``tool_call`` role records by splitting them into an + assistant message (with ``tool_calls`` array) followed by a tool result + message — the format required by OpenAI / Anthropic / Gemini APIs. + + Without this conversion, ``tool_call`` records would be passed with + ``role="tool_call"`` (an invalid role), causing LLM API errors or + silently lost context. + + Args: + messages: Iterable of ChatMessage ORM objects (with ``role``, + ``content``, ``id``, and optional ``thinking`` attributes). + + Returns: + List of dicts suitable for passing to ``call_llm()`` or + ``call_llm_with_failover()``. + """ + import json as _json + + result: list[dict] = [] + for msg in messages: + if msg.role == "tool_call": + try: + tc_data = _json.loads(msg.content) + tc_name = tc_data.get("name", "unknown") + tc_args = tc_data.get("args", {}) + tc_result = tc_data.get("result", "") + tc_id = f"call_{msg.id}" # synthetic tool_call_id + + # Assistant message with tool_calls array + asst_msg: dict = { + "role": "assistant", + "content": None, + "tool_calls": [{ + "id": tc_id, + "type": "function", + "function": { + "name": tc_name, + "arguments": _json.dumps(tc_args, ensure_ascii=False), + }, + }], + } + if tc_data.get("reasoning_content"): + asst_msg["reasoning_content"] = tc_data["reasoning_content"] + result.append(asst_msg) + + # Tool result message + try: + from app.services.vision_inject import sanitize_history_tool_result + sanitized_result = sanitize_history_tool_result(str(tc_result)) + except ImportError: + sanitized_result = str(tc_result) + result.append({ + "role": "tool", + "tool_call_id": tc_id, + "content": sanitized_result[:500], + }) + except Exception: + continue # Skip malformed tool_call records + else: + entry: dict = {"role": msg.role, "content": msg.content} + if hasattr(msg, "thinking") and msg.thinking: + entry["thinking"] = msg.thinking + result.append(entry) + + return result + + +def truncate_messages_with_pair_integrity(messages: list[dict], ctx_size: int) -> list[dict]: + """Truncate message list to ctx_size while preserving assistant+tool pair integrity. + + When context window truncation breaks an assistant(tool_calls) + tool_result + group, the resulting orphaned messages cause "No tool call found for function + call output" errors from the LLM API. This function ensures that: + + 1. No tool_result message exists without its preceding assistant(tool_calls) + 2. No assistant(tool_calls) message exists without all its tool_result messages + """ + truncated = messages[-ctx_size:] + if not truncated: + return truncated + + # Pass 1: Remove leading tool messages (they have no matching assistant before them) + while truncated and truncated[0].get("role") == "tool": + truncated.pop(0) + + if not truncated: + return truncated + + # Pass 2: Scan for broken pairs within the truncated list. + assistant_call_ids: set[str] = set() + tool_call_ids: set[str] = set() + + for msg in truncated: + if msg.get("role") == "assistant" and msg.get("tool_calls"): + for tc in msg["tool_calls"]: + tc_id = tc.get("id", "") + if tc_id: + assistant_call_ids.add(tc_id) + elif msg.get("role") == "tool": + tc_id = msg.get("tool_call_id", "") + if tc_id: + tool_call_ids.add(tc_id) + + orphaned_tools = tool_call_ids - assistant_call_ids + orphaned_assistant_calls = assistant_call_ids - tool_call_ids + + if not orphaned_tools and not orphaned_assistant_calls: + return truncated + + # Remove orphaned tool messages and assistant tool_calls entries + sanitized = [] + for msg in truncated: + if msg.get("role") == "tool": + if msg.get("tool_call_id", "") in orphaned_tools: + continue # Remove orphaned tool result + elif msg.get("role") == "assistant" and msg.get("tool_calls"): + filtered_tcs = [ + tc for tc in msg["tool_calls"] + if tc.get("id", "") not in orphaned_assistant_calls + ] + if filtered_tcs: + new_msg = dict(msg) + new_msg["tool_calls"] = filtered_tcs + sanitized.append(new_msg) + elif msg.get("content"): + new_msg = {k: v for k, v in msg.items() if k != "tool_calls"} + sanitized.append(new_msg) + # else: drop the entire assistant message (no content, no valid tool_calls) + else: + sanitized.append(msg) + + return sanitized + + # Keep backward compatibility aliases __all__ = [ # Original utilities @@ -82,6 +217,9 @@ def get_tool_params(provider: str) -> dict: "get_provider_base_url", "get_max_tokens", "get_model_api_key", + # Message conversion utilities + "convert_chat_messages_to_llm_format", + "truncate_messages_with_pair_integrity", # New client classes "LLMClient", "OpenAICompatibleClient", diff --git a/backend/app/services/wecom_stream.py b/backend/app/services/wecom_stream.py index 51ebe0acb..80230c638 100644 --- a/backend/app/services/wecom_stream.py +++ b/backend/app/services/wecom_stream.py @@ -388,7 +388,8 @@ async def _process_wecom_stream_message( .order_by(ChatMessage.created_at.desc()) .limit(ctx_size) ) - history = [{"role": m.role, "content": m.content} for m in reversed(history_r.scalars().all())] + from app.services.llm.utils import convert_chat_messages_to_llm_format as _conv + history = _conv(reversed(history_r.scalars().all())) # Save user message db.add(ChatMessage(