Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion backend/app/api/dingtalk.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ async def process_dingtalk_message(
.order_by(ChatMessage.created_at.desc())
.limit(ctx_size)
)
history = [{"role": m.role, "content": m.content} for m in reversed(history_r.scalars().all())]
from app.services.llm.utils import convert_chat_messages_to_llm_format as _conv
history = _conv(reversed(history_r.scalars().all()))

# Build saved_content for DB (no base64 blobs, keep it display-friendly)
import re as _re_dt
Expand Down
3 changes: 2 additions & 1 deletion backend/app/api/discord_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,8 @@ async def handle_in_background():
.order_by(ChatMessage.created_at.desc())
.limit(ctx_size)
)
history = [{"role": m.role, "content": m.content} for m in reversed(history_r.scalars().all())]
from app.services.llm.utils import convert_chat_messages_to_llm_format as _conv
history = _conv(reversed(history_r.scalars().all()))

# Save user message
bg_db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="user", content=user_text, conversation_id=session_conv_id))
Expand Down
27 changes: 13 additions & 14 deletions backend/app/api/feishu.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@
from collections.abc import Awaitable, Callable
from datetime import datetime

from fastapi import APIRouter, Depends, HTTPException, Request, Response, status
from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.responses import HTMLResponse
from loguru import logger
from sqlalchemy import select, or_
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.permissions import check_agent_access, is_agent_creator, is_agent_expired
from app.core.security import get_current_user
from app.database import async_session as _async_session, get_db
from app.models.channel_config import ChannelConfig
from app.models.user import User
from app.models.identity import IdentityProvider
from app.schemas.schemas import ChannelConfigCreate, ChannelConfigOut, TokenResponse, UserOut
from app.services.feishu_service import feishu_service
from app.services.llm.utils import convert_chat_messages_to_llm_format, truncate_messages_with_pair_integrity
from app.services.storage import agent_upload_key, get_storage_backend, store_agent_upload

router = APIRouter(tags=["feishu"])
Expand Down Expand Up @@ -265,8 +266,6 @@ async def _save_feishu_tool_call(

# ─── OAuth ──────────────────────────────────────────────

from fastapi.responses import HTMLResponse, Response

@router.get("/auth/feishu/callback")
@router.post("/auth/feishu/callback", response_model=TokenResponse)
async def feishu_oauth_callback(
Expand Down Expand Up @@ -673,10 +672,9 @@ async def process_feishu_event(agent_id: uuid.UUID, body: dict):
.limit(ctx_size)
)
history_msgs = history_result.scalars().all()
history = _build_llm_history_from_chat_messages(list(reversed(history_msgs)))
history = convert_chat_messages_to_llm_format(reversed(history_msgs))

# --- Resolve Feishu sender identity & find/create platform user ---
import uuid as _uuid
import httpx as _httpx

sender_name = ""
Expand Down Expand Up @@ -732,7 +730,9 @@ async def process_feishu_event(agent_id: uuid.UUID, body: dict):
# Cache sender info so feishu_user_search can find them by name
if sender_name and sender_open_id:
try:
import pathlib as _pl, json as _cj, time as _ct
import pathlib as _pl
import json as _cj
import time as _ct
_safe_id = str(agent_id).replace("..", "").replace("/", "")
_cache = _pl.Path(f"/data/workspaces/{_safe_id}/feishu_contacts_cache.json")
_cache.parent.mkdir(parents=True, exist_ok=True)
Expand Down Expand Up @@ -1221,15 +1221,14 @@ async def _handle_feishu_file(
chat_id,
):
"""Handle incoming file or image messages from Feishu (runs as a background task)."""
import asyncio, random, json
import asyncio
import random
import json
from app.models.audit import ChatMessage
from app.models.agent import Agent as AgentModel
from app.models.user import User as UserModel
from app.services.channel_session import find_or_create_channel_session
from app.core.security import hash_password
from app.database import async_session as _async_session
from datetime import datetime as _dt, timezone as _tz
import uuid as _uuid
from sqlalchemy import select as _select

msg_type = message.get("message_type", "file")
Expand Down Expand Up @@ -1404,7 +1403,7 @@ async def _handle_feishu_file(
.order_by(ChatMessage.created_at.desc())
.limit(ctx_size)
)
_history = _build_llm_history_from_chat_messages(list(reversed(_hist_r.scalars().all())))
_history = convert_chat_messages_to_llm_format(reversed(_hist_r.scalars().all()))

await db.commit()

Expand Down Expand Up @@ -1666,7 +1665,7 @@ async def _call_llm_with_config(
from app.models.agent import DEFAULT_CONTEXT_WINDOW_SIZE
ctx_size = agent.context_window_size or DEFAULT_CONTEXT_WINDOW_SIZE
if history:
messages.extend(_normalize_history_messages(history)[-ctx_size:])
messages.extend(truncate_messages_with_pair_integrity(history, ctx_size))
messages.append({"role": "user", "content": user_text})

effective_user_id = user_id or agent_id
Expand Down
5 changes: 2 additions & 3 deletions backend/app/api/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,9 +392,8 @@ async def _send_to_agent_background(
)
hist_msgs = list(reversed(hist_result.scalars().all()))

messages = []
for h in hist_msgs:
messages.append({"role": h.role, "content": h.content or ""})
from app.services.llm.utils import convert_chat_messages_to_llm_format as _conv
messages = _conv(hist_msgs)

# Add the new message with agent communication context
user_msg = f"{agent_comm_alert}\n\n[Message from agent: {source_agent_name}]\n{content}"
Expand Down
3 changes: 2 additions & 1 deletion backend/app/api/slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ async def slack_event_webhook(
.order_by(ChatMessage.created_at.desc())
.limit(ctx_size)
)
history = [{"role": m.role, "content": m.content} for m in reversed(history_r.scalars().all())]
from app.services.llm.utils import convert_chat_messages_to_llm_format as _conv
history = _conv(reversed(history_r.scalars().all()))

# Handle file attachments: save to workspace/uploads/ and send ack
import asyncio as _asyncio
Expand Down
3 changes: 2 additions & 1 deletion backend/app/api/teams.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,8 @@ async def teams_event_webhook(
.order_by(ChatMessage.created_at.desc())
.limit(ctx_size)
)
history = [{"role": m.role, "content": m.content} for m in reversed(history_r.scalars().all())]
from app.services.llm.utils import convert_chat_messages_to_llm_format as _conv
history = _conv(reversed(history_r.scalars().all()))

# Save user message
db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="user", content=user_text, conversation_id=session_conv_id))
Expand Down
53 changes: 6 additions & 47 deletions backend/app/api/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
from time import perf_counter


