From 866eef614584e836b160cdb8fd38121c6eae3172 Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Wed, 13 May 2026 19:16:08 +0530 Subject: [PATCH 1/5] Add global retry limit Assisted-By: Codex --- docs/configuration/retry-and-backoff.md | 13 ++- .../sonus21/rqueue/config/RqueueConfig.java | 3 + .../RqueueMessageListenerContainer.java | 4 +- ...itional-spring-configuration-metadata.json | 7 ++ .../rqueue/config/RqueueConfigTest.java | 5 + .../RqueueMessageListenerContainerTest.java | 81 ++++++++++++++ .../nats/js/JetStreamMessageBroker.java | 7 +- .../js/JetStreamMessageBrokerResolveTest.java | 5 +- .../NatsGlobalRetryLimitE2EIT.java | 101 ++++++++++++++++++ .../RedisGlobalRetryLimitE2EIT.java | 80 ++++++++++++++ 10 files changed, 296 insertions(+), 10 deletions(-) create mode 100644 rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsGlobalRetryLimitE2EIT.java create mode 100644 rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/RedisGlobalRetryLimitE2EIT.java diff --git a/docs/configuration/retry-and-backoff.md b/docs/configuration/retry-and-backoff.md index 3da5a83e..f5f4a593 100644 --- a/docs/configuration/retry-and-backoff.md +++ b/docs/configuration/retry-and-backoff.md @@ -44,6 +44,17 @@ Note: Messages handled this way are neither retried nor moved to the Dead Letter When a message handler fails, the message can be retried immediately, delayed for a future retry, moved to a dead letter queue, or dropped. +### Global Retry Limit + +Set `rqueue.retry.max=N` to limit listeners that do not configure `numRetries` and +would otherwise retry forever. The default is `-1`, which leaves the retry-forever +default unchanged. Explicit `@RqueueListener(numRetries = "...")` values and the +dead-letter queue default retry count continue to take precedence. + +For the NATS backend, this retry count is translated to JetStream `maxDeliver` as +`N + 1`, because JetStream counts the initial delivery plus retries. For example, +`rqueue.retry.max=3` creates consumers with at most four total deliveries. + ### Immediate Retries To retry a message immediately within the same polling cycle, set `rqueue.retry.per.poll` to a positive integer (e.g., `2`). This will cause the @@ -75,5 +86,3 @@ public class RqueueConfiguration { } ``` - - diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java index 9007c3dd..9131421c 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java @@ -134,6 +134,9 @@ private static String generateBrokerId() { @Value("${rqueue.retry.per.poll:1}") private int retryPerPoll; + @Value("${rqueue.retry.max:-1}") + private int maxRetry = -1; + @Value("${rqueue.net.proxy.host:}") private String proxyHost; diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java index b680fa6e..190c7be4 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java @@ -471,20 +471,20 @@ private AsyncTaskExecutor createTaskExecutor( private List getQueueDetail(String queue, MappingInformation mappingInformation) { int numRetry = mappingInformation.getNumRetry(); + RqueueConfig rqueueConfig = rqueueBeanProvider.getRqueueConfig(); if (!StringUtils.isEmpty(mappingInformation.getDeadLetterQueueName()) && numRetry == -1) { log.warn( "Dead letter queue {} is set but retry is not set", mappingInformation.getDeadLetterQueueName()); numRetry = Constants.DEFAULT_RETRY_DEAD_LETTER_QUEUE; } else if (numRetry == -1) { - numRetry = Integer.MAX_VALUE; + numRetry = rqueueConfig.getMaxRetry() >= 0 ? rqueueConfig.getMaxRetry() : Integer.MAX_VALUE; } String priorityGroup = mappingInformation.getPriorityGroup(); Map priority = mappingInformation.getPriority(); if (StringUtils.isEmpty(priorityGroup) && priority.size() == 1) { priorityGroup = Constants.DEFAULT_PRIORITY_GROUP; } - RqueueConfig rqueueConfig = rqueueBeanProvider.getRqueueConfig(); QueueDetail queueDetail = QueueDetail.builder() .name(queue) .queueName(rqueueConfig.getQueueName(queue)) diff --git a/rqueue-core/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/rqueue-core/src/main/resources/META-INF/additional-spring-configuration-metadata.json index 3ea19eae..cc5e7265 100644 --- a/rqueue-core/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/rqueue-core/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -1,6 +1,13 @@ { "groups": [], "properties": [ + { + "name": "rqueue.retry.max", + "type": "java.lang.Integer", + "description": "Global retry limit used when a listener does not configure numRetries and would otherwise retry forever. -1 preserves the retry-forever default. Explicit listener retry counts and DLQ defaults take precedence.", + "defaultValue": -1, + "sourceType": "com.github.sonus21.rqueue.config.RqueueConfig" + }, { "name": "rqueue.nats.auto-create-streams", "type": "java.lang.Boolean", diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/config/RqueueConfigTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/config/RqueueConfigTest.java index a2437e8e..860bb563 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/config/RqueueConfigTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/config/RqueueConfigTest.java @@ -154,4 +154,9 @@ void workerRegistryProperties() { assertEquals( Duration.ofSeconds(15), rqueueConfigVersion2.getWorkerRegistryQueueHeartbeatInterval()); } + + @Test + void defaultMaxRetryIsDisabled() { + assertEquals(-1, rqueueConfigVersion2.getMaxRetry()); + } } diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerTest.java index 6053ee9d..aa539f2f 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerTest.java @@ -33,9 +33,11 @@ import com.github.sonus21.rqueue.common.RqueueLockManager; import com.github.sonus21.rqueue.config.RqueueConfig; import com.github.sonus21.rqueue.config.RqueueWebConfig; +import com.github.sonus21.rqueue.core.EndpointRegistry; import com.github.sonus21.rqueue.core.RqueueBeanProvider; import com.github.sonus21.rqueue.core.RqueueMessage; import com.github.sonus21.rqueue.core.RqueueMessageTemplate; +import com.github.sonus21.rqueue.models.Concurrency; import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao; import com.github.sonus21.rqueue.models.db.MessageMetadata; import com.github.sonus21.rqueue.models.db.QueueConfig; @@ -67,6 +69,7 @@ import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; @CoreUnitTest class RqueueMessageListenerContainerTest extends TestBase { @@ -172,6 +175,62 @@ void setTaskExecutor() { ((ThreadPoolTaskExecutor) container.getTaskExecutor()).getThreadNamePrefix()); } + @Test + void globalMaxRetryCapsDefaultRetryForever() throws Exception { + beanProvider.getRqueueConfig().setMaxRetry(5); + doReturn(handlerMap(mapping(slowQueue, -1, null))).when(rqueueMessageHandler) + .getHandlerMethodMap(); + + container.afterPropertiesSet(); + + assertEquals(5, EndpointRegistry.get(slowQueue).getNumRetry()); + } + + @Test + void globalMaxRetryDoesNotOverrideExplicitRetryCount() throws Exception { + beanProvider.getRqueueConfig().setMaxRetry(2); + doReturn(handlerMap(mapping(slowQueue, 10, null))).when(rqueueMessageHandler) + .getHandlerMethodMap(); + + container.afterPropertiesSet(); + + assertEquals(10, EndpointRegistry.get(slowQueue).getNumRetry()); + } + + @Test + void globalMaxRetryDoesNotRaiseLowerExplicitRetryCount() throws Exception { + beanProvider.getRqueueConfig().setMaxRetry(5); + doReturn(handlerMap(mapping(slowQueue, 2, null))).when(rqueueMessageHandler) + .getHandlerMethodMap(); + + container.afterPropertiesSet(); + + assertEquals(2, EndpointRegistry.get(slowQueue).getNumRetry()); + } + + @Test + void globalMaxRetryCanDisableRetries() throws Exception { + beanProvider.getRqueueConfig().setMaxRetry(0); + doReturn(handlerMap(mapping(slowQueue, -1, null))).when(rqueueMessageHandler) + .getHandlerMethodMap(); + + container.afterPropertiesSet(); + + assertEquals(0, EndpointRegistry.get(slowQueue).getNumRetry()); + } + + @Test + void globalMaxRetryDoesNotOverrideDeadLetterDefaultRetryCount() throws Exception { + beanProvider.getRqueueConfig().setMaxRetry(0); + doReturn(handlerMap(mapping(slowQueue, -1, slowQueue + "-dlq"))).when(rqueueMessageHandler) + .getHandlerMethodMap(); + + container.afterPropertiesSet(); + + assertEquals(Constants.DEFAULT_RETRY_DEAD_LETTER_QUEUE, + EndpointRegistry.get(slowQueue).getNumRetry()); + } + @Test void phaseSetting() { assertEquals(Integer.MAX_VALUE, container.getPhase()); @@ -534,6 +593,28 @@ private class TestListenerContainer extends RqueueMessageListenerContainer { } } + private MultiValueMap + handlerMap(MappingInformation mappingInformation) { + LinkedMultiValueMap map = + new LinkedMultiValueMap<>(); + map.add(mappingInformation, new RqueueMessageHandler.HandlerMethodWithPrimary(null, false)); + return map; + } + + private MappingInformation mapping(String queue, int numRetry, String deadLetterQueueName) { + return MappingInformation.builder() + .queueNames(Collections.singleton(queue)) + .numRetry(numRetry) + .deadLetterQueueName(deadLetterQueueName) + .deadLetterConsumerEnabled(false) + .visibilityTimeout(VISIBILITY_TIMEOUT) + .active(false) + .concurrency(new Concurrency(0, 0)) + .priority(Collections.singletonMap(Constants.DEFAULT_PRIORITY_KEY, 1)) + .batchSize(1) + .build(); + } + @Getter private class StubMessageSchedulerListenerContainer extends RqueueMessageListenerContainer { diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java index 38dafea1..688841f1 100644 --- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java @@ -510,15 +510,16 @@ public static Duration resolveAckWait(QueueDetail q, RqueueNatsConfig config) { /** * Resolve the JetStream {@code maxDeliver} from per-queue {@link QueueDetail#getNumRetry()} * (counted as initial delivery + N retries = numRetry + 1). The {@link Integer#MAX_VALUE} - * "retry forever" sentinel maps to JetStream's unlimited value ({@code -1}); non-positive - * numRetry falls back to {@code RqueueNatsConfig.ConsumerDefaults.getMaxDeliver()}. + * "retry forever" sentinel maps to JetStream's unlimited value ({@code -1}); zero means + * one total delivery and no retries. Negative numRetry falls back to + * {@code RqueueNatsConfig.ConsumerDefaults.getMaxDeliver()}. */ public static long resolveMaxDeliver(QueueDetail q, RqueueNatsConfig config) { int numRetry = q.getNumRetry(); if (numRetry == Integer.MAX_VALUE) { return -1L; } - if (numRetry > 0) { + if (numRetry >= 0) { return numRetry + 1L; } return config.getConsumerDefaults().getMaxDeliver(); diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBrokerResolveTest.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBrokerResolveTest.java index 5f92b69e..9255be5d 100644 --- a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBrokerResolveTest.java +++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBrokerResolveTest.java @@ -71,10 +71,9 @@ void resolveMaxDeliver_isNumRetryPlusOne() { } @Test - void resolveMaxDeliver_fallsBackToConfigDefaultWhenZero() { + void resolveMaxDeliver_zeroRetryMeansOneDelivery() { QueueDetail q = queue(30_000L, 0); - // RqueueNatsConfig.defaults().consumerDefaults.maxDeliver = 3 - assertEquals(3L, JetStreamMessageBroker.resolveMaxDeliver(q, RqueueNatsConfig.defaults())); + assertEquals(1L, JetStreamMessageBroker.resolveMaxDeliver(q, RqueueNatsConfig.defaults())); } @Test diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsGlobalRetryLimitE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsGlobalRetryLimitE2EIT.java new file mode 100644 index 00000000..153cd855 --- /dev/null +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsGlobalRetryLimitE2EIT.java @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ +package com.github.sonus21.rqueue.spring.boot.integration; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.github.sonus21.rqueue.annotation.RqueueListener; +import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer; +import io.nats.client.JetStreamManagement; +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration; +import org.springframework.boot.data.redis.autoconfigure.DataRedisReactiveAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Import; +import org.springframework.stereotype.Component; + +@SpringBootTest( + classes = NatsGlobalRetryLimitE2EIT.TestApp.class, + properties = { + "rqueue.backend=nats", + "rqueue.retry.max=2", + "rqueue.retry.per.poll=1000", + "rqueue.nats.naming.stream-prefix=" + NatsGlobalRetryLimitE2EIT.STREAM_PREFIX, + "rqueue.nats.naming.subject-prefix=" + NatsGlobalRetryLimitE2EIT.SUBJECT_PREFIX + }) +@Tag("nats") +class NatsGlobalRetryLimitE2EIT extends AbstractNatsBootIT { + + static final String QUEUE = "global-retry-nats"; + static final String STREAM_PREFIX = "rqueue-js-globalRetryE2E-"; + static final String SUBJECT_PREFIX = "rqueue.js.globalRetryE2E."; + + @BeforeAll + static void wipeOwnedStreams() { + deleteStreamsWithPrefix(STREAM_PREFIX); + } + + @Autowired + RqueueMessageEnqueuer enqueuer; + + @Autowired + JetStreamManagement jsm; + + @Autowired + FailingListener listener; + + @Test + void globalRetryLimitCapsImplicitRetryForever() throws Exception { + enqueuer.enqueue(QUEUE, "payload"); + + assertThat(listener.twoAttempts.await(20, TimeUnit.SECONDS)).isTrue(); + Awaitility.await().during(Duration.ofSeconds(2)).atMost(Duration.ofSeconds(3)).untilAsserted( + () -> assertThat(listener.attempts).hasValue(2)); + + assertThat(jsm + .getConsumerInfo(STREAM_PREFIX + QUEUE, QUEUE + "-consumer") + .getConsumerConfiguration() + .getMaxDeliver()) + .isEqualTo(3L); + } + + @SpringBootApplication( + exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class}) + @Import(FailingListener.class) + static class TestApp {} + + @Component + static class FailingListener { + final AtomicInteger attempts = new AtomicInteger(); + final CountDownLatch twoAttempts = new CountDownLatch(2); + + @RqueueListener(value = QUEUE) + void onMessage(String ignored) { + attempts.incrementAndGet(); + twoAttempts.countDown(); + throw new IllegalStateException("force retry"); + } + } +} diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/RedisGlobalRetryLimitE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/RedisGlobalRetryLimitE2EIT.java new file mode 100644 index 00000000..8e4699c8 --- /dev/null +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/RedisGlobalRetryLimitE2EIT.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ +package com.github.sonus21.rqueue.spring.boot.tests.integration; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.github.sonus21.rqueue.annotation.RqueueListener; +import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer; +import com.github.sonus21.rqueue.spring.boot.tests.SpringBootIntegrationTest; +import com.github.sonus21.rqueue.test.application.BaseApplication; +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Import; +import org.springframework.test.context.TestPropertySource; + +@SpringBootTest(classes = RedisGlobalRetryLimitE2EIT.TestApp.class) +@TestPropertySource( + properties = { + "rqueue.retry.max=2", + "rqueue.retry.per.poll=1000", + "spring.data.redis.port=8027", + "mysql.db.name=RedisGlobalRetryLimitE2EIT", + "use.system.redis=false" + }) +@SpringBootIntegrationTest +class RedisGlobalRetryLimitE2EIT { + + static final String QUEUE = "global-retry-redis"; + + @Autowired + RqueueMessageEnqueuer enqueuer; + + @Autowired + FailingListener listener; + + @Test + void globalRetryLimitCapsImplicitRetryForever() throws Exception { + enqueuer.enqueue(QUEUE, "payload"); + + assertThat(listener.twoAttempts.await(20, TimeUnit.SECONDS)).isTrue(); + Awaitility.await().during(Duration.ofSeconds(2)).atMost(Duration.ofSeconds(3)).untilAsserted( + () -> assertThat(listener.attempts).hasValue(2)); + } + + @SpringBootApplication + @Import(FailingListener.class) + static class TestApp extends BaseApplication {} + + static class FailingListener { + final AtomicInteger attempts = new AtomicInteger(); + final CountDownLatch twoAttempts = new CountDownLatch(2); + + @RqueueListener(value = QUEUE) + void onMessage(String ignored) { + attempts.incrementAndGet(); + twoAttempts.countDown(); + throw new IllegalStateException("force retry"); + } + } +} From 71dec8da88067de2f653e050e30087cbfc83b266 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 13 May 2026 13:47:56 +0000 Subject: [PATCH 2/5] Apply Palantir Java Format --- .../RqueueMessageListenerContainerTest.java | 20 ++++++++++++------- .../NatsGlobalRetryLimitE2EIT.java | 9 +++++---- .../RedisGlobalRetryLimitE2EIT.java | 6 ++++-- 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerTest.java index aa539f2f..0a1f6a55 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerTest.java @@ -37,8 +37,8 @@ import com.github.sonus21.rqueue.core.RqueueBeanProvider; import com.github.sonus21.rqueue.core.RqueueMessage; import com.github.sonus21.rqueue.core.RqueueMessageTemplate; -import com.github.sonus21.rqueue.models.Concurrency; import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao; +import com.github.sonus21.rqueue.models.Concurrency; import com.github.sonus21.rqueue.models.db.MessageMetadata; import com.github.sonus21.rqueue.models.db.QueueConfig; import com.github.sonus21.rqueue.models.enums.MessageStatus; @@ -178,7 +178,8 @@ void setTaskExecutor() { @Test void globalMaxRetryCapsDefaultRetryForever() throws Exception { beanProvider.getRqueueConfig().setMaxRetry(5); - doReturn(handlerMap(mapping(slowQueue, -1, null))).when(rqueueMessageHandler) + doReturn(handlerMap(mapping(slowQueue, -1, null))) + .when(rqueueMessageHandler) .getHandlerMethodMap(); container.afterPropertiesSet(); @@ -189,7 +190,8 @@ void globalMaxRetryCapsDefaultRetryForever() throws Exception { @Test void globalMaxRetryDoesNotOverrideExplicitRetryCount() throws Exception { beanProvider.getRqueueConfig().setMaxRetry(2); - doReturn(handlerMap(mapping(slowQueue, 10, null))).when(rqueueMessageHandler) + doReturn(handlerMap(mapping(slowQueue, 10, null))) + .when(rqueueMessageHandler) .getHandlerMethodMap(); container.afterPropertiesSet(); @@ -200,7 +202,8 @@ void globalMaxRetryDoesNotOverrideExplicitRetryCount() throws Exception { @Test void globalMaxRetryDoesNotRaiseLowerExplicitRetryCount() throws Exception { beanProvider.getRqueueConfig().setMaxRetry(5); - doReturn(handlerMap(mapping(slowQueue, 2, null))).when(rqueueMessageHandler) + doReturn(handlerMap(mapping(slowQueue, 2, null))) + .when(rqueueMessageHandler) .getHandlerMethodMap(); container.afterPropertiesSet(); @@ -211,7 +214,8 @@ void globalMaxRetryDoesNotRaiseLowerExplicitRetryCount() throws Exception { @Test void globalMaxRetryCanDisableRetries() throws Exception { beanProvider.getRqueueConfig().setMaxRetry(0); - doReturn(handlerMap(mapping(slowQueue, -1, null))).when(rqueueMessageHandler) + doReturn(handlerMap(mapping(slowQueue, -1, null))) + .when(rqueueMessageHandler) .getHandlerMethodMap(); container.afterPropertiesSet(); @@ -222,12 +226,14 @@ void globalMaxRetryCanDisableRetries() throws Exception { @Test void globalMaxRetryDoesNotOverrideDeadLetterDefaultRetryCount() throws Exception { beanProvider.getRqueueConfig().setMaxRetry(0); - doReturn(handlerMap(mapping(slowQueue, -1, slowQueue + "-dlq"))).when(rqueueMessageHandler) + doReturn(handlerMap(mapping(slowQueue, -1, slowQueue + "-dlq"))) + .when(rqueueMessageHandler) .getHandlerMethodMap(); container.afterPropertiesSet(); - assertEquals(Constants.DEFAULT_RETRY_DEAD_LETTER_QUEUE, + assertEquals( + Constants.DEFAULT_RETRY_DEAD_LETTER_QUEUE, EndpointRegistry.get(slowQueue).getNumRetry()); } diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsGlobalRetryLimitE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsGlobalRetryLimitE2EIT.java index 153cd855..918cfa61 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsGlobalRetryLimitE2EIT.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsGlobalRetryLimitE2EIT.java @@ -71,11 +71,12 @@ void globalRetryLimitCapsImplicitRetryForever() throws Exception { enqueuer.enqueue(QUEUE, "payload"); assertThat(listener.twoAttempts.await(20, TimeUnit.SECONDS)).isTrue(); - Awaitility.await().during(Duration.ofSeconds(2)).atMost(Duration.ofSeconds(3)).untilAsserted( - () -> assertThat(listener.attempts).hasValue(2)); + Awaitility.await() + .during(Duration.ofSeconds(2)) + .atMost(Duration.ofSeconds(3)) + .untilAsserted(() -> assertThat(listener.attempts).hasValue(2)); - assertThat(jsm - .getConsumerInfo(STREAM_PREFIX + QUEUE, QUEUE + "-consumer") + assertThat(jsm.getConsumerInfo(STREAM_PREFIX + QUEUE, QUEUE + "-consumer") .getConsumerConfiguration() .getMaxDeliver()) .isEqualTo(3L); diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/RedisGlobalRetryLimitE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/RedisGlobalRetryLimitE2EIT.java index 8e4699c8..d0df932e 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/RedisGlobalRetryLimitE2EIT.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/RedisGlobalRetryLimitE2EIT.java @@ -58,8 +58,10 @@ void globalRetryLimitCapsImplicitRetryForever() throws Exception { enqueuer.enqueue(QUEUE, "payload"); assertThat(listener.twoAttempts.await(20, TimeUnit.SECONDS)).isTrue(); - Awaitility.await().during(Duration.ofSeconds(2)).atMost(Duration.ofSeconds(3)).untilAsserted( - () -> assertThat(listener.attempts).hasValue(2)); + Awaitility.await() + .during(Duration.ofSeconds(2)) + .atMost(Duration.ofSeconds(3)) + .untilAsserted(() -> assertThat(listener.attempts).hasValue(2)); } @SpringBootApplication From a03e91648547d358dd01d54b4bc85659c3c41251 Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Wed, 13 May 2026 19:34:51 +0530 Subject: [PATCH 3/5] Bump version to RC9 Assisted-By: Codex --- build.gradle | 2 +- docs/CHANGELOG.md | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 31402130..87093c40 100644 --- a/build.gradle +++ b/build.gradle @@ -84,7 +84,7 @@ ext { subprojects { group = "com.github.sonus21" - version = "4.0.0-RC8" + version = "4.0.0-RC9" dependencies { // https://mvnrepository.com/artifact/org.springframework/spring-messaging diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 656cb36a..ff7dfdc3 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -18,6 +18,18 @@ foundational Spring Boot 4 and Jackson 3 migration notes; RC3 for the Java 17 baseline change; RC4–RC6 below for the NATS backend, broker SPI, dashboard work, and middleware additions that build on top. +## Release [4.0.0.RC9] 2026-05-13 + +{: .highlight} +Release candidate. + +### Features +* **Global retry limit** — added `rqueue.retry.max` to cap the implicit + retry-forever default for listeners that do not configure `numRetries`. + Explicit per-listener retry counts and the existing DLQ retry default continue + to take precedence. On NATS, the effective retry count maps to JetStream + `maxDeliver` as `retries + 1`. + ## Release [4.0.0.RC8] 2026-05-09 {: .highlight} From 7e46ba9433277cfc2df9c84518a012aae2f43067 Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Sun, 17 May 2026 15:00:03 +0530 Subject: [PATCH 4/5] Remove Redis key tags from NATS KV keys Assisted-By: Codex --- .../rqueue/nats/dao/NatsRqueueJobDao.java | 11 ++---- .../rqueue/nats/dao/NatsRqueueQStatsDao.java | 9 ++--- .../nats/dao/NatsRqueueSystemConfigDao.java | 9 ++--- .../sonus21/rqueue/nats/kv/NatsKvKeys.java | 29 +++++++++++++++ .../nats/lock/NatsRqueueLockManager.java | 13 ++----- .../NatsRqueueMessageMetadataService.java | 15 +++----- .../nats/worker/NatsWorkerRegistryStore.java | 16 +++----- .../rqueue/nats/kv/NatsKvKeysTest.java | 37 +++++++++++++++++++ .../worker/NatsWorkerRegistryStoreTest.java | 9 +++++ 9 files changed, 100 insertions(+), 48 deletions(-) create mode 100644 rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/kv/NatsKvKeys.java create mode 100644 rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/kv/NatsKvKeysTest.java diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueJobDao.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueJobDao.java index c5c62eb3..804937ec 100644 --- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueJobDao.java +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueJobDao.java @@ -15,6 +15,7 @@ import com.github.sonus21.rqueue.models.db.RqueueJob; import com.github.sonus21.rqueue.nats.internal.NatsProvisioner; import com.github.sonus21.rqueue.nats.kv.NatsKvBuckets; +import com.github.sonus21.rqueue.nats.kv.NatsKvKeys; import io.nats.client.JetStreamApiException; import io.nats.client.KeyValue; import io.nats.client.api.KeyValueEntry; @@ -69,7 +70,7 @@ public void createJob(RqueueJob rqueueJob, Duration expiry) { @Override public void save(RqueueJob rqueueJob, Duration expiry) { try { - kv(expiry).put(sanitize(rqueueJob.getId()), serialize(rqueueJob)); + kv(expiry).put(NatsKvKeys.sanitize(rqueueJob.getId()), serialize(rqueueJob)); } catch (IOException | JetStreamApiException e) { log.log(Level.WARNING, "save job " + rqueueJob.getId() + " failed", e); } @@ -77,7 +78,7 @@ public void save(RqueueJob rqueueJob, Duration expiry) { @Override public RqueueJob findById(String jobId) { - return loadByKey(sanitize(jobId)); + return loadByKey(NatsKvKeys.sanitize(jobId)); } @Override @@ -111,7 +112,7 @@ public List finByMessageIdIn(List messageIds) { @Override public void delete(String jobId) { try { - kv(null).delete(sanitize(jobId)); + kv(null).delete(NatsKvKeys.sanitize(jobId)); } catch (IOException | JetStreamApiException e) { log.log(Level.WARNING, "delete job " + jobId + " failed", e); } @@ -165,8 +166,4 @@ private RqueueJob deserialize(byte[] bytes) { } } - /** KV keys allow {@code [A-Za-z0-9_=.-]} only. */ - private static String sanitize(String key) { - return key == null ? "_" : key.replaceAll("[^A-Za-z0-9_=.-]", "_"); - } } diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueQStatsDao.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueQStatsDao.java index 14154012..1ea7152f 100644 --- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueQStatsDao.java +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueQStatsDao.java @@ -20,6 +20,7 @@ import com.github.sonus21.rqueue.models.db.QueueStatistics; import com.github.sonus21.rqueue.nats.internal.NatsProvisioner; import com.github.sonus21.rqueue.nats.kv.NatsKvBuckets; +import com.github.sonus21.rqueue.nats.kv.NatsKvKeys; import io.nats.client.JetStreamApiException; import io.nats.client.KeyValue; import io.nats.client.api.KeyValueEntry; @@ -72,7 +73,7 @@ public QueueStatistics findById(String id) { return null; } try { - KeyValueEntry entry = kv().get(sanitize(id)); + KeyValueEntry entry = kv().get(NatsKvKeys.sanitize(id)); if (entry == null || entry.getValue() == null) { return null; } @@ -104,7 +105,7 @@ public void save(QueueStatistics queueStatistics) { throw new IllegalArgumentException("id cannot be null: " + queueStatistics); } try { - kv().put(sanitize(queueStatistics.getId()), serialize(queueStatistics)); + kv().put(NatsKvKeys.sanitize(queueStatistics.getId()), serialize(queueStatistics)); } catch (IOException | JetStreamApiException e) { log.log(Level.WARNING, "save id=" + queueStatistics.getId() + " failed", e); } @@ -125,8 +126,4 @@ private QueueStatistics deserialize(byte[] bytes) { } } - /** KV keys allow {@code [A-Za-z0-9_=.-]} only. */ - private static String sanitize(String key) { - return key == null ? "_" : key.replaceAll("[^A-Za-z0-9_=.-]", "_"); - } } diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueSystemConfigDao.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueSystemConfigDao.java index d48e9a43..994241d8 100644 --- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueSystemConfigDao.java +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueSystemConfigDao.java @@ -15,6 +15,7 @@ import com.github.sonus21.rqueue.models.db.QueueConfig; import com.github.sonus21.rqueue.nats.internal.NatsProvisioner; import com.github.sonus21.rqueue.nats.kv.NatsKvBuckets; +import com.github.sonus21.rqueue.nats.kv.NatsKvKeys; import io.nats.client.JetStreamApiException; import io.nats.client.KeyValue; import io.nats.client.api.KeyValueEntry; @@ -74,7 +75,7 @@ public QueueConfig getConfigByName(String name, boolean cached) { return hit; } } - QueueConfig loaded = loadByKey(sanitize(name)); + QueueConfig loaded = loadByKey(NatsKvKeys.sanitize(name)); if (loaded != null) { cache.put(name, loaded); } @@ -120,7 +121,7 @@ public List findAllQConfig(Collection ids) { @Override public void saveQConfig(QueueConfig queueConfig) { try { - kv().put(sanitize(queueConfig.getName()), serialize(queueConfig)); + kv().put(NatsKvKeys.sanitize(queueConfig.getName()), serialize(queueConfig)); cache.put(queueConfig.getName(), queueConfig); } catch (IOException | JetStreamApiException e) { log.log(Level.WARNING, "saveQConfig " + queueConfig.getName() + " failed", e); @@ -189,8 +190,4 @@ private QueueConfig deserialize(byte[] bytes) { } } - /** KV keys allow {@code [A-Za-z0-9_=.-]} only. */ - private static String sanitize(String key) { - return key == null ? "_" : key.replaceAll("[^A-Za-z0-9_=.-]", "_"); - } } diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/kv/NatsKvKeys.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/kv/NatsKvKeys.java new file mode 100644 index 00000000..26f71b72 --- /dev/null +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/kv/NatsKvKeys.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + */ +package com.github.sonus21.rqueue.nats.kv; + +/** Utility methods for converting rqueue keys into NATS JetStream KV keys. */ +public final class NatsKvKeys { + + private NatsKvKeys() {} + + /** + * NATS KV keys do not need Redis Cluster hash tags. Strip any {@code {name}} tag first, then + * coerce the remaining key into the conservative KV key character set used by this module. + */ + public static String sanitize(String key) { + if (key == null) { + return "_"; + } + String sanitized = + key.replaceAll("\\{([^{}]*)}", "$1").replaceAll("[^A-Za-z0-9_=.-]", "_"); + return sanitized.isEmpty() ? "_" : sanitized; + } +} diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/lock/NatsRqueueLockManager.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/lock/NatsRqueueLockManager.java index b19b77f4..eafebb36 100644 --- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/lock/NatsRqueueLockManager.java +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/lock/NatsRqueueLockManager.java @@ -14,6 +14,7 @@ import com.github.sonus21.rqueue.config.NatsBackendCondition; import com.github.sonus21.rqueue.nats.internal.NatsProvisioner; import com.github.sonus21.rqueue.nats.kv.NatsKvBuckets; +import com.github.sonus21.rqueue.nats.kv.NatsKvKeys; import io.nats.client.JetStreamApiException; import io.nats.client.KeyValue; import io.nats.client.api.KeyValueEntry; @@ -57,7 +58,7 @@ public NatsRqueueLockManager(NatsProvisioner provisioner) { public boolean acquireLock(String lockKey, String lockValue, Duration duration) { try { KeyValue kv = provisioner.ensureKv(BUCKET_NAME, duration); - kv.create(sanitize(lockKey), lockValue.getBytes(StandardCharsets.UTF_8)); + kv.create(NatsKvKeys.sanitize(lockKey), lockValue.getBytes(StandardCharsets.UTF_8)); return true; } catch (JetStreamApiException existing) { // Most common path: key already exists; another holder owns the lock. @@ -75,7 +76,7 @@ public boolean acquireLock(String lockKey, String lockValue, Duration duration) public boolean releaseLock(String lockKey, String lockValue) { try { KeyValue kv = provisioner.ensureKv(BUCKET_NAME, Duration.ofSeconds(60)); - String key = sanitize(lockKey); + String key = NatsKvKeys.sanitize(lockKey); KeyValueEntry entry = kv.get(key); if (entry == null) { return false; @@ -92,12 +93,4 @@ public boolean releaseLock(String lockKey, String lockValue) { } } - /** - * KV keys allow {@code [A-Za-z0-9_=.-]} only; coerce other characters to {@code _} so the - * caller can pass arbitrary lock keys. {@code $} and {@code #} surface in queue/listener - * names from inner classes and the legacy separator respectively. - */ - private static String sanitize(String key) { - return key == null ? "_" : key.replaceAll("[^A-Za-z0-9_=.-]", "_"); - } } diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueMessageMetadataService.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueMessageMetadataService.java index 24db30df..fe13da4a 100644 --- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueMessageMetadataService.java +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueMessageMetadataService.java @@ -17,6 +17,7 @@ import com.github.sonus21.rqueue.models.enums.MessageStatus; import com.github.sonus21.rqueue.nats.internal.NatsProvisioner; import com.github.sonus21.rqueue.nats.kv.NatsKvBuckets; +import com.github.sonus21.rqueue.nats.kv.NatsKvKeys; import com.github.sonus21.rqueue.service.RqueueMessageMetadataService; import io.nats.client.JetStreamApiException; import io.nats.client.KeyValue; @@ -72,13 +73,13 @@ private KeyValue kv() throws IOException, JetStreamApiException { @Override public MessageMetadata get(String id) { - return loadByKey(sanitize(id)); + return loadByKey(NatsKvKeys.sanitize(id)); } @Override public void delete(String id) { try { - kv().delete(sanitize(id)); + kv().delete(NatsKvKeys.sanitize(id)); } catch (IOException | JetStreamApiException e) { log.log(Level.WARNING, "delete metadata " + id + " failed", e); } @@ -106,7 +107,7 @@ public List findAll(Collection ids) { @Override public void save(MessageMetadata messageMetadata, Duration ttl, boolean checkUnique) { try { - kv().put(sanitize(messageMetadata.getId()), serialize(messageMetadata)); + kv().put(NatsKvKeys.sanitize(messageMetadata.getId()), serialize(messageMetadata)); } catch (IOException | JetStreamApiException e) { log.log(Level.WARNING, "save metadata " + messageMetadata.getId() + " failed", e); } @@ -160,7 +161,7 @@ public List> readMessageMetadataForQueue( try { List keys = new ArrayList<>(kv().keys()); List> out = new ArrayList<>(); - String prefix = sanitize(queueName); + String prefix = NatsKvKeys.sanitize(queueName); for (String k : keys) { if (!k.startsWith(prefix)) { continue; @@ -193,7 +194,7 @@ public void saveMessageMetadataForQueue( public void deleteQueueMessages(String queueName, long before) { try { List keys = new ArrayList<>(kv().keys()); - String prefix = sanitize(queueName); + String prefix = NatsKvKeys.sanitize(queueName); for (String k : keys) { if (!k.startsWith(prefix)) { continue; @@ -239,8 +240,4 @@ private MessageMetadata deserialize(byte[] bytes) { } } - /** KV keys allow {@code [A-Za-z0-9_=.-]} only. */ - private static String sanitize(String key) { - return key == null ? "_" : key.replaceAll("[^A-Za-z0-9_=.-]", "_"); - } } diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/worker/NatsWorkerRegistryStore.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/worker/NatsWorkerRegistryStore.java index 0db7ee62..9a9eba13 100644 --- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/worker/NatsWorkerRegistryStore.java +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/worker/NatsWorkerRegistryStore.java @@ -20,6 +20,7 @@ import com.github.sonus21.rqueue.models.registry.RqueueWorkerInfo; import com.github.sonus21.rqueue.nats.internal.NatsProvisioner; import com.github.sonus21.rqueue.nats.kv.NatsKvBuckets; +import com.github.sonus21.rqueue.nats.kv.NatsKvKeys; import com.github.sonus21.rqueue.worker.WorkerRegistryStore; import io.nats.client.JetStreamApiException; import io.nats.client.KeyValue; @@ -86,7 +87,7 @@ public void putWorkerInfo(String workerKey, RqueueWorkerInfo info, Duration ttl) } try { KeyValue kv = provisioner.ensureKv(WORKER_BUCKET, workerBucketTtl); - kv.put(sanitize(workerKey), serialize(info)); + kv.put(NatsKvKeys.sanitize(workerKey), serialize(info)); } catch (IOException | JetStreamApiException e) { log.log(Level.WARNING, "putWorkerInfo " + workerKey + " failed", e); } @@ -96,7 +97,7 @@ public void putWorkerInfo(String workerKey, RqueueWorkerInfo info, Duration ttl) public void deleteWorkerInfo(String workerKey) { try { KeyValue kv = provisioner.ensureKv(WORKER_BUCKET, workerBucketTtl); - kv.delete(sanitize(workerKey)); + kv.delete(NatsKvKeys.sanitize(workerKey)); } catch (IOException | JetStreamApiException e) { log.log(Level.WARNING, "deleteWorkerInfo " + workerKey + " failed", e); } @@ -111,7 +112,7 @@ public Map getWorkerInfos(Collection workerKey try { KeyValue kv = provisioner.ensureKv(WORKER_BUCKET, workerBucketTtl); for (String key : workerKeys) { - KeyValueEntry entry = kv.get(sanitize(key)); + KeyValueEntry entry = kv.get(NatsKvKeys.sanitize(key)); if (entry == null || entry.getValue() == null) { continue; } @@ -146,7 +147,7 @@ public Map getQueueHeartbeats(String queueKey) { Map out = new LinkedHashMap<>(); try { KeyValue kv = provisioner.ensureKv(HEARTBEAT_BUCKET, heartbeatBucketTtl); - String prefix = sanitize(queueKey) + SEP; + String prefix = NatsKvKeys.sanitize(queueKey) + SEP; List keys = new ArrayList<>(kv.keys()); for (String k : keys) { if (!k.startsWith(prefix)) { @@ -197,12 +198,7 @@ public void refreshQueueTtl(String queueKey, Duration ttl) { // ---- helpers ---------------------------------------------------------- private static String compositeKey(String queueKey, String workerId) { - return sanitize(queueKey) + SEP + sanitize(workerId); - } - - /** KV keys allow {@code [A-Za-z0-9_=.-]} only. */ - private static String sanitize(String key) { - return key == null ? "_" : key.replaceAll("[^A-Za-z0-9_=.-]", "_"); + return NatsKvKeys.sanitize(queueKey) + SEP + NatsKvKeys.sanitize(workerId); } private byte[] serialize(RqueueWorkerInfo info) throws IOException { diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/kv/NatsKvKeysTest.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/kv/NatsKvKeysTest.java new file mode 100644 index 00000000..9cba9561 --- /dev/null +++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/kv/NatsKvKeysTest.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + */ +package com.github.sonus21.rqueue.nats.kv; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.github.sonus21.rqueue.nats.NatsUnitTest; +import org.junit.jupiter.api.Test; + +@NatsUnitTest +class NatsKvKeysTest { + + @Test + void sanitizeStripsRedisHashTagsBeforeReplacingUnsupportedCharacters() { + assertEquals( + "__rq__q-pollers__email-notification-dispatch-queue", + NatsKvKeys.sanitize("__rq::q-pollers::{email-notification-dispatch-queue}")); + } + + @Test + void sanitizeReplacesUnsupportedCharacters() { + assertEquals("orders_2__w_1", NatsKvKeys.sanitize("orders$2::w#1")); + } + + @Test + void sanitizeReturnsFallbackForNullOrEmptyResult() { + assertEquals("_", NatsKvKeys.sanitize(null)); + assertEquals("_", NatsKvKeys.sanitize("{}")); + } +} diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/worker/NatsWorkerRegistryStoreTest.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/worker/NatsWorkerRegistryStoreTest.java index 79d9618e..3ee82feb 100644 --- a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/worker/NatsWorkerRegistryStoreTest.java +++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/worker/NatsWorkerRegistryStoreTest.java @@ -176,6 +176,15 @@ void putQueueHeartbeat_sanitisesSpecialCharsInQueueKey() verify(heartbeatKv).put(eq("orders_2__w_1"), any(byte[].class)); } + @Test + void putQueueHeartbeat_stripsRedisHashTagsFromQueueKey() + throws IOException, JetStreamApiException { + String queueKey = "__rq::q-pollers::{email-notification-dispatch-queue}"; + store.putQueueHeartbeat(queueKey, "w1", "{}"); + verify(heartbeatKv) + .put(eq("__rq__q-pollers__email-notification-dispatch-queue__w1"), any(byte[].class)); + } + @Test void putQueueHeartbeat_ioException_swallowed() throws IOException, JetStreamApiException { doThrow(new IOException("bucket gone")).when(heartbeatKv).put(anyString(), any(byte[].class)); From 0b52350eef204f3e01f2bf5a46ec6e083eba1add Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Sun, 17 May 2026 14:32:09 +0000 Subject: [PATCH 5/5] Apply Palantir Java Format --- .../com/github/sonus21/rqueue/nats/dao/NatsRqueueJobDao.java | 1 - .../github/sonus21/rqueue/nats/dao/NatsRqueueQStatsDao.java | 1 - .../sonus21/rqueue/nats/dao/NatsRqueueSystemConfigDao.java | 1 - .../java/com/github/sonus21/rqueue/nats/kv/NatsKvKeys.java | 3 +-- .../github/sonus21/rqueue/nats/lock/NatsRqueueLockManager.java | 1 - .../rqueue/nats/service/NatsRqueueMessageMetadataService.java | 1 - 6 files changed, 1 insertion(+), 7 deletions(-) diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueJobDao.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueJobDao.java index 804937ec..db1b02b7 100644 --- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueJobDao.java +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueJobDao.java @@ -165,5 +165,4 @@ private RqueueJob deserialize(byte[] bytes) { return null; } } - } diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueQStatsDao.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueQStatsDao.java index 1ea7152f..1ba04fab 100644 --- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueQStatsDao.java +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueQStatsDao.java @@ -125,5 +125,4 @@ private QueueStatistics deserialize(byte[] bytes) { return null; } } - } diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueSystemConfigDao.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueSystemConfigDao.java index 994241d8..289f3f6f 100644 --- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueSystemConfigDao.java +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueSystemConfigDao.java @@ -189,5 +189,4 @@ private QueueConfig deserialize(byte[] bytes) { return null; } } - } diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/kv/NatsKvKeys.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/kv/NatsKvKeys.java index 26f71b72..0e6ec16f 100644 --- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/kv/NatsKvKeys.java +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/kv/NatsKvKeys.java @@ -22,8 +22,7 @@ public static String sanitize(String key) { if (key == null) { return "_"; } - String sanitized = - key.replaceAll("\\{([^{}]*)}", "$1").replaceAll("[^A-Za-z0-9_=.-]", "_"); + String sanitized = key.replaceAll("\\{([^{}]*)}", "$1").replaceAll("[^A-Za-z0-9_=.-]", "_"); return sanitized.isEmpty() ? "_" : sanitized; } } diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/lock/NatsRqueueLockManager.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/lock/NatsRqueueLockManager.java index eafebb36..2b648341 100644 --- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/lock/NatsRqueueLockManager.java +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/lock/NatsRqueueLockManager.java @@ -92,5 +92,4 @@ public boolean releaseLock(String lockKey, String lockValue) { return false; } } - } diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueMessageMetadataService.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueMessageMetadataService.java index fe13da4a..8239f4b0 100644 --- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueMessageMetadataService.java +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueMessageMetadataService.java @@ -239,5 +239,4 @@ private MessageMetadata deserialize(byte[] bytes) { return null; } } - }