diff --git a/internal/notifier/notifier.go b/internal/notifier/notifier.go index bf12ff29..86cac6e4 100644 --- a/internal/notifier/notifier.go +++ b/internal/notifier/notifier.go @@ -180,8 +180,18 @@ func (n *Notifier) deliverNotifications(ctx context.Context) { }() for _, notifyFunc := range notifyFuncs { - // TODO: panic recovery on delivery attempts - notifyFunc(NotificationTopic(notification.Topic), notification.Payload) + func() { + defer func() { + if panicVal := recover(); panicVal != nil { + n.Logger.ErrorContext(ctx, n.Name+": Notification delivery panicked", + slog.String("panic_val", fmt.Sprintf("%v", panicVal)), + slog.String("topic", notification.Topic), + ) + } + }() + + notifyFunc(NotificationTopic(notification.Topic), notification.Payload) + }() } } } diff --git a/internal/notifier/notifier_test.go b/internal/notifier/notifier_test.go index a6dc16ab..685b529d 100644 --- a/internal/notifier/notifier_test.go +++ b/internal/notifier/notifier_test.go @@ -401,6 +401,53 @@ func TestNotifier(t *testing.T) { requireNoNotification(t, notifyChan2) }) + t.Run("PanicInSubscriberDoesNotBreakDelivery", func(t *testing.T) { + t.Parallel() + + notifier := New(riversharedtest.BaseServiceArchetype(t), nil) + + panicChan := make(chan TopicAndPayload, 10) + notifyChan := make(chan TopicAndPayload, 10) + + notifier.subscriptions[testTopic1] = []*Subscription{ + { + notifyFunc: func(topic NotificationTopic, payload string) { + panicChan <- TopicAndPayload{topic, payload} + panic("panic from notify func") + }, + notifier: notifier, + topic: testTopic1, + }, + { + notifyFunc: topicAndPayloadNotifyFunc(notifyChan), + notifier: notifier, + topic: testTopic1, + }, + } + + runCtx, cancel := context.WithCancel(ctx) + defer cancel() + + done := make(chan struct{}) + go func() { + defer close(done) + notifier.deliverNotifications(runCtx) + }() + + notifier.notificationBuf <- &riverdriver.Notification{Topic: string(testTopic1), Payload: "msg1"} + + require.Equal(t, TopicAndPayload{testTopic1, "msg1"}, riversharedtest.WaitOrTimeout(t, panicChan)) + require.Equal(t, TopicAndPayload{testTopic1, "msg1"}, riversharedtest.WaitOrTimeout(t, notifyChan)) + + notifier.notificationBuf <- &riverdriver.Notification{Topic: string(testTopic1), Payload: "msg2"} + + require.Equal(t, TopicAndPayload{testTopic1, "msg2"}, riversharedtest.WaitOrTimeout(t, panicChan)) + require.Equal(t, TopicAndPayload{testTopic1, "msg2"}, riversharedtest.WaitOrTimeout(t, notifyChan)) + + cancel() + riversharedtest.WaitOrTimeout(t, done) + }) + // Stress test meant to suss out any races that there might be in the // subscribe or interrupt loop code. t.Run("MultipleSubscribersStress", func(t *testing.T) {