from fastapi import APIRouter, Depends, Query, WebSocket, WebSocketDisconnect
from fastapi import APIRouter, Query, WebSocket, WebSocketDisconnect
from loguru import logger
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.logging_config import set_trace_id
from app.core.permissions import check_agent_access, is_agent_expired
from app.core.security import decode_access_token, get_current_user
from app.database import async_session, get_db
from app.core.security import decode_access_token
from app.database import async_session
from app.models.agent import Agent
from app.models.audit import ChatMessage
from app.models.chat_session import ChatSession
Expand All @@ -27,6 +27,7 @@
from app.services.agentbay_live import detect_agentbay_env, get_browser_snapshot, get_desktop_screenshot
from app.services.chat_session_service import ensure_primary_platform_session
from app.services.llm import call_llm_with_failover
from app.services.llm.utils import convert_chat_messages_to_llm_format, truncate_messages_with_pair_integrity
from app.services.onboarding import is_onboarded, mark_onboarding_phase, resolve_onboarding_prompt
from app.services.quota_guard import (
AgentExpired,
Expand All @@ -38,7 +39,6 @@
)
from app.services.realtime import realtime_router
from app.services.task_executor import execute_task
from app.services.vision_inject import sanitize_history_tool_result

router = APIRouter(tags=["websocket"])

Expand Down Expand Up @@ -451,46 +451,7 @@ async def _load_history(self, db: AsyncSession):

def _build_conversation_context(self) -> list[dict]:
"""Translates historical ChatMessages to LLM inputs."""
conversation = []
for msg in self.history_messages:
if msg.role == "tool_call":
try:
tc_data = json.loads(msg.content)
tc_name = tc_data.get("name") or tc_data.get("tool_name") or "unknown"
tc_args = tc_data.get("args") or tc_data.get("arguments") or {}
tc_result = tc_data.get("result", "")
tc_id = f"call_{msg.id}"
asst_msg = {
"role": "assistant",
"content": None,
"tool_calls": [
{
"id": tc_id,
"type": "function",
"function": {"name": tc_name, "arguments": json.dumps(tc_args, ensure_ascii=False)},
}
],
}
if tc_data.get("reasoning_content"):
asst_msg["reasoning_content"] = tc_data["reasoning_content"]
conversation.append(asst_msg)

sanitized_result = sanitize_history_tool_result(str(tc_result))
conversation.append(
{
"role": "tool",
"tool_call_id": tc_id,
"content": sanitized_result[:500],
}
)
except Exception:
continue
else:
entry = {"role": msg.role, "content": msg.content}
if hasattr(msg, "thinking") and msg.thinking:
entry["thinking"] = msg.thinking
conversation.append(entry)
return conversation
return convert_chat_messages_to_llm_format(self.history_messages)

