From 852a13093c93e25040ffff935ae11bde98cdc7fb Mon Sep 17 00:00:00 2001 From: "Peter Chen J." Date: Wed, 3 Jun 2026 19:16:33 +0800 Subject: [PATCH 1/2] Recover from panicking notifier subscribers --- internal/notifier/notifier.go | 14 +++++++-- internal/notifier/notifier_test.go | 47 ++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 2 deletions(-) diff --git a/internal/notifier/notifier.go b/internal/notifier/notifier.go index bf12ff29..9bf0e594 100644 --- a/internal/notifier/notifier.go +++ b/internal/notifier/notifier.go @@ -180,8 +180,18 @@ func (n *Notifier) deliverNotifications(ctx context.Context) { }() for _, notifyFunc := range notifyFuncs { - // TODO: panic recovery on delivery attempts - notifyFunc(NotificationTopic(notification.Topic), notification.Payload) + func() { + defer func() { + if panicVal := recover(); panicVal != nil { + n.Logger.ErrorContext(ctx, n.Name+": Notification delivery panicked", + slog.String("panic_val", fmt.Sprintf("%v", panicVal)), + slog.String("topic", string(notification.Topic)), + ) + } + }() + + notifyFunc(NotificationTopic(notification.Topic), notification.Payload) + }() } } } diff --git a/internal/notifier/notifier_test.go b/internal/notifier/notifier_test.go index a6dc16ab..685b529d 100644 --- a/internal/notifier/notifier_test.go +++ b/internal/notifier/notifier_test.go @@ -401,6 +401,53 @@ func TestNotifier(t *testing.T) { requireNoNotification(t, notifyChan2) }) + t.Run("PanicInSubscriberDoesNotBreakDelivery", func(t *testing.T) { + t.Parallel() + + notifier := New(riversharedtest.BaseServiceArchetype(t), nil) + + panicChan := make(chan TopicAndPayload, 10) + notifyChan := make(chan TopicAndPayload, 10) + + notifier.subscriptions[testTopic1] = []*Subscription{ + { + notifyFunc: func(topic NotificationTopic, payload string) { + panicChan <- TopicAndPayload{topic, payload} + panic("panic from notify func") + }, + notifier: notifier, + topic: testTopic1, + }, + { + notifyFunc: topicAndPayloadNotifyFunc(notifyChan), + notifier: notifier, + topic: testTopic1, + }, + } + + runCtx, cancel := context.WithCancel(ctx) + defer cancel() + + done := make(chan struct{}) + go func() { + defer close(done) + notifier.deliverNotifications(runCtx) + }() + + notifier.notificationBuf <- &riverdriver.Notification{Topic: string(testTopic1), Payload: "msg1"} + + require.Equal(t, TopicAndPayload{testTopic1, "msg1"}, riversharedtest.WaitOrTimeout(t, panicChan)) + require.Equal(t, TopicAndPayload{testTopic1, "msg1"}, riversharedtest.WaitOrTimeout(t, notifyChan)) + + notifier.notificationBuf <- &riverdriver.Notification{Topic: string(testTopic1), Payload: "msg2"} + + require.Equal(t, TopicAndPayload{testTopic1, "msg2"}, riversharedtest.WaitOrTimeout(t, panicChan)) + require.Equal(t, TopicAndPayload{testTopic1, "msg2"}, riversharedtest.WaitOrTimeout(t, notifyChan)) + + cancel() + riversharedtest.WaitOrTimeout(t, done) + }) + // Stress test meant to suss out any races that there might be in the // subscribe or interrupt loop code. t.Run("MultipleSubscribersStress", func(t *testing.T) { From cf8c2f6abf5ad681b73a1e226fb6dfe0dc82c1f9 Mon Sep 17 00:00:00 2001 From: "Peter Chen J." Date: Wed, 3 Jun 2026 19:33:46 +0800 Subject: [PATCH 2/2] Fix notifier lint unconvert --- internal/notifier/notifier.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/notifier/notifier.go b/internal/notifier/notifier.go index 9bf0e594..86cac6e4 100644 --- a/internal/notifier/notifier.go +++ b/internal/notifier/notifier.go @@ -185,7 +185,7 @@ func (n *Notifier) deliverNotifications(ctx context.Context) { if panicVal := recover(); panicVal != nil { n.Logger.ErrorContext(ctx, n.Name+": Notification delivery panicked", slog.String("panic_val", fmt.Sprintf("%v", panicVal)), - slog.String("topic", string(notification.Topic)), + slog.String("topic", notification.Topic), ) } }()