Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 115 additions & 5 deletions mcp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,28 @@ func (c *Client) Connect(ctx context.Context, t Transport, opts *ClientSessionOp
if hc, ok := cs.mcpConn.(clientConnection); ok {
hc.sessionUpdated(cs.state)
}
subscribeParams := &SubscriptionsListenParams{}
if c.opts.ToolListChangedHandler != nil {
subscribeParams.Notifications.ToolsListChanged = true
}
if c.opts.PromptListChangedHandler != nil {
subscribeParams.Notifications.PromptsListChanged = true
}
if c.opts.ResourceListChangedHandler != nil {
subscribeParams.Notifications.ResourcesListChanged = true
}
if subscribeParams.Notifications.ToolsListChanged ||
subscribeParams.Notifications.PromptsListChanged ||
subscribeParams.Notifications.ResourcesListChanged {
// The listen blocks until the server cancels it. Run it in
// a goroutine so Connect can return; ClientSession.Close
// cancels its context to send notifications/cancelled.
listenCtx, cancelListen := context.WithCancel(context.Background())
cs.listenCancel = cancelListen
go func() {
_ = cs.subscriptionsListen(listenCtx, subscribeParams)
}()
}
return cs, nil
}

Expand Down Expand Up @@ -416,6 +438,7 @@ type ClientSession struct {
conn *jsonrpc2.Connection
client *Client
keepaliveCancel context.CancelFunc
listenCancel context.CancelFunc
mcpConn Connection

// No mutex is (currently) required to guard the session state, because it is
Expand All @@ -432,6 +455,15 @@ type ClientSession struct {
// It is used to look up x-mcp-header annotations when
// constructing Mcp-Param-* headers for tools/call requests.
toolCache map[string]*Tool

// resourceSubsMu guards resourceSubs.
resourceSubsMu sync.Mutex
// resourceSubs maps a subscribed resource URI to the cancel func of the
// goroutine running its dedicated subscriptions/listen stream. Populated
// only under SEP-2575; the legacy protocol routes Subscribe and
// Unsubscribe straight to the resources/subscribe and resources/unsubscribe
// RPCs and leaves this map untouched.
resourceSubs map[string]context.CancelFunc
}

type clientSessionState struct {
Expand Down Expand Up @@ -498,6 +530,10 @@ func (cs *ClientSession) Close() error {
if cs.keepaliveCancel != nil {
cs.keepaliveCancel()
}
if cs.listenCancel != nil {
cs.listenCancel()
}
cs.cancelAllResourceSubscriptions()
err := cs.conn.Close()

if cs.onClose != nil && cs.calledOnClose.CompareAndSwap(false, true) {
Expand Down Expand Up @@ -1079,6 +1115,7 @@ var clientMethodInfos = map[string]methodInfo{
notificationLoggingMessage: newClientMethodInfo(clientMethod((*Client).callLoggingHandler), notification),
notificationProgress: newClientMethodInfo(clientSessionMethod((*ClientSession).callProgressNotificationHandler), notification),
notificationElicitationComplete: newClientMethodInfo(clientMethod((*Client).callElicitationCompleteHandler), notification|missingParamsOK),
notificationSubscriptionsAck: newClientMethodInfo(clientMethod((*Client).callSubscriptionsAckHandler), notification|missingParamsOK),
}

func (cs *ClientSession) sendingMethodInfos() map[string]methodInfo {
Expand Down Expand Up @@ -1223,17 +1260,90 @@ func (cs *ClientSession) Complete(ctx context.Context, params *CompleteParams) (
// Subscribe sends a "resources/subscribe" request to the server, asking for
// notifications when the specified resource changes.
func (cs *ClientSession) Subscribe(ctx context.Context, params *SubscribeParams) error {
_, err := handleSend[*emptyResult](ctx, methodSubscribe, newClientRequest(cs, orZero[Params](params)))
return err
if !cs.usesNewProtocol() {
_, err := handleSend[*emptyResult](ctx, methodSubscribe, newClientRequest(cs, orZero[Params](params)))
return err
}
if params == nil || params.URI == "" {
return fmt.Errorf("Subscribe: missing URI")
}
uri := params.URI

cs.resourceSubsMu.Lock()
if _, exists := cs.resourceSubs[uri]; exists {
cs.resourceSubsMu.Unlock()
return nil
}
listenCtx, cancel := context.WithCancel(context.Background())
if cs.resourceSubs == nil {
cs.resourceSubs = make(map[string]context.CancelFunc)
}
cs.resourceSubs[uri] = cancel
cs.resourceSubsMu.Unlock()

go func() {
_ = cs.subscriptionsListen(listenCtx, &SubscriptionsListenParams{
Notifications: NotificationSubscriptions{
ResourceSubscriptions: []string{uri},
},
})
}()
return nil
}

// Unsubscribe sends a "resources/unsubscribe" request to the server, cancelling
// a previous subscription.
// Unsubscribe cancels a previous [ClientSession.Subscribe] for params.URI.
//
// Under the legacy protocol it sends a "resources/unsubscribe" request.
//
// Under SEP-2575 it cancels the background "subscriptions/listen" stream
// opened by Subscribe for the URI. Unsubscribe is idempotent: calling it for
// a URI that is not currently subscribed is a no-op.
func (cs *ClientSession) Unsubscribe(ctx context.Context, params *UnsubscribeParams) error {
_, err := handleSend[*emptyResult](ctx, methodUnsubscribe, newClientRequest(cs, orZero[Params](params)))
if !cs.usesNewProtocol() {
_, err := handleSend[*emptyResult](ctx, methodUnsubscribe, newClientRequest(cs, orZero[Params](params)))
return err
}
if params == nil || params.URI == "" {
return fmt.Errorf("Unsubscribe: missing URI")
}
cs.resourceSubsMu.Lock()
cancel, ok := cs.resourceSubs[params.URI]
delete(cs.resourceSubs, params.URI)
cs.resourceSubsMu.Unlock()
if ok {
cancel()
}
return nil
}

// cancelAllResourceSubscriptions cancels every active SEP-2575 resource
// subscription opened via Subscribe. The listen goroutines exit
// asynchronously as their contexts unwind. Called from Close.
func (cs *ClientSession) cancelAllResourceSubscriptions() {
cs.resourceSubsMu.Lock()
subs := cs.resourceSubs
cs.resourceSubs = nil
cs.resourceSubsMu.Unlock()
for _, cancel := range subs {
cancel()
}
}

// SubscriptionsListen opens a SEP-2575 "subscriptions/listen" stream and
// blocks for the lifetime of the subscription. The server's first message on
// the stream is "notifications/subscriptions/acknowledged"; subsequent
// opted-in notifications (e.g. tools/list_changed) are delivered through the
// usual handlers registered in [ClientOptions].
func (cs *ClientSession) subscriptionsListen(ctx context.Context, params *SubscriptionsListenParams) error {
params = injectRequestMeta(cs, params)
_, err := handleSend[*emptyResult](ctx, methodSubscriptionsListen, newClientRequest(cs, orZero[Params](params)))
return err
}

func (c *Client) callSubscriptionsAckHandler(context.Context, *ClientRequest[*SubscriptionsAcknowledgedParams]) (Result, error) {
return nil, nil
}

func (c *Client) callToolChangedHandler(ctx context.Context, req *ToolListChangedRequest) (Result, error) {
if h := c.opts.ToolListChangedHandler; h != nil {
h(ctx, req)
Expand Down
Loading