async def message_loop(self):
"""Core message processing loop."""
Expand Down Expand Up @@ -845,9 +806,7 @@ async def _call_with_failover():
async def _on_failover(reason: str):
await self.websocket.send_json({"type": "info", "content": f"Primary model error, {reason}"})

_truncated = self.conversation[-self.ctx_size :]
while _truncated and _truncated[0].get("role") == "tool":
_truncated.pop(0)
_truncated = truncate_messages_with_pair_integrity(self.conversation, self.ctx_size)

# Resolve onboarding prompt
skip_tools_for_greeting = False
Expand Down
3 changes: 2 additions & 1 deletion backend/app/api/wecom.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,8 @@ async def _process_wecom_text(
.order_by(ChatMessage.created_at.desc())
.limit(ctx_size)
)
history = [{"role": m.role, "content": m.content} for m in reversed(history_r.scalars().all())]
from app.services.llm.utils import convert_chat_messages_to_llm_format as _conv
history = _conv(reversed(history_r.scalars().all()))

# Save user message
db.add(ChatMessage(
Expand Down
6 changes: 2 additions & 4 deletions backend/app/services/discord_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,8 @@ async def _handle_message(
.order_by(ChatMessage.created_at.desc())
.limit(ctx_size)
)
history = [
{"role": m.role, "content": m.content}
for m in reversed(history_r.scalars().all())
]
from app.services.llm.utils import convert_chat_messages_to_llm_format as _conv
history = _conv(reversed(history_r.scalars().all()))

# Save user message
db.add(ChatMessage(
Expand Down
77 changes: 73 additions & 4 deletions backend/app/services/llm/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -743,10 +743,20 @@ def _messages_to_input(self, messages: list[LLMMessage]) -> list[dict[str, Any]]
input_items: list[dict[str, Any]] = []

for msg in messages:
if msg.role in {"system", "user", "assistant"} and msg.content is not None:
item: dict[str, Any] = {"role": msg.role}
item["content"] = self._format_content_for_input(msg.content)
input_items.append(item)
# Handle system messages with dynamic_content
if msg.role == "system" and msg.content is not None:
content = msg.content
if msg.dynamic_content:
content = f"{content}\n\n{msg.dynamic_content}"
input_items.append({
"role": msg.role,
"content": self._format_content_for_input(content),
})
elif msg.role in {"user", "assistant"} and msg.content is not None:
input_items.append({
"role": msg.role,
"content": self._format_content_for_input(msg.content),
})

if msg.role == "assistant" and msg.tool_calls:
for tc in msg.tool_calls:
Expand All @@ -768,8 +778,67 @@ def _messages_to_input(self, messages: list[LLMMessage]) -> list[dict[str, Any]]
"output": msg.content or "",
})

# Sanitize: ensure every function_call_output has a matching function_call.
# This prevents "No tool call found for function call output" API errors
# caused by context window truncation breaking assistant+tool pairs.
input_items = self._sanitize_input_items(input_items)

return input_items

@staticmethod
def _sanitize_input_items(items: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Remove orphaned function_call_output items that have no matching function_call.

Also removes function_call items whose function_call_output is missing,
since the Responses API requires complete pairs.
"""
# Collect all call_ids from function_call items
call_ids_with_fc: set[str] = set()
for item in items:
if item.get("type") == "function_call":
call_id = item.get("call_id", "")
if call_id:
call_ids_with_fc.add(call_id)

# Collect all call_ids from function_call_output items
call_ids_with_fco: set[str] = set()
for item in items:
if item.get("type") == "function_call_output":
call_id = item.get("call_id", "")
if call_id:
call_ids_with_fco.add(call_id)

# Determine which call_ids are orphaned (output without call, or call without output)
orphaned_fco = call_ids_with_fco - call_ids_with_fc
orphaned_fc = call_ids_with_fc - call_ids_with_fco

if not orphaned_fco and not orphaned_fc:
return items

if orphaned_fco:
logger.warning(
"[OpenAIResponses] Removing %d orphaned function_call_output item(s) "
"with no matching function_call: %s",
len(orphaned_fco),
orphaned_fco,
)
if orphaned_fc:
logger.warning(
"[OpenAIResponses] Removing %d orphaned function_call item(s) "
"with no matching function_call_output: %s",
len(orphaned_fc),
orphaned_fc,
)

# Filter out orphaned items
return [
item for item in items
if not (
(item.get("type") == "function_call_output" and item.get("call_id", "") in orphaned_fco)
or (item.get("type") == "function_call" and item.get("call_id", "") in orphaned_fc)
)
]

def _convert_tools(self, tools: list[dict] | None) -> list[dict] | None:
"""Convert OpenAI tool schema to Responses API function tool schema."""
if not tools:
Expand Down
Loading