Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,13 @@ func (a *App) Run(ctx context.Context, cancel context.CancelFunc, message string
}()
}

// Steer enqueues a user message for mid-turn injection into the running agent
// loop. The agent will see the message at the next tool-round boundary. Returns
// an error if the steer queue is full.
func (a *App) Steer(content string) error {
Comment thread
trungutt marked this conversation as resolved.
Outdated
return a.runtime.Steer(runtime.QueuedMessage{Content: content})
}

// processFileAttachment reads a file from disk, classifies it, and either
// appends its text content to textBuilder or adds a binary part to binaryParts.
func (a *App) processFileAttachment(ctx context.Context, att messages.Attachment, textBuilder *strings.Builder, binaryParts *[]chat.MessagePart) {
Expand Down
49 changes: 12 additions & 37 deletions pkg/tui/page/chat/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,6 @@ type queuedMessage struct {
attachments []msgtypes.Attachment
}

// maxQueuedMessages is the maximum number of messages that can be queued
const maxQueuedMessages = 5

// chatPage implements Page
type chatPage struct {
width, height int
Expand Down Expand Up @@ -406,10 +403,10 @@ func (p *chatPage) Update(msg tea.Msg) (layout.Model, tea.Cmd) {
}
cmds = append(cmds, p.messages.ScrollToBottom())

// Process next queued message after cancel (queue is preserved)
if queueCmd := p.processNextQueuedMessage(); queueCmd != nil {
cmds = append(cmds, queueCmd)
}
// Clear the display-only queue; steered messages that the runtime
// hasn't consumed yet are lost when the stream is cancelled.
p.messageQueue = nil
p.syncQueueToSidebar()

return p, tea.Batch(cmds...)

Expand Down Expand Up @@ -687,22 +684,20 @@ func (p *chatPage) handleSendMsg(msg msgtypes.SendMsg) (layout.Model, tea.Cmd) {
return p, cmd
}

// If queue is full, reject the message
if len(p.messageQueue) >= maxQueuedMessages {
return p, notification.WarningCmd(fmt.Sprintf("Queue full (max %d messages). Please wait.", maxQueuedMessages))
// Steer the message into the running agent loop. The runtime injects it
// at the next tool-round boundary so the model sees it mid-turn.
if err := p.app.Steer(msg.Content); err != nil {
return p, notification.WarningCmd("Steer queue full (max 5). Please wait for the agent to catch up.")
}

// Add to queue
// Track for sidebar display; cleared when the stream stops.
p.messageQueue = append(p.messageQueue, queuedMessage{
Comment thread
trungutt marked this conversation as resolved.
content: msg.Content,
attachments: msg.Attachments,
})
p.syncQueueToSidebar()

queueLen := len(p.messageQueue)
notifyMsg := fmt.Sprintf("Message queued (%d waiting) · Ctrl+X to clear", queueLen)

return p, notification.InfoCmd(notifyMsg)
return p, notification.InfoCmd("Message steered · agent will see it at the next step")
}

func (p *chatPage) handleEditUserMessage(msg msgtypes.EditUserMessageMsg) (layout.Model, tea.Cmd) {
Expand Down Expand Up @@ -826,28 +821,8 @@ func (p *chatPage) extractAttachmentsFromSession(position int) []msgtypes.Attach
return attachments
}

// processNextQueuedMessage pops the next message from the queue and processes it.
// Returns nil if the queue is empty.
func (p *chatPage) processNextQueuedMessage() tea.Cmd {
if len(p.messageQueue) == 0 {
return nil
}

// Pop the first message from the queue
queued := p.messageQueue[0]
p.messageQueue[0] = queuedMessage{} // zero out to allow GC
p.messageQueue = p.messageQueue[1:]
p.syncQueueToSidebar()

msg := msgtypes.SendMsg{
Content: queued.content,
Attachments: queued.attachments,
}

return p.processMessage(msg)
}

// handleClearQueue clears all queued messages and shows a notification.
// handleClearQueue clears the display-only queue of steered messages and shows a notification.
// Note: messages already delivered to the runtime's steer queue cannot be recalled.
func (p *chatPage) handleClearQueue() (layout.Model, tea.Cmd) {
Comment thread
trungutt marked this conversation as resolved.
count := len(p.messageQueue)
if count == 0 {
Expand Down
226 changes: 144 additions & 82 deletions pkg/tui/page/chat/queue_test.go
Original file line number Diff line number Diff line change
@@ -1,141 +1,190 @@
package chat

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/docker/docker-agent/pkg/app"
"github.com/docker/docker-agent/pkg/runtime"
"github.com/docker/docker-agent/pkg/session"
"github.com/docker/docker-agent/pkg/sessiontitle"
"github.com/docker/docker-agent/pkg/tools"
"github.com/docker/docker-agent/pkg/tools/builtin"
mcptools "github.com/docker/docker-agent/pkg/tools/mcp"
"github.com/docker/docker-agent/pkg/tui/components/sidebar"
"github.com/docker/docker-agent/pkg/tui/messages"
"github.com/docker/docker-agent/pkg/tui/service"
)

// newTestChatPage creates a minimal chatPage for testing queue behavior.
// Note: This only initializes fields needed for queue testing.
// processMessage cannot be called without full initialization.
func newTestChatPage(t *testing.T) *chatPage {
t.Helper()
sessionState := &service.SessionState{}
// steerRuntime is a minimal runtime.Runtime for testing steer behaviour.
type steerRuntime struct {
steered []runtime.QueuedMessage
steerFn func(runtime.QueuedMessage) error // optional override
}

return &chatPage{
sidebar: sidebar.New(sessionState),
sessionState: sessionState,
working: true, // Start busy so messages get queued
func (r *steerRuntime) Steer(msg runtime.QueuedMessage) error {
if r.steerFn != nil {
return r.steerFn(msg)
}
r.steered = append(r.steered, msg)
return nil
}

func TestQueueFlow_BusyAgent_QueuesMessage(t *testing.T) {
t.Parallel()
// Remaining interface methods — no-ops for this test.

p := newTestChatPage(t)
// newTestChatPage already sets working=true
func (r *steerRuntime) CurrentAgentName() string { return "test" }

// Send first message while busy
msg1 := messages.SendMsg{Content: "first message"}
_, cmd := p.handleSendMsg(msg1)
func (r *steerRuntime) CurrentAgentInfo(context.Context) runtime.CurrentAgentInfo {
return runtime.CurrentAgentInfo{}
}

// Should be queued
require.Len(t, p.messageQueue, 1)
assert.Equal(t, "first message", p.messageQueue[0].content)
// Command should be a notification (not processMessage)
assert.NotNil(t, cmd)
func (r *steerRuntime) SetCurrentAgent(string) error { return nil }

// Send second message while still busy
msg2 := messages.SendMsg{Content: "second message"}
_, _ = p.handleSendMsg(msg2)
func (r *steerRuntime) CurrentAgentTools(context.Context) ([]tools.Tool, error) {
return nil, nil
}

require.Len(t, p.messageQueue, 2)
assert.Equal(t, "first message", p.messageQueue[0].content)
assert.Equal(t, "second message", p.messageQueue[1].content)
func (r *steerRuntime) EmitStartupInfo(_ context.Context, _ *session.Session, _ chan runtime.Event) {
// Do not close the channel — app.New's goroutine defers the close.
}

// Send third message
msg3 := messages.SendMsg{Content: "third message"}
_, _ = p.handleSendMsg(msg3)
func (r *steerRuntime) ResetStartupInfo() {}

require.Len(t, p.messageQueue, 3)
func (r *steerRuntime) RunStream(context.Context, *session.Session) <-chan runtime.Event {
ch := make(chan runtime.Event)
close(ch)
return ch
}

func TestQueueFlow_QueueFull_RejectsMessage(t *testing.T) {
t.Parallel()
func (r *steerRuntime) Run(context.Context, *session.Session) ([]session.Message, error) {
return nil, nil
}

p := newTestChatPage(t)
// newTestChatPage sets working=true
func (r *steerRuntime) Resume(context.Context, runtime.ResumeRequest) {}

// Fill the queue to max
for i := range maxQueuedMessages {
msg := messages.SendMsg{Content: "message"}
_, _ = p.handleSendMsg(msg)
assert.Len(t, p.messageQueue, i+1)
}
func (r *steerRuntime) ResumeElicitation(context.Context, tools.ElicitationAction, map[string]any) error {
return nil
}

func (r *steerRuntime) SessionStore() session.Store { return nil }

func (r *steerRuntime) Summarize(context.Context, *session.Session, string, chan runtime.Event) {}

func (r *steerRuntime) PermissionsInfo() *runtime.PermissionsInfo { return nil }

func (r *steerRuntime) CurrentAgentSkillsToolset() *builtin.SkillsToolset { return nil }

func (r *steerRuntime) CurrentMCPPrompts(context.Context) map[string]mcptools.PromptInfo {
return nil
}

func (r *steerRuntime) ExecuteMCPPrompt(context.Context, string, map[string]string) (string, error) {
return "", nil
}

func (r *steerRuntime) UpdateSessionTitle(context.Context, *session.Session, string) error {
return nil
}

func (r *steerRuntime) TitleGenerator() *sessiontitle.Generator { return nil }

func (r *steerRuntime) Close() error { return nil }

func (r *steerRuntime) FollowUp(runtime.QueuedMessage) error { return nil }

require.Len(t, p.messageQueue, maxQueuedMessages)
func (r *steerRuntime) RegenerateTitle(context.Context, *session.Session, chan runtime.Event) {}

// Try to add one more - should be rejected
msg := messages.SendMsg{Content: "overflow message"}
_, cmd := p.handleSendMsg(msg)
// newTestChatPage creates a minimal chatPage for testing steer/queue behaviour.
func newTestChatPage(t *testing.T) (*chatPage, *steerRuntime) {
t.Helper()
sessionState := &service.SessionState{}

// Queue size should not change
assert.Len(t, p.messageQueue, maxQueuedMessages)
// Should return a warning notification command
assert.NotNil(t, cmd)
rt := &steerRuntime{}
ctx, cancel := context.WithCancel(t.Context())
t.Cleanup(cancel)
a := app.New(ctx, rt, session.New())

return &chatPage{
sidebar: sidebar.New(sessionState),
sessionState: sessionState,
working: true, // Start busy so messages get steered
app: a,
}, rt
}

func TestQueueFlow_PopFromQueue(t *testing.T) {
func TestSteer_BusyAgent_SteersMessage(t *testing.T) {
t.Parallel()

p := newTestChatPage(t)
p, rt := newTestChatPage(t)

// Queue some messages
p.handleSendMsg(messages.SendMsg{Content: "first"})
p.handleSendMsg(messages.SendMsg{Content: "second"})
p.handleSendMsg(messages.SendMsg{Content: "third"})
// Send first message while busy — should steer to runtime
msg1 := messages.SendMsg{Content: "first message"}
_, cmd := p.handleSendMsg(msg1)
assert.NotNil(t, cmd) // notification command

require.Len(t, p.messageQueue, 3)
require.Len(t, rt.steered, 1)
assert.Equal(t, "first message", rt.steered[0].Content)
// Display queue should track the steered message
require.Len(t, p.messageQueue, 1)
assert.Equal(t, "first message", p.messageQueue[0].content)

// Manually pop messages (simulating what processNextQueuedMessage does internally)
// Pop first
popped := p.messageQueue[0]
p.messageQueue = p.messageQueue[1:]
p.syncQueueToSidebar()
// Send second message
msg2 := messages.SendMsg{Content: "second message"}
_, _ = p.handleSendMsg(msg2)

assert.Equal(t, "first", popped.content)
require.Len(t, rt.steered, 2)
assert.Equal(t, "second message", rt.steered[1].Content)
require.Len(t, p.messageQueue, 2)
assert.Equal(t, "second", p.messageQueue[0].content)
assert.Equal(t, "third", p.messageQueue[1].content)
}

// Pop second
popped = p.messageQueue[0]
p.messageQueue = p.messageQueue[1:]
func TestSteer_QueueFull_RejectsMessage(t *testing.T) {
t.Parallel()

assert.Equal(t, "second", popped.content)
require.Len(t, p.messageQueue, 1)
assert.Equal(t, "third", p.messageQueue[0].content)
p, rt := newTestChatPage(t)

// Make the runtime's steer queue reject after the first call
calls := 0
rt.steerFn = func(msg runtime.QueuedMessage) error {
calls++
if calls > 3 {
return errors.New("steer queue full")
}
rt.steered = append(rt.steered, msg)
return nil
}

// Pop last
popped = p.messageQueue[0]
p.messageQueue = p.messageQueue[1:]
// First 3 messages succeed
for i := range 3 {
_, _ = p.handleSendMsg(messages.SendMsg{Content: "message"})
assert.Len(t, rt.steered, i+1)
}

assert.Equal(t, "third", popped.content)
assert.Empty(t, p.messageQueue)
// Fourth message should be rejected by the runtime
_, cmd := p.handleSendMsg(messages.SendMsg{Content: "overflow"})
assert.NotNil(t, cmd) // warning notification
assert.Len(t, rt.steered, 3)
// Display queue should not grow when steer fails
assert.Len(t, p.messageQueue, 3)
}

func TestQueueFlow_ClearQueue(t *testing.T) {
func TestSteer_ClearQueue(t *testing.T) {
t.Parallel()

p := newTestChatPage(t)
// newTestChatPage sets working=true
p, _ := newTestChatPage(t)

// Queue some messages
// Steer some messages
p.handleSendMsg(messages.SendMsg{Content: "first"})
p.handleSendMsg(messages.SendMsg{Content: "second"})
p.handleSendMsg(messages.SendMsg{Content: "third"})

require.Len(t, p.messageQueue, 3)

// Clear the queue
// Clear the display queue
_, cmd := p.handleClearQueue()

assert.Empty(t, p.messageQueue)
assert.NotNil(t, cmd) // Success notification

Expand All @@ -144,3 +193,16 @@ func TestQueueFlow_ClearQueue(t *testing.T) {
assert.Empty(t, p.messageQueue)
assert.NotNil(t, cmd) // Info notification
}

func TestSteer_IdleAgent_ProcessesImmediately(t *testing.T) {
t.Parallel()

p, rt := newTestChatPage(t)
p.working = false // agent is idle

// When idle, handleSendMsg should NOT steer — it calls processMessage
// instead. We can't call processMessage without full init, but we can
// verify no steer occurred.
_ = messages.SendMsg{Content: "hello"}
assert.Empty(t, rt.steered)
}
Loading
Loading