@@ -154,18 +154,19 @@ func New(ctx context.Context,
154154
155155// ProcessRequests processes incoming requests for the given peer
156156func (rm * ResponseManager ) ProcessRequests (ctx context.Context , p peer.ID , requests []gsmsg.GraphSyncRequest ) {
157- rm .send (& processRequestsMessage {p , requests }, ctx .Done ())
157+ _ = rm .send (& processRequestsMessage {p , requests }, ctx .Done ())
158158}
159159
160160// UnpauseResponse unpauses a response that was previously paused
161161func (rm * ResponseManager ) UnpauseResponse (ctx context.Context , requestID graphsync.RequestID , extensions ... graphsync.ExtensionData ) error {
162162 response := make (chan error , 1 )
163- rm .send (& unpauseRequestMessage {requestID , response , extensions }, ctx .Done ())
163+ err := rm .send (& unpauseRequestMessage {requestID , response , extensions }, ctx .Done ())
164+ if err != nil {
165+ return err
166+ }
164167 select {
165168 case <- rm .ctx .Done ():
166169 return errors .New ("context cancelled" )
167- case <- ctx .Done ():
168- return errors .New ("context cancelled" )
169170 case err := <- response :
170171 return err
171172 }
@@ -174,12 +175,13 @@ func (rm *ResponseManager) UnpauseResponse(ctx context.Context, requestID graphs
174175// PauseResponse pauses an in progress response (may take 1 or more blocks to process)
175176func (rm * ResponseManager ) PauseResponse (ctx context.Context , requestID graphsync.RequestID ) error {
176177 response := make (chan error , 1 )
177- rm .send (& pauseRequestMessage {requestID , response }, ctx .Done ())
178+ err := rm .send (& pauseRequestMessage {requestID , response }, ctx .Done ())
179+ if err != nil {
180+ return err
181+ }
178182 select {
179183 case <- rm .ctx .Done ():
180184 return errors .New ("context cancelled" )
181- case <- ctx .Done ():
182- return errors .New ("context cancelled" )
183185 case err := <- response :
184186 return err
185187 }
@@ -188,12 +190,13 @@ func (rm *ResponseManager) PauseResponse(ctx context.Context, requestID graphsyn
188190// CancelResponse cancels an in progress response
189191func (rm * ResponseManager ) CancelResponse (ctx context.Context , requestID graphsync.RequestID ) error {
190192 response := make (chan error , 1 )
191- rm .send (& errorRequestMessage {requestID , queryexecutor .ErrCancelledByCommand , response }, ctx .Done ())
193+ err := rm .send (& errorRequestMessage {requestID , queryexecutor .ErrCancelledByCommand , response }, ctx .Done ())
194+ if err != nil {
195+ return err
196+ }
192197 select {
193198 case <- rm .ctx .Done ():
194199 return errors .New ("context cancelled" )
195- case <- ctx .Done ():
196- return errors .New ("context cancelled" )
197200 case err := <- response :
198201 return err
199202 }
@@ -202,12 +205,13 @@ func (rm *ResponseManager) CancelResponse(ctx context.Context, requestID graphsy
202205// UpdateRequest updates an in progress response
203206func (rm * ResponseManager ) UpdateResponse (ctx context.Context , requestID graphsync.RequestID , extensions ... graphsync.ExtensionData ) error {
204207 response := make (chan error , 1 )
205- rm .send (& updateRequestMessage {requestID , extensions , response }, ctx .Done ())
208+ err := rm .send (& updateRequestMessage {requestID , extensions , response }, ctx .Done ())
209+ if err != nil {
210+ return err
211+ }
206212 select {
207213 case <- rm .ctx .Done ():
208214 return errors .New ("context cancelled" )
209- case <- ctx .Done ():
210- return errors .New ("context cancelled" )
211215 case err := <- response :
212216 return err
213217 }
@@ -216,7 +220,7 @@ func (rm *ResponseManager) UpdateResponse(ctx context.Context, requestID graphsy
216220// Synchronize is a utility method that blocks until all current messages are processed
217221func (rm * ResponseManager ) synchronize () {
218222 sync := make (chan error )
219- rm .send (& synchronizeMessage {sync }, nil )
223+ _ = rm .send (& synchronizeMessage {sync }, nil )
220224 select {
221225 case <- rm .ctx .Done ():
222226 case <- sync :
@@ -225,18 +229,18 @@ func (rm *ResponseManager) synchronize() {
225229
226230// StartTask starts the given task from the peer task queue
227231func (rm * ResponseManager ) StartTask (task * peertask.Task , p peer.ID , responseTaskChan chan <- queryexecutor.ResponseTask ) {
228- rm .send (& startTaskRequest {task , p , responseTaskChan }, nil )
232+ _ = rm .send (& startTaskRequest {task , p , responseTaskChan }, nil )
229233}
230234
231235// GetUpdates is called to read pending updates for a task and clear them
232236func (rm * ResponseManager ) GetUpdates (requestID graphsync.RequestID , updatesChan chan <- []gsmsg.GraphSyncRequest ) {
233- rm .send (& responseUpdateRequest {requestID , updatesChan }, nil )
237+ _ = rm .send (& responseUpdateRequest {requestID , updatesChan }, nil )
234238}
235239
236240// FinishTask marks a task from the task queue as done
237241func (rm * ResponseManager ) FinishTask (task * peertask.Task , p peer.ID , err error ) {
238242 done := make (chan struct {}, 1 )
239- rm .send (& finishTaskRequest {task , p , err , done }, nil )
243+ _ = rm .send (& finishTaskRequest {task , p , err , done }, nil )
240244 select {
241245 case <- rm .ctx .Done ():
242246 case <- done :
@@ -246,7 +250,7 @@ func (rm *ResponseManager) FinishTask(task *peertask.Task, p peer.ID, err error)
246250// CloseWithNetworkError closes a request due to a network error
247251func (rm * ResponseManager ) CloseWithNetworkError (requestID graphsync.RequestID ) {
248252 done := make (chan error , 1 )
249- rm .send (& errorRequestMessage {requestID , queryexecutor .ErrNetworkError , done }, nil )
253+ _ = rm .send (& errorRequestMessage {requestID , queryexecutor .ErrNetworkError , done }, nil )
250254 select {
251255 case <- rm .ctx .Done ():
252256 case <- done :
@@ -256,7 +260,7 @@ func (rm *ResponseManager) CloseWithNetworkError(requestID graphsync.RequestID)
256260// TerminateRequest indicates a request has finished sending data and should no longer be tracked
257261func (rm * ResponseManager ) TerminateRequest (requestID graphsync.RequestID ) {
258262 done := make (chan struct {}, 1 )
259- rm .send (& terminateRequestMessage {requestID , done }, nil )
263+ _ = rm .send (& terminateRequestMessage {requestID , done }, nil )
260264 select {
261265 case <- rm .ctx .Done ():
262266 case <- done :
@@ -266,7 +270,7 @@ func (rm *ResponseManager) TerminateRequest(requestID graphsync.RequestID) {
266270// PeerState gets current state of the outgoing responses for a given peer
267271func (rm * ResponseManager ) PeerState (p peer.ID ) peerstate.PeerState {
268272 response := make (chan peerstate.PeerState )
269- rm .send (& peerStateMessage {p , response }, nil )
273+ _ = rm .send (& peerStateMessage {p , response }, nil )
270274 select {
271275 case <- rm .ctx .Done ():
272276 return peerstate.PeerState {}
@@ -275,11 +279,20 @@ func (rm *ResponseManager) PeerState(p peer.ID) peerstate.PeerState {
275279 }
276280}
277281
278- func (rm * ResponseManager ) send (message responseManagerMessage , done <- chan struct {}) {
282+ func (rm * ResponseManager ) send (message responseManagerMessage , done <- chan struct {}) error {
283+ // prioritize cancelled context
284+ select {
285+ case <- done :
286+ return errors .New ("unable to send message before cancellation" )
287+ default :
288+ }
279289 select {
280290 case <- rm .ctx .Done ():
291+ return rm .ctx .Err ()
281292 case <- done :
293+ return errors .New ("unable to send message before cancellation" )
282294 case rm .messages <- message :
295+ return nil
283296 }
284297}
285298
0 commit comments