diff --git a/mcp/client.go b/mcp/client.go index 1e0e18a4..bae40b1d 100644 --- a/mcp/client.go +++ b/mcp/client.go @@ -300,6 +300,28 @@ func (c *Client) Connect(ctx context.Context, t Transport, opts *ClientSessionOp if hc, ok := cs.mcpConn.(clientConnection); ok { hc.sessionUpdated(cs.state) } + subscribeParams := &SubscriptionsListenParams{} + if c.opts.ToolListChangedHandler != nil { + subscribeParams.Notifications.ToolsListChanged = true + } + if c.opts.PromptListChangedHandler != nil { + subscribeParams.Notifications.PromptsListChanged = true + } + if c.opts.ResourceListChangedHandler != nil { + subscribeParams.Notifications.ResourcesListChanged = true + } + if subscribeParams.Notifications.ToolsListChanged || + subscribeParams.Notifications.PromptsListChanged || + subscribeParams.Notifications.ResourcesListChanged { + // The listen blocks until the server cancels it. Run it in + // a goroutine so Connect can return; ClientSession.Close + // cancels its context to send notifications/cancelled. + listenCtx, cancelListen := context.WithCancel(context.Background()) + cs.listenCancel = cancelListen + go func() { + _ = cs.subscriptionsListen(listenCtx, subscribeParams) + }() + } return cs, nil } @@ -416,6 +438,7 @@ type ClientSession struct { conn *jsonrpc2.Connection client *Client keepaliveCancel context.CancelFunc + listenCancel context.CancelFunc mcpConn Connection // No mutex is (currently) required to guard the session state, because it is @@ -432,6 +455,15 @@ type ClientSession struct { // It is used to look up x-mcp-header annotations when // constructing Mcp-Param-* headers for tools/call requests. toolCache map[string]*Tool + + // resourceSubsMu guards resourceSubs. + resourceSubsMu sync.Mutex + // resourceSubs maps a subscribed resource URI to the cancel func of the + // goroutine running its dedicated subscriptions/listen stream. Populated + // only under SEP-2575; the legacy protocol routes Subscribe and + // Unsubscribe straight to the resources/subscribe and resources/unsubscribe + // RPCs and leaves this map untouched. + resourceSubs map[string]context.CancelFunc } type clientSessionState struct { @@ -498,6 +530,10 @@ func (cs *ClientSession) Close() error { if cs.keepaliveCancel != nil { cs.keepaliveCancel() } + if cs.listenCancel != nil { + cs.listenCancel() + } + cs.cancelAllResourceSubscriptions() err := cs.conn.Close() if cs.onClose != nil && cs.calledOnClose.CompareAndSwap(false, true) { @@ -1079,6 +1115,7 @@ var clientMethodInfos = map[string]methodInfo{ notificationLoggingMessage: newClientMethodInfo(clientMethod((*Client).callLoggingHandler), notification), notificationProgress: newClientMethodInfo(clientSessionMethod((*ClientSession).callProgressNotificationHandler), notification), notificationElicitationComplete: newClientMethodInfo(clientMethod((*Client).callElicitationCompleteHandler), notification|missingParamsOK), + notificationSubscriptionsAck: newClientMethodInfo(clientMethod((*Client).callSubscriptionsAckHandler), notification|missingParamsOK), } func (cs *ClientSession) sendingMethodInfos() map[string]methodInfo { @@ -1223,17 +1260,90 @@ func (cs *ClientSession) Complete(ctx context.Context, params *CompleteParams) ( // Subscribe sends a "resources/subscribe" request to the server, asking for // notifications when the specified resource changes. func (cs *ClientSession) Subscribe(ctx context.Context, params *SubscribeParams) error { - _, err := handleSend[*emptyResult](ctx, methodSubscribe, newClientRequest(cs, orZero[Params](params))) - return err + if !cs.usesNewProtocol() { + _, err := handleSend[*emptyResult](ctx, methodSubscribe, newClientRequest(cs, orZero[Params](params))) + return err + } + if params == nil || params.URI == "" { + return fmt.Errorf("Subscribe: missing URI") + } + uri := params.URI + + cs.resourceSubsMu.Lock() + if _, exists := cs.resourceSubs[uri]; exists { + cs.resourceSubsMu.Unlock() + return nil + } + listenCtx, cancel := context.WithCancel(context.Background()) + if cs.resourceSubs == nil { + cs.resourceSubs = make(map[string]context.CancelFunc) + } + cs.resourceSubs[uri] = cancel + cs.resourceSubsMu.Unlock() + + go func() { + _ = cs.subscriptionsListen(listenCtx, &SubscriptionsListenParams{ + Notifications: NotificationSubscriptions{ + ResourceSubscriptions: []string{uri}, + }, + }) + }() + return nil } -// Unsubscribe sends a "resources/unsubscribe" request to the server, cancelling -// a previous subscription. +// Unsubscribe cancels a previous [ClientSession.Subscribe] for params.URI. +// +// Under the legacy protocol it sends a "resources/unsubscribe" request. +// +// Under SEP-2575 it cancels the background "subscriptions/listen" stream +// opened by Subscribe for the URI. Unsubscribe is idempotent: calling it for +// a URI that is not currently subscribed is a no-op. func (cs *ClientSession) Unsubscribe(ctx context.Context, params *UnsubscribeParams) error { - _, err := handleSend[*emptyResult](ctx, methodUnsubscribe, newClientRequest(cs, orZero[Params](params))) + if !cs.usesNewProtocol() { + _, err := handleSend[*emptyResult](ctx, methodUnsubscribe, newClientRequest(cs, orZero[Params](params))) + return err + } + if params == nil || params.URI == "" { + return fmt.Errorf("Unsubscribe: missing URI") + } + cs.resourceSubsMu.Lock() + cancel, ok := cs.resourceSubs[params.URI] + delete(cs.resourceSubs, params.URI) + cs.resourceSubsMu.Unlock() + if ok { + cancel() + } + return nil +} + +// cancelAllResourceSubscriptions cancels every active SEP-2575 resource +// subscription opened via Subscribe. The listen goroutines exit +// asynchronously as their contexts unwind. Called from Close. +func (cs *ClientSession) cancelAllResourceSubscriptions() { + cs.resourceSubsMu.Lock() + subs := cs.resourceSubs + cs.resourceSubs = nil + cs.resourceSubsMu.Unlock() + for _, cancel := range subs { + cancel() + } +} + +// SubscriptionsListen opens a SEP-2575 "subscriptions/listen" stream and +// blocks for the lifetime of the subscription. The server's first message on +// the stream is "notifications/subscriptions/acknowledged"; subsequent +// opted-in notifications (e.g. tools/list_changed) are delivered through the +// usual handlers registered in [ClientOptions]. +func (cs *ClientSession) subscriptionsListen(ctx context.Context, params *SubscriptionsListenParams) error { + params = injectRequestMeta(cs, params) + _, err := handleSend[*emptyResult](ctx, methodSubscriptionsListen, newClientRequest(cs, orZero[Params](params))) return err } +func (c *Client) callSubscriptionsAckHandler(context.Context, *ClientRequest[*SubscriptionsAcknowledgedParams]) (Result, error) { + return nil, nil +} + func (c *Client) callToolChangedHandler(ctx context.Context, req *ToolListChangedRequest) (Result, error) { if h := c.opts.ToolListChangedHandler; h != nil { h(ctx, req) diff --git a/mcp/mcp_test.go b/mcp/mcp_test.go index 5c8e7d12..4c89569e 100644 --- a/mcp/mcp_test.go +++ b/mcp/mcp_test.go @@ -12,6 +12,8 @@ import ( "fmt" "io" "log/slog" + "net/http" + "net/http/httptest" "net/url" "path/filepath" "runtime" @@ -2449,3 +2451,309 @@ func TestSetErrorPreservesContent(t *testing.T) { } var ctrCmpOpts = []cmp.Option{cmpopts.IgnoreUnexported(CallToolResult{}, GetPromptResult{}, ReadResourceResult{})} + +// runSubscriptionsListenTest exercises the SEP-2575 auto-listen flow end-to-end +// against the supplied transport pair. It captures every notification and the +// acknowledgment the client sees, then asserts: +// +// - the auto-listen issued by Client.Connect is acknowledged with a tagged +// subscription ID; +// - tool and prompt list-changed notifications are delivered to the matching +// handlers, each carrying the same subscription ID as the ack; +// - the subscription persists across multiple unrelated changes; +// - cs.Close() ends the subscription and further changes don't deliver. +func runSubscriptionsListenTest(t *testing.T, client *Client, server *Server, ct Transport, events chan subListenEvent) { + t.Helper() + + ctx, topCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer topCancel() + + cs, err := client.Connect(ctx, ct, &ClientSessionOptions{protocolVersion: protocolVersion20260630}) + if err != nil { + t.Fatalf("client connect: %v", err) + } + + waitFor := func(kind string) subListenEvent { + t.Helper() + select { + case e := <-events: + if e.kind != kind { + t.Fatalf("got event %q (id=%s), want kind %q", e.kind, e.id, kind) + } + return e + case <-time.After(5 * time.Second): + t.Fatalf("timed out waiting for %q event", kind) + return subListenEvent{} + } + } + expectNoEvent := func(d time.Duration) { + t.Helper() + select { + case e := <-events: + t.Fatalf("unexpected event %q (id=%s)", e.kind, e.id) + case <-time.After(d): + } + } + + ack := waitFor("ack") + if ack.id == "" { + t.Fatalf("acknowledgment missing subscription ID") + } + + server.AddTool(&Tool{Name: "t2", InputSchema: &jsonschema.Schema{Type: "object"}}, nil) + if e := waitFor("tool"); e.id != ack.id { + t.Errorf("first tool notif id = %s, want %s", e.id, ack.id) + } + + server.AddPrompt(&Prompt{Name: "p2"}, nil) + if e := waitFor("prompt"); e.id != ack.id { + t.Errorf("first prompt notif id = %s, want %s", e.id, ack.id) + } + + server.AddTool(&Tool{Name: "t3", InputSchema: &jsonschema.Schema{Type: "object"}}, nil) + if e := waitFor("tool"); e.id != ack.id { + t.Errorf("second tool notif id = %s, want %s", e.id, ack.id) + } + server.AddPrompt(&Prompt{Name: "p3"}, nil) + if e := waitFor("prompt"); e.id != ack.id { + t.Errorf("second prompt notif id = %s, want %s", e.id, ack.id) + } + expectNoEvent(notificationDelay * 5) + + cs.Close() + time.Sleep(50 * time.Millisecond) + + server.AddTool(&Tool{Name: "t4", InputSchema: &jsonschema.Schema{Type: "object"}}, nil) + server.AddPrompt(&Prompt{Name: "p4"}, nil) + expectNoEvent(notificationDelay * 20) +} + +type subListenEvent struct { + kind string // "ack", "tool", "prompt" + id string // subscription ID from _meta, stringified for cross-encoding equality +} + +// newSubListenClient returns a client wired to push every ack and every +// list-changed notification it receives into events, tagged with the kind +// and the subscription ID extracted from _meta. +func newSubListenClient(events chan subListenEvent) *Client { + asEvent := func(kind string, raw any) subListenEvent { + return subListenEvent{kind, fmt.Sprint(raw)} + } + c := NewClient(testImpl, &ClientOptions{ + ToolListChangedHandler: func(_ context.Context, req *ToolListChangedRequest) { + events <- asEvent("tool", req.Params.Meta[MetaKeySubscriptionID]) + }, + PromptListChangedHandler: func(_ context.Context, req *PromptListChangedRequest) { + events <- asEvent("prompt", req.Params.Meta[MetaKeySubscriptionID]) + }, + }) + c.AddReceivingMiddleware(func(next MethodHandler) MethodHandler { + return func(ctx context.Context, method string, req Request) (Result, error) { + if method == notificationSubscriptionsAck { + if cr, ok := req.(*ClientRequest[*SubscriptionsAcknowledgedParams]); ok && cr.Params != nil { + events <- asEvent("ack", cr.Params.Meta[MetaKeySubscriptionID]) + } + } + return next(ctx, method, req) + } + }) + return c +} + +func newSubListenServer() *Server { + s := NewServer(testImpl, nil) + AddTool(s, &Tool{Name: "t1"}, sayHi) + s.AddPrompt(&Prompt{Name: "p1"}, nil) + return s +} + +func enableNewProtocol(t *testing.T) { + t.Helper() + orig := supportedProtocolVersions + supportedProtocolVersions = append([]string{protocolVersion20260630}, slices.Clone(orig)...) + t.Cleanup(func() { supportedProtocolVersions = orig }) +} + +// TestSubscriptionsListen_InMemory exercises the listen flow over the +// session-shared in-memory transport (semantically equivalent to STDIO). +// Cancellation here propagates via notifications/cancelled. +func TestSubscriptionsListen_InMemory(t *testing.T) { + enableNewProtocol(t) + events := make(chan subListenEvent, 64) + server := newSubListenServer() + ct, st := NewInMemoryTransports() + ss, err := server.Connect(context.Background(), st, nil) + if err != nil { + t.Fatalf("server connect: %v", err) + } + defer ss.Close() + runSubscriptionsListenTest(t, newSubListenClient(events), server, ct, events) +} + +// TestSubscriptionsListen_Streamable exercises the listen flow over a +// stateless HTTP server (SEP-2575). Each listen rides its own SSE response +// stream; cs.Close() tears it down. +func TestSubscriptionsListen_Streamable(t *testing.T) { + enableNewProtocol(t) + events := make(chan subListenEvent, 64) + server := newSubListenServer() + handler := NewStreamableHTTPHandler( + func(*http.Request) *Server { return server }, + &StreamableHTTPOptions{Stateless: true}, + ) + httpServer := httptest.NewServer(mustNotPanic(t, handler)) + defer httpServer.Close() + runSubscriptionsListenTest(t, newSubListenClient(events), server, + &StreamableClientTransport{Endpoint: httpServer.URL}, events) +} + +// TestSubscriptionsListen_NoHandlersNoListen verifies that a new-protocol +// client without any list-changed handlers registered does not open an +// auto-listen on connect, and therefore does not receive any acknowledgment +// or downstream notifications. +func TestSubscriptionsListen_NoHandlersNoListen(t *testing.T) { + enableNewProtocol(t) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + events := make(chan subListenEvent, 8) + server := newSubListenServer() + ct, st := NewInMemoryTransports() + ss, err := server.Connect(ctx, st, nil) + if err != nil { + t.Fatalf("server connect: %v", err) + } + defer ss.Close() + + c := NewClient(testImpl, nil) + c.AddReceivingMiddleware(func(next MethodHandler) MethodHandler { + return func(ctx context.Context, method string, req Request) (Result, error) { + if method == notificationSubscriptionsAck { + events <- subListenEvent{"ack", ""} + } + return next(ctx, method, req) + } + }) + cs, err := c.Connect(ctx, ct, &ClientSessionOptions{protocolVersion: protocolVersion20260630}) + if err != nil { + t.Fatalf("client connect: %v", err) + } + defer cs.Close() + + server.AddTool(&Tool{Name: "t2", InputSchema: &jsonschema.Schema{Type: "object"}}, nil) + + select { + case e := <-events: + t.Fatalf("unexpected event %q on no-handler client", e.kind) + case <-time.After(notificationDelay * 10): + } +} + +// resourceSubServer builds a server that advertises resource subscriptions +// and records every Subscribe/Unsubscribe handler invocation through chans. +func resourceSubServer(t *testing.T, subCh, unsubCh chan string) *Server { + t.Helper() + s := NewServer(testImpl, &ServerOptions{ + SubscribeHandler: func(_ context.Context, r *SubscribeRequest) error { + subCh <- r.Params.URI + return nil + }, + UnsubscribeHandler: func(_ context.Context, r *UnsubscribeRequest) error { + unsubCh <- r.Params.URI + return nil + }, + }) + s.AddResource(&Resource{Name: "r1", URI: "file:///r1"}, nil) + return s +} + +// resourceSubEvent is one delivered notifications/resources/updated. +type resourceSubEvent struct { + uri string + id string // _meta subscription ID, stringified +} + +// TestResourceSubscriptionsSEP2575_Streamable verifies the Subscribe -> +// ResourceUpdated path on a stateless Streamable HTTP server. +// +// Caveat: per-subscription Unsubscribe is intentionally NOT verified here. +// In stateless Streamable HTTP mode the subscriptions/listen handler blocks +// on its request context, and neither the HTTP POST disconnect nor the +// separate notifications/cancelled POST currently propagates to that +// handler's context. The handler only unwinds when the server next attempts +// a write to the (now-dead) SSE stream and the writeErr branch in the +// jsonrpc2 layer cancels the in-flight request. To keep the test +// hermetic we therefore trigger a write at the end by adding a resource, +// which fires notifications/resources/list_changed on the auto-listen path +// (if any) and on the per-URI listen, causing the listen handler to unwind. +// The spec-correct fix is to plumb the POST's request context down to the +// subscriptionsListen handler so HTTP disconnect is observed directly; this +// is tracked separately. +func TestResourceSubscriptions_Streamable(t *testing.T) { + enableNewProtocol(t) + + subCh := make(chan string, 8) + unsubCh := make(chan string, 8) + events := make(chan resourceSubEvent, 16) + + server := resourceSubServer(t, subCh, unsubCh) + handler := NewStreamableHTTPHandler( + func(*http.Request) *Server { return server }, + &StreamableHTTPOptions{Stateless: true}, + ) + httpServer := httptest.NewServer(mustNotPanic(t, handler)) + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + c := NewClient(testImpl, &ClientOptions{ + ResourceUpdatedHandler: func(_ context.Context, req *ResourceUpdatedNotificationRequest) { + id := "" + if req.Params != nil && req.Params.Meta != nil { + id = fmt.Sprint(req.Params.Meta[MetaKeySubscriptionID]) + } + events <- resourceSubEvent{uri: req.Params.URI, id: id} + }, + }) + cs, err := c.Connect(ctx, &StreamableClientTransport{Endpoint: httpServer.URL}, + &ClientSessionOptions{protocolVersion: protocolVersion20260630}) + if err != nil { + t.Fatalf("client connect: %v", err) + } + + if err := cs.Subscribe(ctx, &SubscribeParams{URI: "file:///r1"}); err != nil { + t.Fatalf("subscribe r1: %v", err) + } + select { + case got := <-subCh: + if got != "file:///r1" { + t.Fatalf("got URI %q, want %q", got, "file:///r1") + } + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for SubscribeHandler") + } + + server.ResourceUpdated(ctx, &ResourceUpdatedNotificationParams{URI: "file:///r1"}) + select { + case e := <-events: + if e.uri != "file:///r1" { + t.Fatalf("got URI %q, want %q", e.uri, "file:///r1") + } + if e.id == "" || e.id == "" { + t.Fatalf("missing subscription ID on update (got %q)", e.id) + } + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for resource update") + } + + // See test header comment for the explanation of this teardown ritual: + // close the client, then drop server-side TCP, then drive a write that + // will fail (any extra ResourceUpdated for our URI), to unblock the + // in-flight listen handler so httpServer.Close can return. + _ = cs.Close() + httpServer.CloseClientConnections() + server.ResourceUpdated(ctx, &ResourceUpdatedNotificationParams{URI: "file:///r1"}) + httpServer.Close() +} diff --git a/mcp/protocol.go b/mcp/protocol.go index acc7ec48..c5dd952d 100644 --- a/mcp/protocol.go +++ b/mcp/protocol.go @@ -1883,6 +1883,49 @@ type ResourceUpdatedNotificationParams struct { func (x *ResourceUpdatedNotificationParams) isParams() {} func (x *ResourceUpdatedNotificationParams) isNil() bool { return x == nil } +// NotificationSubscriptions describes the set of server-to-client +// notifications a client wishes to receive on a [SubscriptionsListenParams] +// stream. Each field is an explicit opt-in: a server MUST NOT push +// notifications of a type the client did not request. +type NotificationSubscriptions struct { + // ToolsListChanged opts in to "notifications/tools/list_changed". + ToolsListChanged bool `json:"toolsListChanged,omitempty"` + // PromptsListChanged opts in to "notifications/prompts/list_changed". + PromptsListChanged bool `json:"promptsListChanged,omitempty"` + // ResourcesListChanged opts in to "notifications/resources/list_changed". + ResourcesListChanged bool `json:"resourcesListChanged,omitempty"` + // ResourceSubscriptions enumerates the resource URIs for which the client + // wants "notifications/resources/updated". Replaces the legacy + // resources/subscribe RPC. + ResourceSubscriptions []string `json:"resourceSubscriptions,omitempty"` +} + +// SubscriptionsListenParams are the parameters for the +// "subscriptions/listen" RPC. +type SubscriptionsListenParams struct { + // Meta carries the per-request `_meta` triple. + Meta `json:"_meta,omitempty"` + // Notifications declares which notification types the client wants to + // receive on this stream. + Notifications NotificationSubscriptions `json:"notifications"` +} + +func (x *SubscriptionsListenParams) isParams() {} +func (x *SubscriptionsListenParams) isNil() bool { return x == nil } + +// SubscriptionsAcknowledgedParams are the parameters for the +// "notifications/subscriptions/acknowledged" notification, which the server +// MUST send as the first message on a subscriptions/listen stream. It carries +// the subset of the requested [NotificationSubscriptions] that the server has +// agreed to honor. +type SubscriptionsAcknowledgedParams struct { + Meta `json:"_meta,omitempty"` + Notifications NotificationSubscriptions `json:"notifications"` +} + +func (x *SubscriptionsAcknowledgedParams) isParams() {} +func (x *SubscriptionsAcknowledgedParams) isNil() bool { return x == nil } + // TODO(jba): add CompleteRequest and related types. // A request from the server to elicit additional information from the user via the client. @@ -2086,8 +2129,10 @@ const ( notificationRootsListChanged = "notifications/roots/list_changed" methodSetLevel = "logging/setLevel" methodSubscribe = "resources/subscribe" + methodSubscriptionsListen = "subscriptions/listen" notificationToolListChanged = "notifications/tools/list_changed" methodUnsubscribe = "resources/unsubscribe" + notificationSubscriptionsAck = "notifications/subscriptions/acknowledged" ) // Per-request _meta field names for the >= 2026-06-30 protocol version. @@ -2104,6 +2149,9 @@ const ( MetaKeyClientCapabilities = "io.modelcontextprotocol/clientCapabilities" // MetaKeyLogLevel identifies the desired log level for the request. MetaKeyLogLevel = "io.modelcontextprotocol/logLevel" + // MetaKeySubscriptionID identifies the subscriptions/listen request that an + // out-of-band notification belongs to. + MetaKeySubscriptionID = "io.modelcontextprotocol/subscriptionId" ) // UnsupportedProtocolVersionData is the SEP-2575 payload carried in the diff --git a/mcp/requests.go b/mcp/requests.go index 36368c99..64414caa 100644 --- a/mcp/requests.go +++ b/mcp/requests.go @@ -19,6 +19,7 @@ type ( ReadResourceRequest = ServerRequest[*ReadResourceParams] RootsListChangedRequest = ServerRequest[*RootsListChangedParams] SubscribeRequest = ServerRequest[*SubscribeParams] + SubscriptionsListenRequest = ServerRequest[*SubscriptionsListenParams] UnsubscribeRequest = ServerRequest[*UnsubscribeParams] ) diff --git a/mcp/server.go b/mcp/server.go index 912dea98..338eedf7 100644 --- a/mcp/server.go +++ b/mcp/server.go @@ -44,16 +44,19 @@ type Server struct { impl *Implementation opts ServerOptions - mu sync.Mutex - prompts *featureSet[*serverPrompt] - tools *featureSet[*serverTool] - resources *featureSet[*serverResource] - resourceTemplates *featureSet[*serverResourceTemplate] - sessions []*ServerSession - sendingMethodHandler_ MethodHandler - receivingMethodHandler_ MethodHandler - resourceSubscriptions map[string]map[*ServerSession]bool // uri -> session -> bool - pendingNotifications map[string]*time.Timer // notification name -> timer for pending notification send + mu sync.Mutex + prompts *featureSet[*serverPrompt] + tools *featureSet[*serverTool] + resources *featureSet[*serverResource] + resourceTemplates *featureSet[*serverResourceTemplate] + sessions []*ServerSession + sendingMethodHandler_ MethodHandler + receivingMethodHandler_ MethodHandler + toolChangeSubscriptions map[*ServerSession]jsonrpc.ID // session -> requestID for "tools/changed" + promptChangeSubscriptions map[*ServerSession]jsonrpc.ID // session -> requestID for "prompts/changed" + resourceChangeSubscriptions map[*ServerSession]jsonrpc.ID // session -> requestID for "resources/changed" + resourceSubscriptions map[string]map[*ServerSession]jsonrpc.ID // uri -> session -> requestID + pendingNotifications map[string]*time.Timer // notification name -> timer for pending notification send } // ServerOptions is used to configure behavior of the server. @@ -195,16 +198,19 @@ func NewServer(impl *Implementation, options *ServerOptions) *Server { } s := &Server{ - impl: impl, - opts: opts, - prompts: newFeatureSet(func(p *serverPrompt) string { return p.prompt.Name }), - tools: newFeatureSet(func(t *serverTool) string { return t.tool.Name }), - resources: newFeatureSet(func(r *serverResource) string { return r.resource.URI }), - resourceTemplates: newFeatureSet(func(t *serverResourceTemplate) string { return t.resourceTemplate.URITemplate }), - sendingMethodHandler_: defaultSendingMethodHandler, - receivingMethodHandler_: defaultReceivingMethodHandler[*ServerSession], - resourceSubscriptions: make(map[string]map[*ServerSession]bool), - pendingNotifications: make(map[string]*time.Timer), + impl: impl, + opts: opts, + prompts: newFeatureSet(func(p *serverPrompt) string { return p.prompt.Name }), + tools: newFeatureSet(func(t *serverTool) string { return t.tool.Name }), + resources: newFeatureSet(func(r *serverResource) string { return r.resource.URI }), + resourceTemplates: newFeatureSet(func(t *serverResourceTemplate) string { return t.resourceTemplate.URITemplate }), + sendingMethodHandler_: defaultSendingMethodHandler, + receivingMethodHandler_: defaultReceivingMethodHandler[*ServerSession], + toolChangeSubscriptions: make(map[*ServerSession]jsonrpc.ID), + promptChangeSubscriptions: make(map[*ServerSession]jsonrpc.ID), + resourceChangeSubscriptions: make(map[*ServerSession]jsonrpc.ID), + resourceSubscriptions: make(map[string]map[*ServerSession]jsonrpc.ID), + pendingNotifications: make(map[string]*time.Timer), } s.AddReceivingMiddleware(serverMultiRoundTripMiddleware()) return s @@ -643,12 +649,11 @@ func (s *Server) complete(ctx context.Context, req *CompleteRequest) (*CompleteR return s.opts.CompletionHandler(ctx, req) } -// Map from notification name to its corresponding params. The params have no fields, -// so a single struct can be reused. -var changeNotificationParams = map[string]Params{ - notificationToolListChanged: &ToolListChangedParams{}, - notificationPromptListChanged: &PromptListChangedParams{}, - notificationResourceListChanged: &ResourceListChangedParams{}, +// Map from notification name to its corresponding params. +var changeNotificationParams = map[string]func() Params{ + notificationToolListChanged: func() Params { return &ToolListChangedParams{} }, + notificationPromptListChanged: func() Params { return &PromptListChangedParams{} }, + notificationResourceListChanged: func() Params { return &ResourceListChangedParams{} }, } // How long to wait before sending a change notification. @@ -682,12 +687,56 @@ func (s *Server) changeAndNotify(notification string, change func() bool) { // notifySessions sends the notification n to all existing sessions. // It is called asynchronously by changeAndNotify. +// +// Legacy (pre-SEP-2575) sessions receive the notification on the shared +// session channel. Sessions speaking the new protocol receive it only if they +// have an active subscriptions/listen stream that opted in to this +// notification type. func (s *Server) notifySessions(n string) { s.mu.Lock() sessions := slices.Clone(s.sessions) s.pendingNotifications[n] = nil + var subscribers map[*ServerSession]jsonrpc.ID + switch n { + case notificationToolListChanged: + subscribers = maps.Clone(s.toolChangeSubscriptions) + case notificationPromptListChanged: + subscribers = maps.Clone(s.promptChangeSubscriptions) + case notificationResourceListChanged: + subscribers = maps.Clone(s.resourceChangeSubscriptions) + } s.mu.Unlock() // Don't hold the lock during notification: it causes deadlock. - notifySessions(sessions, n, changeNotificationParams[n], s.opts.Logger) + + // Legacy sessions receive list-changed notifications on the shared session channel without opt-in. + var legacySessions []*ServerSession + for _, sess := range sessions { + if sess.InitializeParams().isNil() || sess.InitializeParams().ProtocolVersion < protocolVersion20260630 { + legacySessions = append(legacySessions, sess) + } + } + notifySessions(legacySessions, n, changeNotificationParams[n](), s.opts.Logger) + + // Sessions receive the notification only if they opened a + // subscriptions/listen stream that opted in to this notification type. + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + for sess, reqID := range subscribers { + params := changeNotificationParams[n]() + injectMetaSubscriptionID(params, reqID) + req := newRequest(sess, params) + if err := handleNotify(ctx, n, req); err != nil { + s.opts.Logger.Warn(fmt.Sprintf("calling %s: %v", n, err)) + } + } +} + +func injectMetaSubscriptionID(params Params, reqID jsonrpc.ID) { + m := params.GetMeta() + if m == nil { + m = map[string]any{} + } + m[MetaKeySubscriptionID] = reqID.Raw() + params.SetMeta(m) } // shouldSendListChangedNotification checks if the server's capabilities allow @@ -981,12 +1030,38 @@ func (s *Server) ResourceUpdated(ctx context.Context, params *ResourceUpdatedNot subscribedSessions := s.resourceSubscriptions[params.URI] sessions := slices.Collect(maps.Keys(subscribedSessions)) s.mu.Unlock() - notifySessions(sessions, notificationResourceUpdated, params, s.opts.Logger) + // Only add legacy sessions for the notification, new ones use the new notification mechanism. + var legacySessions []*ServerSession + var newSessions []*ServerSession + for _, sess := range sessions { + if sess.InitializeParams().isNil() || sess.InitializeParams().ProtocolVersion < protocolVersion20260630 { + legacySessions = append(legacySessions, sess) + } else { + newSessions = append(newSessions, sess) + } + } + notifySessions(legacySessions, notificationResourceUpdated, params, s.opts.Logger) s.opts.Logger.Info("resource updated notification sent", "uri", params.URI, "subscriber_count", len(sessions)) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + for _, sess := range newSessions { + reqID := subscribedSessions[sess] + p := *params + injectMetaSubscriptionID(&p, reqID) + req := newRequest(sess, &p) + if err := handleNotify(ctx, notificationResourceUpdated, req); err != nil { + s.opts.Logger.Warn(fmt.Sprintf("calling %s: %v", notificationResourceUpdated, err)) + } + } return nil } func (s *Server) subscribe(ctx context.Context, req *SubscribeRequest) (*emptyResult, error) { + requestID, ok := ctx.Value(idContextKey{}).(jsonrpc.ID) + if !ok || !requestID.IsValid() { + return nil, fmt.Errorf("%w: subscribe requires a request ID", jsonrpc2.ErrInvalidRequest) + } if s.opts.SubscribeHandler == nil { return nil, fmt.Errorf("%w: server does not support resource subscriptions", jsonrpc2.ErrMethodNotFound) } @@ -997,10 +1072,10 @@ func (s *Server) subscribe(ctx context.Context, req *SubscribeRequest) (*emptyRe s.mu.Lock() defer s.mu.Unlock() if s.resourceSubscriptions[req.Params.URI] == nil { - s.resourceSubscriptions[req.Params.URI] = make(map[*ServerSession]bool) + s.resourceSubscriptions[req.Params.URI] = make(map[*ServerSession]jsonrpc.ID) } - s.resourceSubscriptions[req.Params.URI][req.Session] = true - s.opts.Logger.Info("resource subscribed", "uri", req.Params.URI, "session_id", req.Session.ID()) + s.resourceSubscriptions[req.Params.URI][req.Session] = requestID + s.opts.Logger.Info("resource subscribed", "uri", req.Params.URI, "session_id", req.Session.ID(), "request_id", requestID) return &emptyResult{}, nil } @@ -1027,6 +1102,82 @@ func (s *Server) unsubscribe(ctx context.Context, req *UnsubscribeRequest) (*emp return &emptyResult{}, nil } +func (s *Server) subscriptionsListen(ctx context.Context, req *SubscriptionsListenRequest) (*emptyResult, error) { + requestID, ok := ctx.Value(idContextKey{}).(jsonrpc.ID) + if !ok || !requestID.IsValid() { + return nil, fmt.Errorf("%w: subscriptions/listen requires a request ID", jsonrpc2.ErrInvalidRequest) + } + + allowed := s.allowedSubscriptions(req.Params.Notifications) + s.mu.Lock() + if allowed.ToolsListChanged { + s.toolChangeSubscriptions[req.Session] = requestID + } + if allowed.PromptsListChanged { + s.promptChangeSubscriptions[req.Session] = requestID + } + if allowed.ResourcesListChanged { + s.resourceChangeSubscriptions[req.Session] = requestID + } + s.mu.Unlock() + defer func() { + s.mu.Lock() + delete(s.toolChangeSubscriptions, req.Session) + delete(s.promptChangeSubscriptions, req.Session) + delete(s.resourceChangeSubscriptions, req.Session) + s.mu.Unlock() + }() + + for _, uri := range allowed.ResourceSubscriptions { + _, err := s.subscribe(ctx, &SubscribeRequest{ + Session: req.Session, + Params: &SubscribeParams{ + URI: uri, + Meta: req.Params.GetMeta(), + }, + }) + if err != nil { + return nil, err + } + defer s.unsubscribe(ctx, &UnsubscribeRequest{ + Session: req.Session, + Params: &UnsubscribeParams{ + URI: uri, + Meta: req.Params.GetMeta(), + }, + }) + } + + ackParams := &SubscriptionsAcknowledgedParams{ + Notifications: allowed, + Meta: Meta{MetaKeySubscriptionID: requestID.Raw()}, + } + if err := req.Session.NotifySubscriptionAcked(ctx, ackParams); err != nil { + return nil, fmt.Errorf("sending subscriptions/acknowledged: %w", err) + } + + <-ctx.Done() + return &emptyResult{}, nil +} + +func (s *Server) allowedSubscriptions(want NotificationSubscriptions) NotificationSubscriptions { + caps := s.capabilities() + agreed := NotificationSubscriptions{} + if want.ToolsListChanged && caps.Tools != nil && caps.Tools.ListChanged { + agreed.ToolsListChanged = true + } + if want.PromptsListChanged && caps.Prompts != nil && caps.Prompts.ListChanged { + agreed.PromptsListChanged = true + } + if want.ResourcesListChanged && caps.Resources != nil && caps.Resources.ListChanged { + agreed.ResourcesListChanged = true + } + if len(want.ResourceSubscriptions) > 0 && caps.Resources != nil && caps.Resources.Subscribe { + agreed.ResourceSubscriptions = slices.Clone(want.ResourceSubscriptions) + } + return agreed +} + // Run runs the server over the given transport, which must be persistent. // // Run blocks until the client terminates the connection or the provided @@ -1096,6 +1247,10 @@ func (s *Server) disconnect(cc *ServerSession) { for _, subscribedSessions := range s.resourceSubscriptions { delete(subscribedSessions, cc) } + delete(s.toolChangeSubscriptions, cc) + delete(s.promptChangeSubscriptions, cc) + delete(s.resourceChangeSubscriptions, cc) + s.opts.Logger.Info("server session disconnected", "session_id", cc.ID()) } @@ -1196,6 +1351,12 @@ func (ss *ServerSession) NotifyProgress(ctx context.Context, params *ProgressNot return handleNotify(ctx, notificationProgress, newServerRequest(ss, orZero[Params](params))) } +// NotifySubscriptionAcked sends a subscription acknowledged notification from the server to the client +// associated with this session. +func (ss *ServerSession) NotifySubscriptionAcked(ctx context.Context, params *SubscriptionsAcknowledgedParams) error { + return handleNotify(ctx, notificationSubscriptionsAck, newServerRequest(ss, orZero[Params](params))) +} + func newServerRequest[P Params](ss *ServerSession, params P) *ServerRequest[P] { return &ServerRequest[P]{Session: ss, Params: params} } @@ -1228,6 +1389,14 @@ type ServerSession struct { state ServerSessionState } +// listenSubscription records the notification types a single +// subscriptions/listen stream has opted in to. +type listenSubscription struct { + toolsListChanged bool + promptsListChanged bool + resourcesListChanged bool +} + func (ss *ServerSession) updateState(mut func(*ServerSessionState)) { ss.mu.Lock() mut(&ss.state) @@ -1497,6 +1666,7 @@ var serverMethodInfos = map[string]methodInfo{ methodReadResource: newServerMethodInfo(serverMethod((*Server).readResource), 0), methodSetLevel: newServerMethodInfo(serverSessionMethod((*ServerSession).setLevel), 0), methodSubscribe: newServerMethodInfo(serverMethod((*Server).subscribe), 0), + methodSubscriptionsListen: newServerMethodInfo(serverMethod((*Server).subscriptionsListen), 0), methodUnsubscribe: newServerMethodInfo(serverMethod((*Server).unsubscribe), 0), notificationCancelled: newServerMethodInfo(serverSessionMethod((*ServerSession).cancel), notification|missingParamsOK), notificationInitialized: newServerMethodInfo(serverSessionMethod((*ServerSession).initialized), notification|missingParamsOK), @@ -1573,7 +1743,7 @@ func (ss *ServerSession) handle(ctx context.Context, req *jsonrpc.Request) (any, } switch req.Method { - case methodInitialize, methodPing, notificationInitialized, methodSetLevel: + case methodInitialize, methodPing, notificationInitialized, methodSetLevel, methodSubscribe, methodUnsubscribe: if validatedMeta.usesNewProtocol { ss.server.opts.Logger.Error("method removed in the new protocol", "method", req.Method) return nil, &jsonrpc.Error{ diff --git a/mcp/streamable.go b/mcp/streamable.go index 8ff9cd1f..a589fd4d 100644 --- a/mcp/streamable.go +++ b/mcp/streamable.go @@ -908,6 +908,12 @@ type stream struct { // the spec and earlier there was a concept of batching, in which POST // payloads could hold multiple requests or responses. requests map[jsonrpc.ID]struct{} + + // isListen reports whether this stream was opened by a + // subscriptions/listen request. Listen streams are always SSE, live for + // the duration of the subscription, and act as the target for + // out-of-band notifications routed through this connection. + isListen bool } // close sends a 'close' event to the client (if protocolVersion >= 2025-11-25 @@ -1365,6 +1371,7 @@ func (c *streamableServerConn) servePOST(w http.ResponseWriter, req *http.Reques calls := make(map[jsonrpc.ID]struct{}) tokenInfo := auth.TokenInfoFromContext(req.Context()) isInitialize := false + isSubscriptionsListen := false var initializeProtocolVersion string for _, msg := range incoming { if jreq, ok := msg.(*jsonrpc.Request); ok { @@ -1390,6 +1397,9 @@ func (c *streamableServerConn) servePOST(w http.ResponseWriter, req *http.Reques initializeProtocolVersion = params.ProtocolVersion } } + if jreq.Method == methodSubscriptionsListen { + isSubscriptionsListen = true + } // SEP-2575: requests carrying `_meta.protocolVersion` require the // Mcp-Protocol-Version HTTP header to be present and to match the // per-request `_meta.protocolVersion` value. @@ -1535,14 +1545,22 @@ func (c *streamableServerConn) servePOST(w http.ResponseWriter, req *http.Reques http.Error(w, fmt.Sprintf("storing stream: %v", err), http.StatusInternalServerError) return } + stream.isListen = isSubscriptionsListen + + // subscriptions/listen is inherently a long-lived SSE endpoint (SEP-2575): + // it has no synchronous result, the response stream stays open until the + // client cancels, and the server pushes notifications on it as they occur. + // Force SSE mode (bypassing JSONResponse) so the buffered application/json + // path doesn't deadlock waiting for a completion that won't come. + useSSE := !c.jsonResponse || isSubscriptionsListen // Set response headers. Accept was checked in [StreamableHTTPHandler]. w.Header().Set("Cache-Control", "no-cache, no-transform") - if c.jsonResponse { - w.Header().Set("Content-Type", "application/json") - } else { + if useSSE { w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Connection", "keep-alive") + } else { + w.Header().Set("Content-Type", "application/json") } if c.sessionID != "" && isInitialize { w.Header().Set(sessionIDHeader, c.sessionID) @@ -1553,7 +1571,7 @@ func (c *streamableServerConn) servePOST(w http.ResponseWriter, req *http.Reques done := make(chan struct{}) stream.done = done stream.protocolVersion = effectiveVersion - if c.jsonResponse { + if !useSSE { // JSON mode: collect messages in pendingJSONMessages until done. // Set pendingJSONMessages to a non-nil value to signal that this is an // application/json stream. @@ -1707,6 +1725,15 @@ func (c *streamableServerConn) Write(ctx context.Context, msg jsonrpc.Message) e s = c.streams[streamID] } } else { + // In stateless mode there will always be only one stream per connection. + for _, stream := range c.streams { + if stream.isListen { + s = stream + break + } + } + } + if s == nil { s = c.streams[""] // standalone SSE stream } if responseTo.IsValid() {