From 7f8f3377b58dd9ed829041e07a16f1c6bf2fae1e Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Thu, 21 May 2026 16:49:11 +0530 Subject: [PATCH 01/16] Delay Rqueue startup for Boot web apps --- .../RqueueMessageListenerContainer.java | 8 ++- .../boot/RqueueAutoStartupLifecycle.java | 58 +++++++++++++++++++ .../spring/boot/RqueueListenerAutoConfig.java | 10 ++++ .../unit/RqueueListenerAutoConfigTest.java | 51 ++++++++++++++++ 4 files changed, 125 insertions(+), 2 deletions(-) create mode 100644 rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueAutoStartupLifecycle.java diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java index 190c7be4..a4b3ded8 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java @@ -517,10 +517,14 @@ private List getQueueDetail(String queue, MappingInformation mappin @Override public void start() { - log.info("Starting Rqueue Message container {}", RqueueConfig.getBrokerId()); synchronized (lifecycleMgr) { - running = true; + if (running) { + log.debug("Rqueue Message container {} is already running", RqueueConfig.getBrokerId()); + return; + } + log.info("Starting Rqueue Message container {}", RqueueConfig.getBrokerId()); doStart(); + running = true; rqueueBeanProvider .getApplicationEventPublisher() .publishEvent(new RqueueBootstrapEvent(EVENT_SOURCE, true)); diff --git a/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueAutoStartupLifecycle.java b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueAutoStartupLifecycle.java new file mode 100644 index 00000000..0f9013c1 --- /dev/null +++ b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueAutoStartupLifecycle.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ + +package com.github.sonus21.rqueue.spring.boot; + +import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.ApplicationListener; + +/** + * Delays Rqueue poller startup in Boot web applications until the servlet or reactive web server + * is ready and Spring Boot has published {@link ApplicationReadyEvent}. + */ +public class RqueueAutoStartupLifecycle + implements BeanPostProcessor, ApplicationListener { + + private final Set delayedContainers = + ConcurrentHashMap.newKeySet(); + + @Override + public Object postProcessBeforeInitialization(Object bean, String beanName) + throws BeansException { + if (bean instanceof RqueueMessageListenerContainer) { + RqueueMessageListenerContainer container = (RqueueMessageListenerContainer) bean; + if (container.isAutoStartup()) { + container.setAutoStartup(false); + delayedContainers.add(container); + } + } + return bean; + } + + @Override + public void onApplicationEvent(ApplicationReadyEvent event) { + for (RqueueMessageListenerContainer container : delayedContainers) { + if (!container.isRunning()) { + container.start(); + } + } + } +} diff --git a/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueListenerAutoConfig.java b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueListenerAutoConfig.java index 9ac5c4fc..cfb166d4 100644 --- a/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueListenerAutoConfig.java +++ b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueListenerAutoConfig.java @@ -33,8 +33,10 @@ import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer; import com.github.sonus21.rqueue.utils.condition.ReactiveEnabled; import com.github.sonus21.rqueue.utils.condition.RqueueEnabled; +import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication; import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; @@ -42,6 +44,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.DependsOn; import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.Role; @Configuration @AutoConfigureAfter(DataRedisAutoConfiguration.class) @@ -54,6 +57,13 @@ @Import(RqueueRedisConfigImportSelector.class) public class RqueueListenerAutoConfig extends RqueueListenerBaseConfig { + @Bean + @Role(BeanDefinition.ROLE_INFRASTRUCTURE) + @ConditionalOnWebApplication + public static RqueueAutoStartupLifecycle rqueueAutoStartupLifecycle() { + return new RqueueAutoStartupLifecycle(); + } + @Bean @ConditionalOnMissingBean public RqueueMessageHandler rqueueMessageHandler(MessageBroker messageBroker) { diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java index b7e2986e..8234dc93 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java @@ -17,6 +17,7 @@ package com.github.sonus21.rqueue.spring.boot.tests.unit; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -35,6 +36,8 @@ import com.github.sonus21.rqueue.core.spi.Capabilities; import com.github.sonus21.rqueue.core.spi.MessageBroker; import com.github.sonus21.rqueue.listener.RqueueMessageHandler; +import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer; +import com.github.sonus21.rqueue.spring.boot.RqueueAutoStartupLifecycle; import com.github.sonus21.rqueue.spring.boot.RqueueListenerAutoConfig; import com.github.sonus21.rqueue.spring.boot.tests.SpringBootUnitTest; import org.apache.commons.lang3.reflect.FieldUtils; @@ -125,6 +128,33 @@ void rqueueMessageListenerContainer() assertSame(messageBroker, factory.getMessageBroker()); } + @Test + void autoStartupLifecycleDelaysOnlyAutoStartupContainers() { + RqueueAutoStartupLifecycle lifecycle = new RqueueAutoStartupLifecycle(); + TestRqueueMessageListenerContainer autoStartupContainer = + new TestRqueueMessageListenerContainer(); + TestRqueueMessageListenerContainer disabledContainer = + new TestRqueueMessageListenerContainer(); + TestRqueueMessageListenerContainer alreadyRunningContainer = + new TestRqueueMessageListenerContainer(); + disabledContainer.setAutoStartup(false); + + lifecycle.postProcessBeforeInitialization(autoStartupContainer, "autoStartupContainer"); + lifecycle.postProcessBeforeInitialization(disabledContainer, "disabledContainer"); + lifecycle.postProcessBeforeInitialization(alreadyRunningContainer, "alreadyRunningContainer"); + alreadyRunningContainer.running = true; + + assertFalse(autoStartupContainer.isAutoStartup()); + assertFalse(disabledContainer.isAutoStartup()); + assertFalse(alreadyRunningContainer.isAutoStartup()); + + lifecycle.onApplicationEvent(null); + + assertEquals(1, autoStartupContainer.startCount); + assertEquals(0, disabledContainer.startCount); + assertEquals(0, alreadyRunningContainer.startCount); + } + @Test void rqueueMessageEnqueuerWiresBroker() throws IllegalAccessException { SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory(); @@ -158,4 +188,25 @@ void rqueueMessageSenderUsesConfiguredMessageConverter() throws IllegalAccessExc MessageConverter converter = messageSender.getMessageConverter(); assertTrue(converter.hashCode() == messageConverter.hashCode()); } + + private class TestRqueueMessageListenerContainer extends RqueueMessageListenerContainer { + + private int startCount; + private boolean running; + + private TestRqueueMessageListenerContainer() { + super(rqueueMessageHandler, messageTemplate); + } + + @Override + public void start() { + startCount++; + running = true; + } + + @Override + public boolean isRunning() { + return running; + } + } } From b612ab2d56309446e2562819c7f4229451bf7d03 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 21 May 2026 11:21:06 +0000 Subject: [PATCH 02/16] Apply Palantir Java Format --- .../spring/boot/tests/unit/RqueueListenerAutoConfigTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java index 8234dc93..1c6e33a6 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java @@ -133,8 +133,7 @@ void autoStartupLifecycleDelaysOnlyAutoStartupContainers() { RqueueAutoStartupLifecycle lifecycle = new RqueueAutoStartupLifecycle(); TestRqueueMessageListenerContainer autoStartupContainer = new TestRqueueMessageListenerContainer(); - TestRqueueMessageListenerContainer disabledContainer = - new TestRqueueMessageListenerContainer(); + TestRqueueMessageListenerContainer disabledContainer = new TestRqueueMessageListenerContainer(); TestRqueueMessageListenerContainer alreadyRunningContainer = new TestRqueueMessageListenerContainer(); disabledContainer.setAutoStartup(false); From 2bb9ac0effe910ab5b3363157d2c0621ff99a296 Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Thu, 21 May 2026 17:55:53 +0530 Subject: [PATCH 03/16] Make Boot Rqueue auto-start delay explicit --- .../boot/RqueueAutoStartupLifecycle.java | 8 +++++-- .../spring/boot/RqueueListenerAutoConfig.java | 22 ++++++++++++++++++- .../unit/RqueueListenerAutoConfigTest.java | 7 +++++- 3 files changed, 33 insertions(+), 4 deletions(-) diff --git a/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueAutoStartupLifecycle.java b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueAutoStartupLifecycle.java index 0f9013c1..449aad45 100644 --- a/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueAutoStartupLifecycle.java +++ b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueAutoStartupLifecycle.java @@ -34,14 +34,18 @@ public class RqueueAutoStartupLifecycle private final Set delayedContainers = ConcurrentHashMap.newKeySet(); + public void delayAutoStartup(RqueueMessageListenerContainer container) { + container.setAutoStartup(false); + delayedContainers.add(container); + } + @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { if (bean instanceof RqueueMessageListenerContainer) { RqueueMessageListenerContainer container = (RqueueMessageListenerContainer) bean; if (container.isAutoStartup()) { - container.setAutoStartup(false); - delayedContainers.add(container); + delayAutoStartup(container); } } return bean; diff --git a/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueListenerAutoConfig.java b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueListenerAutoConfig.java index cfb166d4..863d9d5a 100644 --- a/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueListenerAutoConfig.java +++ b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueListenerAutoConfig.java @@ -33,6 +33,7 @@ import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer; import com.github.sonus21.rqueue.utils.condition.ReactiveEnabled; import com.github.sonus21.rqueue.utils.condition.RqueueEnabled; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -57,6 +58,9 @@ @Import(RqueueRedisConfigImportSelector.class) public class RqueueListenerAutoConfig extends RqueueListenerBaseConfig { + @Autowired(required = false) + private RqueueAutoStartupLifecycle rqueueAutoStartupLifecycle; + @Bean @Role(BeanDefinition.ROLE_INFRASTRUCTURE) @ConditionalOnWebApplication @@ -83,7 +87,23 @@ public RqueueMessageListenerContainer rqueueMessageListenerContainer( if (simpleRqueueListenerContainerFactory.getMessageBroker() == null) { simpleRqueueListenerContainerFactory.setMessageBroker(messageBroker); } - return simpleRqueueListenerContainerFactory.createMessageListenerContainer(); + boolean delayAutoStartup = + rqueueAutoStartupLifecycle != null && simpleRqueueListenerContainerFactory.getAutoStartup(); + if (delayAutoStartup) { + simpleRqueueListenerContainerFactory.setAutoStartup(false); + } + RqueueMessageListenerContainer container; + try { + container = simpleRqueueListenerContainerFactory.createMessageListenerContainer(); + } finally { + if (delayAutoStartup) { + simpleRqueueListenerContainerFactory.setAutoStartup(true); + } + } + if (delayAutoStartup) { + rqueueAutoStartupLifecycle.delayAutoStartup(container); + } + return container; } @Bean diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java index 1c6e33a6..2f572c19 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java @@ -122,10 +122,15 @@ void rqueueMessageListenerContainer() "com.github.sonus21.rqueue.converter.DefaultMessageConverterProvider", true); FieldUtils.writeField(messageAutoConfig, "simpleRqueueListenerContainerFactory", factory, true); - messageAutoConfig.rqueueMessageListenerContainer(rqueueMessageHandler, messageBroker); + FieldUtils.writeField( + messageAutoConfig, "rqueueAutoStartupLifecycle", new RqueueAutoStartupLifecycle(), true); + RqueueMessageListenerContainer container = + messageAutoConfig.rqueueMessageListenerContainer(rqueueMessageHandler, messageBroker); assertEquals(factory.getRqueueMessageHandler(null).hashCode(), rqueueMessageHandler.hashCode()); // The broker must be propagated onto the factory so the container picks it up. assertSame(messageBroker, factory.getMessageBroker()); + assertFalse(container.isAutoStartup()); + assertTrue(factory.getAutoStartup()); } @Test From d3e3e512ed6c29e129bb7d446040c0caea7051b3 Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Thu, 21 May 2026 19:34:52 +0530 Subject: [PATCH 04/16] Use NATS fetch wait for listener long polling --- .../rqueue/core/spi/MessageBroker.java | 13 +++++++ .../rqueue/listener/RqueueMessagePoller.java | 11 ++---- ...sageListenerContainerBrokerBranchTest.java | 37 +++++++++++++++++-- .../nats/js/JetStreamMessageBroker.java | 19 ++++++---- .../nats/JetStreamMessageBrokerUnitTest.java | 9 +++++ 5 files changed, 71 insertions(+), 18 deletions(-) diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/MessageBroker.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/MessageBroker.java index 1f368f5a..578d314e 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/MessageBroker.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/MessageBroker.java @@ -83,6 +83,19 @@ default Mono enqueueWithDelayReactive(QueueDetail q, RqueueMessage m, long return Mono.fromRunnable(() -> enqueueWithDelay(q, m, delayMs)); } + /** + * Resolve the wait duration that listener pollers should pass to {@link #pop}. The default uses + * the listener container's polling interval, preserving existing Redis behavior where that value + * also controls idle sleeps. Backends with native long-poll controls can override this so their + * fetch wait can be tuned independently. + * + * @param pollingInterval listener container polling interval + * @return wait duration for listener-driven pop calls + */ + default Duration getPollWait(Duration pollingInterval) { + return pollingInterval; + } + List pop(QueueDetail q, String consumerName, int batch, Duration wait); /** diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessagePoller.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessagePoller.java index dfdddf92..8927c4da 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessagePoller.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessagePoller.java @@ -19,6 +19,7 @@ import com.github.sonus21.rqueue.core.RqueueBeanProvider; import com.github.sonus21.rqueue.core.RqueueMessage; import com.github.sonus21.rqueue.core.middleware.Middleware; +import com.github.sonus21.rqueue.core.spi.MessageBroker; import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer.QueueStateMgr; import com.github.sonus21.rqueue.utils.Constants; import com.github.sonus21.rqueue.utils.QueueThreadPool; @@ -59,13 +60,9 @@ abstract class RqueueMessagePoller extends MessageContainerBase { } private List getMessages(QueueDetail queueDetail, int count) { - return rqueueBeanProvider - .getMessageBroker() - .pop( - queueDetail, - queueDetail.resolvedConsumerName(), - count, - Duration.ofMillis(pollingInterval)); + MessageBroker broker = rqueueBeanProvider.getMessageBroker(); + Duration wait = broker.getPollWait(Duration.ofMillis(pollingInterval)); + return broker.pop(queueDetail, queueDetail.resolvedConsumerName(), count, wait); } private void execute( diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerBrokerBranchTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerBrokerBranchTest.java index 3b4ebe34..f407ed9e 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerBrokerBranchTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerBrokerBranchTest.java @@ -15,6 +15,7 @@ */ package com.github.sonus21.rqueue.listener; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -108,6 +109,7 @@ static class CountingBroker implements MessageBroker, AutoCloseable { final AtomicInteger popCalls = new AtomicInteger(); final AtomicBoolean closed = new AtomicBoolean(); volatile Duration lastWait; + volatile Duration pollWait; private final Capabilities caps; CountingBroker(Capabilities caps) { @@ -120,6 +122,11 @@ public void enqueue(QueueDetail q, RqueueMessage m) {} @Override public void enqueueWithDelay(QueueDetail q, RqueueMessage m, long delayMs) {} + @Override + public Duration getPollWait(Duration pollingInterval) { + return pollWait != null ? pollWait : MessageBroker.super.getPollWait(pollingInterval); + } + @Override public List pop(QueueDetail q, String consumerName, int batch, Duration wait) { popCalls.incrementAndGet(); @@ -248,9 +255,33 @@ void pollerForwardsPollingIntervalAsBrokerFetchWait() throws Exception { Duration wait = broker.lastWait; assertNotNull(wait, "broker should have received a wait duration"); assertFalse(wait.isZero(), "wait must not be Duration.ZERO; should match pollingInterval"); - assertTrue( - wait.toMillis() == pollingInterval, - "wait should equal the configured pollingInterval (got " + wait + ")"); + assertEquals(Duration.ofMillis(pollingInterval), wait); + } + + @Test + void pollerUsesBrokerResolvedFetchWait() throws Exception { + EndpointRegistry.delete(); + CountingBroker broker = + new CountingBroker(new Capabilities(true, false, false, true, true, true)); + broker.pollWait = Duration.ofSeconds(2); + RqueueMessageListenerContainer container = + new RqueueMessageListenerContainer(messageHandler, rqueueMessageTemplate); + container.rqueueBeanProvider = beanProvider; + container.setMessageBroker(broker); + container.setPollingInterval(137L); + container.afterPropertiesSet(); + container.start(); + try { + long deadline = System.currentTimeMillis() + 2000; + while (broker.popCalls.get() == 0 && System.currentTimeMillis() < deadline) { + Thread.sleep(20); + } + } finally { + container.stop(); + container.destroy(); + } + assertTrue(broker.popCalls.get() > 0, "poller should have issued at least one pop call"); + assertEquals(broker.pollWait, broker.lastWait); } private class TrackingContainer extends RqueueMessageListenerContainer { diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java index 688841f1..4e661ff2 100644 --- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java @@ -100,10 +100,7 @@ public class JetStreamMessageBroker implements MessageBroker, AutoCloseable { /** * Lower bound for fetch wait when the caller passes a non-positive duration. JetStream rejects - * zero on a pull fetch, so any zero/negative wait is rounded up to this minimum. Callers that - * want long-poll semantics should pass the desired wait explicitly (e.g. the listener - * container's {@code pollingInterval}); this constant only guards against accidental zero waits - * from non-listener callers. + * zero on a pull fetch, so any zero/negative wait is rounded up to this minimum. */ private static final Duration MIN_FETCH_WAIT = Duration.ofMillis(50); @@ -463,6 +460,12 @@ public Mono enqueueWithDelayReactive(QueueDetail q, RqueueMessage m, long .then(); } + @Override + public Duration getPollWait(Duration pollingInterval) { + Duration fetchWait = config == null ? null : config.getDefaultFetchWait(); + return fetchWait != null ? fetchWait : MessageBroker.super.getPollWait(pollingInterval); + } + @Override public List pop(QueueDetail q, String consumerName, int batch, Duration wait) { return popInternal( @@ -533,10 +536,10 @@ private List popInternal( Duration wait, Duration ackWait, long maxDeliver) { - // Honour the caller-supplied wait — this is the listener container's pollingInterval for - // RqueueMessagePoller, and lets JetStream long-poll instead of the broker firing a steady - // stream of $JS.API.CONSUMER.MSG.NEXT requests. Only fall back when the caller didn't - // express a preference; zero/negative waits are rounded up to the JetStream minimum. + // Honour the caller-supplied wait. Listener pollers obtain this from getPollWait(), so NATS + // can use rqueue.nats.consumer.fetch-wait for long polling while keeping the listener + // pollingInterval as its idle sleep. Only fall back when the caller didn't express a + // preference; zero/negative waits are rounded up to the JetStream minimum. Duration fetchWait; if (wait == null) { fetchWait = config.getDefaultFetchWait(); diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java index 2aa9a912..62b1a80c 100644 --- a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java +++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java @@ -39,6 +39,7 @@ import io.nats.client.api.PublishAck; import io.nats.client.impl.Headers; import java.io.IOException; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -92,6 +93,14 @@ void enqueue_publishesToPrefixedSubject() throws Exception { verify(f.js, times(1)).publish(eq("rqueue.js.orders"), any(Headers.class), any(byte[].class)); } + @Test + void getPollWait_usesConfiguredFetchWait() { + RqueueNatsConfig cfg = RqueueNatsConfig.defaults().setDefaultFetchWait(Duration.ofSeconds(9)); + Fixture f = newFixture(cfg); + + assertEquals(Duration.ofSeconds(9), f.broker.getPollWait(Duration.ofMillis(137))); + } + @Test void enqueueWithPriority_appendsPrioritySuffixToSubject() throws Exception { Fixture f = newFixture(RqueueNatsConfig.defaults()); From 3839668e2ce2eebba14262a234db03c6a853a18c Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Thu, 21 May 2026 21:51:05 +0530 Subject: [PATCH 05/16] Add MsgPack listener E2E coverage --- .../MessagePackageListenerTest.java | 243 ++++++++++++++++++ .../MsgPackMessageConverterProvider.java | 166 ++++++++++++ 2 files changed, 409 insertions(+) create mode 100644 rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MessagePackageListenerTest.java create mode 100644 rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MessagePackageListenerTest.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MessagePackageListenerTest.java new file mode 100644 index 00000000..5b5adee2 --- /dev/null +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MessagePackageListenerTest.java @@ -0,0 +1,243 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ +package com.github.sonus21.rqueue.spring.boot.integration; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.github.sonus21.rqueue.annotation.RqueueListener; +import com.github.sonus21.rqueue.core.RqueueMessage; +import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer; +import com.github.sonus21.rqueue.listener.RqueueMessageHeaders; +import com.github.sonus21.rqueue.test.application.BaseApplication; +import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration; +import org.springframework.boot.data.redis.autoconfigure.DataRedisReactiveAutoConfiguration; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Import; +import org.springframework.messaging.Message; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; + +@Tag("springBootIntegration") +@Tag("integration") +@Tag("springBoot") +class MessagePackageListenerTest { + + private static final String MESSAGE_PACKAGE_QUEUE = "message-package-listener-package"; + private static final String MESSAGE_QUEUE = "message-package-listener-message"; + private static final String NATS_STREAM_PREFIX = "rqueue-js-messagePackageListener-"; + private static final String NATS_SUBJECT_PREFIX = "rqueue.js.messagePackageListener."; + private static final int REDIS_PORT = 8032; + + @ParameterizedTest(name = "{0}") + @EnumSource(BackendUnderTest.class) + void listenerReceivesMsgPackPayloadInsideSpringMessage(BackendUnderTest backend) + throws Exception { + try (ConfigurableApplicationContext context = startContext(backend)) { + TestListener listener = context.getBean(TestListener.class); + ListenerPayload payload = new ListenerPayload(backend.name(), "msgpack-message-package"); + + String messageId = + context.getBean(RqueueMessageEnqueuer.class).enqueue(MESSAGE_PACKAGE_QUEUE, payload); + + assertThat(listener.messagePackageLatch.await(20, TimeUnit.SECONDS)).isTrue(); + assertThat(listener.messagePackage.get()).isNotNull(); + assertThat(listener.messagePackage.get().getPayload()).isEqualTo(payload); + assertRqueueMessage( + listener.messagePackageRqueueMessage.get(), messageId, MESSAGE_PACKAGE_QUEUE, payload); + } + } + + @ParameterizedTest(name = "{0}") + @EnumSource(BackendUnderTest.class) + void listenerReceivesMsgPackPayload(BackendUnderTest backend) throws Exception { + try (ConfigurableApplicationContext context = startContext(backend)) { + TestListener listener = context.getBean(TestListener.class); + ListenerPayload payload = new ListenerPayload(backend.name(), "msgpack-message"); + + String messageId = + context.getBean(RqueueMessageEnqueuer.class).enqueue(MESSAGE_QUEUE, payload); + + assertThat(listener.messageLatch.await(20, TimeUnit.SECONDS)).isTrue(); + assertThat(listener.message.get()).isEqualTo(payload); + assertRqueueMessage(listener.messageRqueueMessage.get(), messageId, MESSAGE_QUEUE, payload); + } + } + + private ConfigurableApplicationContext startContext(BackendUnderTest backend) { + if (backend.isNats()) { + AbstractNatsBootIT.startNats(); + AbstractNatsBootIT.deleteStreamsWithPrefix(NATS_STREAM_PREFIX); + } + return new SpringApplicationBuilder(backend.applicationClass()) + .web(WebApplicationType.NONE) + .properties(backend.properties()) + .run(); + } + + private static void assertRqueueMessage( + RqueueMessage rqueueMessage, + String messageId, + String queueName, + ListenerPayload expectedPayload) { + assertThat(rqueueMessage).isNotNull(); + assertThat(rqueueMessage.getId()).isEqualTo(messageId); + assertThat(rqueueMessage.getQueueName()).isEqualTo(queueName); + assertThat(MsgPackMessageConverterProvider.isMsgPack(rqueueMessage.getMessage())) + .isTrue(); + assertThat(MsgPackMessageConverterProvider.decode(rqueueMessage.getMessage())) + .isEqualTo(expectedPayload); + } + + enum BackendUnderTest { + REDIS(RedisTestApp.class, new String[] { + "rqueue.backend=redis", + "spring.data.redis.host=127.0.0.1", + "spring.data.redis.port=" + REDIS_PORT, + "mysql.db.name=MessagePackageListenerTestRedis", + "use.system.redis=false" + }), + NATS(NatsTestApp.class, new String[] {}); + + private final Class applicationClass; + private final String[] properties; + + BackendUnderTest(Class applicationClass, String[] properties) { + this.applicationClass = applicationClass; + this.properties = properties; + } + + Class applicationClass() { + return applicationClass; + } + + String[] properties() { + String[] common = new String[] { + "rqueue.message.converter.provider.class=" + + MsgPackMessageConverterProvider.class.getName(), + }; + String[] backendProperties = isNats() + ? new String[] { + "rqueue.backend=nats", + "rqueue.nats.naming.stream-prefix=" + NATS_STREAM_PREFIX, + "rqueue.nats.naming.subject-prefix=" + NATS_SUBJECT_PREFIX, + "rqueue.nats.connection.url=" + AbstractNatsBootIT.activeNatsUrl() + } + : properties; + String[] merged = new String[common.length + backendProperties.length]; + System.arraycopy(common, 0, merged, 0, common.length); + System.arraycopy(backendProperties, 0, merged, common.length, backendProperties.length); + return merged; + } + + boolean isNats() { + return this == NATS; + } + } + + @SpringBootApplication + @Import(TestListener.class) + static class RedisTestApp extends BaseApplication {} + + @SpringBootApplication( + exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class}) + @Import(TestListener.class) + static class NatsTestApp {} + + @Component + static class TestListener { + + final CountDownLatch messagePackageLatch = new CountDownLatch(1); + final CountDownLatch messageLatch = new CountDownLatch(1); + final AtomicReference> messagePackage = new AtomicReference<>(); + final AtomicReference messagePackageRqueueMessage = new AtomicReference<>(); + final AtomicReference message = new AtomicReference<>(); + final AtomicReference messageRqueueMessage = new AtomicReference<>(); + + @RqueueListener(value = MESSAGE_PACKAGE_QUEUE) + void onMessagePackage( + Message message, + @Header(RqueueMessageHeaders.MESSAGE) RqueueMessage rqueueMessage) { + messagePackage.set(message); + messagePackageRqueueMessage.set(rqueueMessage); + messagePackageLatch.countDown(); + } + + @RqueueListener(value = MESSAGE_QUEUE) + void onMessage( + ListenerPayload message, + @Header(RqueueMessageHeaders.MESSAGE) RqueueMessage rqueueMessage) { + this.message.set(message); + messageRqueueMessage.set(rqueueMessage); + messageLatch.countDown(); + } + } + + static class ListenerPayload { + + private String backend; + private String body; + + ListenerPayload() {} + + ListenerPayload(String backend, String body) { + this.backend = backend; + this.body = body; + } + + public String getBackend() { + return backend; + } + + public void setBackend(String backend) { + this.backend = backend; + } + + public String getBody() { + return body; + } + + public void setBody(String body) { + this.body = body; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other instanceof ListenerPayload)) { + return false; + } + ListenerPayload that = (ListenerPayload) other; + return Objects.equals(backend, that.backend) && Objects.equals(body, that.body); + } + + @Override + public int hashCode() { + return Objects.hash(backend, body); + } + } +} diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java new file mode 100644 index 00000000..0ed73e15 --- /dev/null +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java @@ -0,0 +1,166 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ +package com.github.sonus21.rqueue.spring.boot.integration; + +import com.github.sonus21.rqueue.converter.MessageConverterProvider; +import java.io.ByteArrayOutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.MessageConversionException; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.support.GenericMessage; + +public class MsgPackMessageConverterProvider implements MessageConverterProvider { + + private static final String PREFIX = "msgpack:"; + + @Override + public MessageConverter getConverter() { + return new MsgPackMessageConverter(); + } + + static boolean isMsgPack(String payload) { + return payload != null && payload.startsWith(PREFIX); + } + + static MessagePackageListenerTest.ListenerPayload decode(String payload) { + if (!isMsgPack(payload)) { + throw new MessageConversionException("Payload is not MsgPack encoded"); + } + return MsgPackCodec.decode(Base64.getDecoder().decode(payload.substring(PREFIX.length()))); + } + + private static class MsgPackMessageConverter implements MessageConverter { + + @Override + public Object fromMessage(Message message, Class targetClass) { + Object payload = message.getPayload(); + if (payload instanceof MessagePackageListenerTest.ListenerPayload) { + return payload; + } + if (!(payload instanceof String)) { + return null; + } + return decode((String) payload); + } + + @Override + public Message toMessage(Object payload, MessageHeaders headers) { + if (payload instanceof MessagePackageListenerTest.ListenerPayload) { + byte[] msgPackBytes = + MsgPackCodec.encode((MessagePackageListenerTest.ListenerPayload) payload); + return new GenericMessage<>(PREFIX + Base64.getEncoder().encodeToString(msgPackBytes)); + } + if (payload instanceof String) { + return new GenericMessage<>(payload); + } + return null; + } + } + + private static final class MsgPackCodec { + + private MsgPackCodec() {} + + static byte[] encode(MessagePackageListenerTest.ListenerPayload payload) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + out.write(0x82); + writeString(out, "backend"); + writeString(out, payload.getBackend()); + writeString(out, "body"); + writeString(out, payload.getBody()); + return out.toByteArray(); + } + + static MessagePackageListenerTest.ListenerPayload decode(byte[] bytes) { + Cursor cursor = new Cursor(bytes); + int mapHeader = cursor.readUnsignedByte(); + int entries; + if ((mapHeader & 0xf0) == 0x80) { + entries = mapHeader & 0x0f; + } else { + throw new MessageConversionException("Expected MsgPack fixmap"); + } + String backend = null; + String body = null; + for (int i = 0; i < entries; i++) { + String key = readString(cursor); + String value = readString(cursor); + if ("backend".equals(key)) { + backend = value; + } else if ("body".equals(key)) { + body = value; + } + } + return new MessagePackageListenerTest.ListenerPayload(backend, body); + } + + private static void writeString(ByteArrayOutputStream out, String value) { + byte[] bytes = value.getBytes(StandardCharsets.UTF_8); + if (bytes.length <= 31) { + out.write(0xa0 | bytes.length); + } else if (bytes.length <= 255) { + out.write(0xd9); + out.write(bytes.length); + } else { + throw new MessageConversionException("Test MsgPack codec supports strings up to 255 bytes"); + } + out.writeBytes(bytes); + } + + private static String readString(Cursor cursor) { + int header = cursor.readUnsignedByte(); + int length; + if ((header & 0xe0) == 0xa0) { + length = header & 0x1f; + } else if (header == 0xd9) { + length = cursor.readUnsignedByte(); + } else { + throw new MessageConversionException("Expected MsgPack string"); + } + return new String(cursor.readBytes(length), StandardCharsets.UTF_8); + } + } + + private static final class Cursor { + + private final byte[] bytes; + private int index; + + Cursor(byte[] bytes) { + this.bytes = bytes; + } + + int readUnsignedByte() { + if (index >= bytes.length) { + throw new MessageConversionException("Unexpected end of MsgPack payload"); + } + return bytes[index++] & 0xff; + } + + byte[] readBytes(int length) { + if (index + length > bytes.length) { + throw new MessageConversionException("Unexpected end of MsgPack payload"); + } + byte[] value = new byte[length]; + System.arraycopy(bytes, index, value, 0, length); + index += length; + return value; + } + } +} From fb227d5f8728d63e4ba09c171b1f0b3faa75b2e8 Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Thu, 21 May 2026 23:11:45 +0530 Subject: [PATCH 06/16] Stabilize NATS priority queue E2E test --- .../rqueue/spring/boot/integration/NatsPriorityQueuesE2EIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsPriorityQueuesE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsPriorityQueuesE2EIT.java index a366a7f1..f4a62f5f 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsPriorityQueuesE2EIT.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsPriorityQueuesE2EIT.java @@ -91,7 +91,7 @@ static class PriorityListener { final CountDownLatch latch = new CountDownLatch(10); final List received = Collections.synchronizedList(new ArrayList<>()); - @RqueueListener(value = "pq", priority = "high=10,low=1") + @RqueueListener(value = "pq", priority = "high=10,low=1", batchSize = "5", concurrency = "5") void onMessage(String payload) { received.add(payload); latch.countDown(); From dd3b6e45908f0f1d48f00f151a72c222d5bb82ea Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Thu, 21 May 2026 23:16:34 +0530 Subject: [PATCH 07/16] Use MessagePack dependency in listener E2E test --- rqueue-spring-boot-starter/build.gradle | 1 + .../MsgPackMessageConverterProvider.java | 109 +++++------------- 2 files changed, 30 insertions(+), 80 deletions(-) diff --git a/rqueue-spring-boot-starter/build.gradle b/rqueue-spring-boot-starter/build.gradle index ac5dc746..de02433c 100644 --- a/rqueue-spring-boot-starter/build.gradle +++ b/rqueue-spring-boot-starter/build.gradle @@ -68,6 +68,7 @@ dependencies { testImplementation "org.springframework.boot:spring-boot-starter-data-redis-reactive:${springBootVersion}" testImplementation "org.springframework.boot:spring-boot-starter-data-jpa:${springBootVersion}" testImplementation "org.springframework.boot:spring-boot-devtools:${springBootVersion}" + testImplementation "org.msgpack:msgpack-core:0.9.9" testImplementation "org.testcontainers:testcontainers:${testcontainersVersion}" testImplementation "org.testcontainers:junit-jupiter:${testcontainersVersion}" } diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java index 0ed73e15..688971af 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java @@ -16,9 +16,11 @@ package com.github.sonus21.rqueue.spring.boot.integration; import com.github.sonus21.rqueue.converter.MessageConverterProvider; -import java.io.ByteArrayOutputStream; -import java.nio.charset.StandardCharsets; +import java.io.IOException; import java.util.Base64; +import org.msgpack.core.MessageBufferPacker; +import org.msgpack.core.MessagePack; +import org.msgpack.core.MessageUnpacker; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.converter.MessageConversionException; @@ -78,89 +80,36 @@ private static final class MsgPackCodec { private MsgPackCodec() {} static byte[] encode(MessagePackageListenerTest.ListenerPayload payload) { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - out.write(0x82); - writeString(out, "backend"); - writeString(out, payload.getBackend()); - writeString(out, "body"); - writeString(out, payload.getBody()); - return out.toByteArray(); + try (MessageBufferPacker packer = MessagePack.newDefaultBufferPacker()) { + packer.packMapHeader(2); + packer.packString("backend"); + packer.packString(payload.getBackend()); + packer.packString("body"); + packer.packString(payload.getBody()); + return packer.toByteArray(); + } catch (IOException e) { + throw new MessageConversionException("MsgPack encoding failed", e); + } } static MessagePackageListenerTest.ListenerPayload decode(byte[] bytes) { - Cursor cursor = new Cursor(bytes); - int mapHeader = cursor.readUnsignedByte(); - int entries; - if ((mapHeader & 0xf0) == 0x80) { - entries = mapHeader & 0x0f; - } else { - throw new MessageConversionException("Expected MsgPack fixmap"); - } - String backend = null; - String body = null; - for (int i = 0; i < entries; i++) { - String key = readString(cursor); - String value = readString(cursor); - if ("backend".equals(key)) { - backend = value; - } else if ("body".equals(key)) { - body = value; + try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bytes)) { + int entries = unpacker.unpackMapHeader(); + String backend = null; + String body = null; + for (int i = 0; i < entries; i++) { + String key = unpacker.unpackString(); + String value = unpacker.unpackString(); + if ("backend".equals(key)) { + backend = value; + } else if ("body".equals(key)) { + body = value; + } } + return new MessagePackageListenerTest.ListenerPayload(backend, body); + } catch (IOException e) { + throw new MessageConversionException("MsgPack decoding failed", e); } - return new MessagePackageListenerTest.ListenerPayload(backend, body); - } - - private static void writeString(ByteArrayOutputStream out, String value) { - byte[] bytes = value.getBytes(StandardCharsets.UTF_8); - if (bytes.length <= 31) { - out.write(0xa0 | bytes.length); - } else if (bytes.length <= 255) { - out.write(0xd9); - out.write(bytes.length); - } else { - throw new MessageConversionException("Test MsgPack codec supports strings up to 255 bytes"); - } - out.writeBytes(bytes); - } - - private static String readString(Cursor cursor) { - int header = cursor.readUnsignedByte(); - int length; - if ((header & 0xe0) == 0xa0) { - length = header & 0x1f; - } else if (header == 0xd9) { - length = cursor.readUnsignedByte(); - } else { - throw new MessageConversionException("Expected MsgPack string"); - } - return new String(cursor.readBytes(length), StandardCharsets.UTF_8); - } - } - - private static final class Cursor { - - private final byte[] bytes; - private int index; - - Cursor(byte[] bytes) { - this.bytes = bytes; - } - - int readUnsignedByte() { - if (index >= bytes.length) { - throw new MessageConversionException("Unexpected end of MsgPack payload"); - } - return bytes[index++] & 0xff; - } - - byte[] readBytes(int length) { - if (index + length > bytes.length) { - throw new MessageConversionException("Unexpected end of MsgPack payload"); - } - byte[] value = new byte[length]; - System.arraycopy(bytes, index, value, 0, length); - index += length; - return value; } } } From b5aeb980f377275dbafe3486240120524b037458 Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Thu, 21 May 2026 23:19:41 +0530 Subject: [PATCH 08/16] Use MessagePack ObjectMapper in listener E2E test --- rqueue-spring-boot-starter/build.gradle | 2 +- .../MsgPackMessageConverterProvider.java | 35 ++++++------------- 2 files changed, 11 insertions(+), 26 deletions(-) diff --git a/rqueue-spring-boot-starter/build.gradle b/rqueue-spring-boot-starter/build.gradle index de02433c..f00e045f 100644 --- a/rqueue-spring-boot-starter/build.gradle +++ b/rqueue-spring-boot-starter/build.gradle @@ -68,7 +68,7 @@ dependencies { testImplementation "org.springframework.boot:spring-boot-starter-data-redis-reactive:${springBootVersion}" testImplementation "org.springframework.boot:spring-boot-starter-data-jpa:${springBootVersion}" testImplementation "org.springframework.boot:spring-boot-devtools:${springBootVersion}" - testImplementation "org.msgpack:msgpack-core:0.9.9" + testImplementation "org.msgpack:jackson-dataformat-msgpack:0.9.12" testImplementation "org.testcontainers:testcontainers:${testcontainersVersion}" testImplementation "org.testcontainers:junit-jupiter:${testcontainersVersion}" } diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java index 688971af..94f380bf 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java @@ -15,12 +15,12 @@ */ package com.github.sonus21.rqueue.spring.boot.integration; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.github.sonus21.rqueue.converter.MessageConverterProvider; import java.io.IOException; import java.util.Base64; -import org.msgpack.core.MessageBufferPacker; -import org.msgpack.core.MessagePack; -import org.msgpack.core.MessageUnpacker; +import org.msgpack.jackson.dataformat.MessagePackFactory; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.converter.MessageConversionException; @@ -77,36 +77,21 @@ public Message toMessage(Object payload, MessageHeaders headers) { private static final class MsgPackCodec { + private static final ObjectMapper MAPPER = new ObjectMapper(new MessagePackFactory()); + private MsgPackCodec() {} static byte[] encode(MessagePackageListenerTest.ListenerPayload payload) { - try (MessageBufferPacker packer = MessagePack.newDefaultBufferPacker()) { - packer.packMapHeader(2); - packer.packString("backend"); - packer.packString(payload.getBackend()); - packer.packString("body"); - packer.packString(payload.getBody()); - return packer.toByteArray(); - } catch (IOException e) { + try { + return MAPPER.writeValueAsBytes(payload); + } catch (JsonProcessingException e) { throw new MessageConversionException("MsgPack encoding failed", e); } } static MessagePackageListenerTest.ListenerPayload decode(byte[] bytes) { - try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bytes)) { - int entries = unpacker.unpackMapHeader(); - String backend = null; - String body = null; - for (int i = 0; i < entries; i++) { - String key = unpacker.unpackString(); - String value = unpacker.unpackString(); - if ("backend".equals(key)) { - backend = value; - } else if ("body".equals(key)) { - body = value; - } - } - return new MessagePackageListenerTest.ListenerPayload(backend, body); + try { + return MAPPER.readValue(bytes, MessagePackageListenerTest.ListenerPayload.class); } catch (IOException e) { throw new MessageConversionException("MsgPack decoding failed", e); } From 2c7f5f723ba255dbb739413cc475e6e401ba5aab Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Thu, 21 May 2026 23:21:16 +0530 Subject: [PATCH 09/16] Remove unsupported MessagePack listener E2E test --- rqueue-spring-boot-starter/build.gradle | 1 - .../MessagePackageListenerTest.java | 243 ------------------ .../MsgPackMessageConverterProvider.java | 100 ------- 3 files changed, 344 deletions(-) delete mode 100644 rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MessagePackageListenerTest.java delete mode 100644 rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java diff --git a/rqueue-spring-boot-starter/build.gradle b/rqueue-spring-boot-starter/build.gradle index f00e045f..ac5dc746 100644 --- a/rqueue-spring-boot-starter/build.gradle +++ b/rqueue-spring-boot-starter/build.gradle @@ -68,7 +68,6 @@ dependencies { testImplementation "org.springframework.boot:spring-boot-starter-data-redis-reactive:${springBootVersion}" testImplementation "org.springframework.boot:spring-boot-starter-data-jpa:${springBootVersion}" testImplementation "org.springframework.boot:spring-boot-devtools:${springBootVersion}" - testImplementation "org.msgpack:jackson-dataformat-msgpack:0.9.12" testImplementation "org.testcontainers:testcontainers:${testcontainersVersion}" testImplementation "org.testcontainers:junit-jupiter:${testcontainersVersion}" } diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MessagePackageListenerTest.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MessagePackageListenerTest.java deleted file mode 100644 index 5b5adee2..00000000 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MessagePackageListenerTest.java +++ /dev/null @@ -1,243 +0,0 @@ -/* - * Copyright (c) 2026 Sonu Kumar - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * You may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - * - */ -package com.github.sonus21.rqueue.spring.boot.integration; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.github.sonus21.rqueue.annotation.RqueueListener; -import com.github.sonus21.rqueue.core.RqueueMessage; -import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer; -import com.github.sonus21.rqueue.listener.RqueueMessageHeaders; -import com.github.sonus21.rqueue.test.application.BaseApplication; -import java.util.Objects; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.EnumSource; -import org.springframework.boot.WebApplicationType; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.builder.SpringApplicationBuilder; -import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration; -import org.springframework.boot.data.redis.autoconfigure.DataRedisReactiveAutoConfiguration; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.context.annotation.Import; -import org.springframework.messaging.Message; -import org.springframework.messaging.handler.annotation.Header; -import org.springframework.stereotype.Component; - -@Tag("springBootIntegration") -@Tag("integration") -@Tag("springBoot") -class MessagePackageListenerTest { - - private static final String MESSAGE_PACKAGE_QUEUE = "message-package-listener-package"; - private static final String MESSAGE_QUEUE = "message-package-listener-message"; - private static final String NATS_STREAM_PREFIX = "rqueue-js-messagePackageListener-"; - private static final String NATS_SUBJECT_PREFIX = "rqueue.js.messagePackageListener."; - private static final int REDIS_PORT = 8032; - - @ParameterizedTest(name = "{0}") - @EnumSource(BackendUnderTest.class) - void listenerReceivesMsgPackPayloadInsideSpringMessage(BackendUnderTest backend) - throws Exception { - try (ConfigurableApplicationContext context = startContext(backend)) { - TestListener listener = context.getBean(TestListener.class); - ListenerPayload payload = new ListenerPayload(backend.name(), "msgpack-message-package"); - - String messageId = - context.getBean(RqueueMessageEnqueuer.class).enqueue(MESSAGE_PACKAGE_QUEUE, payload); - - assertThat(listener.messagePackageLatch.await(20, TimeUnit.SECONDS)).isTrue(); - assertThat(listener.messagePackage.get()).isNotNull(); - assertThat(listener.messagePackage.get().getPayload()).isEqualTo(payload); - assertRqueueMessage( - listener.messagePackageRqueueMessage.get(), messageId, MESSAGE_PACKAGE_QUEUE, payload); - } - } - - @ParameterizedTest(name = "{0}") - @EnumSource(BackendUnderTest.class) - void listenerReceivesMsgPackPayload(BackendUnderTest backend) throws Exception { - try (ConfigurableApplicationContext context = startContext(backend)) { - TestListener listener = context.getBean(TestListener.class); - ListenerPayload payload = new ListenerPayload(backend.name(), "msgpack-message"); - - String messageId = - context.getBean(RqueueMessageEnqueuer.class).enqueue(MESSAGE_QUEUE, payload); - - assertThat(listener.messageLatch.await(20, TimeUnit.SECONDS)).isTrue(); - assertThat(listener.message.get()).isEqualTo(payload); - assertRqueueMessage(listener.messageRqueueMessage.get(), messageId, MESSAGE_QUEUE, payload); - } - } - - private ConfigurableApplicationContext startContext(BackendUnderTest backend) { - if (backend.isNats()) { - AbstractNatsBootIT.startNats(); - AbstractNatsBootIT.deleteStreamsWithPrefix(NATS_STREAM_PREFIX); - } - return new SpringApplicationBuilder(backend.applicationClass()) - .web(WebApplicationType.NONE) - .properties(backend.properties()) - .run(); - } - - private static void assertRqueueMessage( - RqueueMessage rqueueMessage, - String messageId, - String queueName, - ListenerPayload expectedPayload) { - assertThat(rqueueMessage).isNotNull(); - assertThat(rqueueMessage.getId()).isEqualTo(messageId); - assertThat(rqueueMessage.getQueueName()).isEqualTo(queueName); - assertThat(MsgPackMessageConverterProvider.isMsgPack(rqueueMessage.getMessage())) - .isTrue(); - assertThat(MsgPackMessageConverterProvider.decode(rqueueMessage.getMessage())) - .isEqualTo(expectedPayload); - } - - enum BackendUnderTest { - REDIS(RedisTestApp.class, new String[] { - "rqueue.backend=redis", - "spring.data.redis.host=127.0.0.1", - "spring.data.redis.port=" + REDIS_PORT, - "mysql.db.name=MessagePackageListenerTestRedis", - "use.system.redis=false" - }), - NATS(NatsTestApp.class, new String[] {}); - - private final Class applicationClass; - private final String[] properties; - - BackendUnderTest(Class applicationClass, String[] properties) { - this.applicationClass = applicationClass; - this.properties = properties; - } - - Class applicationClass() { - return applicationClass; - } - - String[] properties() { - String[] common = new String[] { - "rqueue.message.converter.provider.class=" - + MsgPackMessageConverterProvider.class.getName(), - }; - String[] backendProperties = isNats() - ? new String[] { - "rqueue.backend=nats", - "rqueue.nats.naming.stream-prefix=" + NATS_STREAM_PREFIX, - "rqueue.nats.naming.subject-prefix=" + NATS_SUBJECT_PREFIX, - "rqueue.nats.connection.url=" + AbstractNatsBootIT.activeNatsUrl() - } - : properties; - String[] merged = new String[common.length + backendProperties.length]; - System.arraycopy(common, 0, merged, 0, common.length); - System.arraycopy(backendProperties, 0, merged, common.length, backendProperties.length); - return merged; - } - - boolean isNats() { - return this == NATS; - } - } - - @SpringBootApplication - @Import(TestListener.class) - static class RedisTestApp extends BaseApplication {} - - @SpringBootApplication( - exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class}) - @Import(TestListener.class) - static class NatsTestApp {} - - @Component - static class TestListener { - - final CountDownLatch messagePackageLatch = new CountDownLatch(1); - final CountDownLatch messageLatch = new CountDownLatch(1); - final AtomicReference> messagePackage = new AtomicReference<>(); - final AtomicReference messagePackageRqueueMessage = new AtomicReference<>(); - final AtomicReference message = new AtomicReference<>(); - final AtomicReference messageRqueueMessage = new AtomicReference<>(); - - @RqueueListener(value = MESSAGE_PACKAGE_QUEUE) - void onMessagePackage( - Message message, - @Header(RqueueMessageHeaders.MESSAGE) RqueueMessage rqueueMessage) { - messagePackage.set(message); - messagePackageRqueueMessage.set(rqueueMessage); - messagePackageLatch.countDown(); - } - - @RqueueListener(value = MESSAGE_QUEUE) - void onMessage( - ListenerPayload message, - @Header(RqueueMessageHeaders.MESSAGE) RqueueMessage rqueueMessage) { - this.message.set(message); - messageRqueueMessage.set(rqueueMessage); - messageLatch.countDown(); - } - } - - static class ListenerPayload { - - private String backend; - private String body; - - ListenerPayload() {} - - ListenerPayload(String backend, String body) { - this.backend = backend; - this.body = body; - } - - public String getBackend() { - return backend; - } - - public void setBackend(String backend) { - this.backend = backend; - } - - public String getBody() { - return body; - } - - public void setBody(String body) { - this.body = body; - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (!(other instanceof ListenerPayload)) { - return false; - } - ListenerPayload that = (ListenerPayload) other; - return Objects.equals(backend, that.backend) && Objects.equals(body, that.body); - } - - @Override - public int hashCode() { - return Objects.hash(backend, body); - } - } -} diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java deleted file mode 100644 index 94f380bf..00000000 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright (c) 2026 Sonu Kumar - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * You may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - * - */ -package com.github.sonus21.rqueue.spring.boot.integration; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.github.sonus21.rqueue.converter.MessageConverterProvider; -import java.io.IOException; -import java.util.Base64; -import org.msgpack.jackson.dataformat.MessagePackFactory; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.converter.MessageConversionException; -import org.springframework.messaging.converter.MessageConverter; -import org.springframework.messaging.support.GenericMessage; - -public class MsgPackMessageConverterProvider implements MessageConverterProvider { - - private static final String PREFIX = "msgpack:"; - - @Override - public MessageConverter getConverter() { - return new MsgPackMessageConverter(); - } - - static boolean isMsgPack(String payload) { - return payload != null && payload.startsWith(PREFIX); - } - - static MessagePackageListenerTest.ListenerPayload decode(String payload) { - if (!isMsgPack(payload)) { - throw new MessageConversionException("Payload is not MsgPack encoded"); - } - return MsgPackCodec.decode(Base64.getDecoder().decode(payload.substring(PREFIX.length()))); - } - - private static class MsgPackMessageConverter implements MessageConverter { - - @Override - public Object fromMessage(Message message, Class targetClass) { - Object payload = message.getPayload(); - if (payload instanceof MessagePackageListenerTest.ListenerPayload) { - return payload; - } - if (!(payload instanceof String)) { - return null; - } - return decode((String) payload); - } - - @Override - public Message toMessage(Object payload, MessageHeaders headers) { - if (payload instanceof MessagePackageListenerTest.ListenerPayload) { - byte[] msgPackBytes = - MsgPackCodec.encode((MessagePackageListenerTest.ListenerPayload) payload); - return new GenericMessage<>(PREFIX + Base64.getEncoder().encodeToString(msgPackBytes)); - } - if (payload instanceof String) { - return new GenericMessage<>(payload); - } - return null; - } - } - - private static final class MsgPackCodec { - - private static final ObjectMapper MAPPER = new ObjectMapper(new MessagePackFactory()); - - private MsgPackCodec() {} - - static byte[] encode(MessagePackageListenerTest.ListenerPayload payload) { - try { - return MAPPER.writeValueAsBytes(payload); - } catch (JsonProcessingException e) { - throw new MessageConversionException("MsgPack encoding failed", e); - } - } - - static MessagePackageListenerTest.ListenerPayload decode(byte[] bytes) { - try { - return MAPPER.readValue(bytes, MessagePackageListenerTest.ListenerPayload.class); - } catch (IOException e) { - throw new MessageConversionException("MsgPack decoding failed", e); - } - } - } -} From 1a5af73bd19a10e95513bf8d3f6f2ec070188865 Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Sat, 23 May 2026 12:12:10 +0530 Subject: [PATCH 10/16] fix: max retry --- .github/workflows/java-ci.yaml | 1 + .../listener/PostProcessingHandler.java | 9 +- .../sonus21/rqueue/listener/RetryPolicy.java | 61 +++++ .../rqueue/listener/RqueueExecutor.java | 14 +- .../rqueue/listener/RetryPolicyTest.java | 78 ++++++ ...sageListenerContainerBrokerBranchTest.java | 22 ++ .../nats/JetStreamMessageBrokerUnitTest.java | 9 + .../NatsGlobalRetryLimitE2EIT.java | 102 -------- .../integration/GlobalRetryLimitE2EIT.java | 247 ++++++++++++++++++ .../RedisGlobalRetryLimitE2EIT.java | 82 ------ .../unit/RqueueListenerAutoConfigTest.java | 45 ++++ 11 files changed, 467 insertions(+), 203 deletions(-) create mode 100644 rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RetryPolicy.java create mode 100644 rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RetryPolicyTest.java delete mode 100644 rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsGlobalRetryLimitE2EIT.java create mode 100644 rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/GlobalRetryLimitE2EIT.java delete mode 100644 rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/RedisGlobalRetryLimitE2EIT.java diff --git a/.github/workflows/java-ci.yaml b/.github/workflows/java-ci.yaml index eb4518b4..f3d450b4 100644 --- a/.github/workflows/java-ci.yaml +++ b/.github/workflows/java-ci.yaml @@ -446,6 +446,7 @@ jobs: - name: Run NATS tests env: + RQUEUE_TEST_BACKEND: nats NATS_RUNNING: "true" NATS_URL: nats://127.0.0.1:4222 run: ./gradlew :rqueue-nats:test :rqueue-spring-boot-starter:test :rqueue-spring:test -DincludeTags=nats diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/PostProcessingHandler.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/PostProcessingHandler.java index 5090f524..824a1537 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/PostProcessingHandler.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/PostProcessingHandler.java @@ -225,18 +225,11 @@ private void handleRetryExceededMessage(JobImpl job, int failureCount, Throwable } } - private int getMaxRetryCount(RqueueMessage rqueueMessage, QueueDetail queueDetail) { - return rqueueMessage.getRetryCount() == null - ? queueDetail.getNumRetry() - : rqueueMessage.getRetryCount(); - } - private void handleFailure(JobImpl job, int failureCount, Throwable throwable) { if (job.getQueueDetail().isDoNotRetryError(throwable)) { handleRetryExceededMessage(job, failureCount, throwable); } else { - int maxRetryCount = getMaxRetryCount(job.getRqueueMessage(), job.getQueueDetail()); - if (failureCount < maxRetryCount) { + if (!RetryPolicy.isExhausted(job.getRqueueMessage(), job.getQueueDetail(), failureCount)) { long delay = taskExecutionBackoff.nextBackOff( job.getMessage(), job.getRqueueMessage(), failureCount, throwable); if (delay == TaskExecutionBackOff.STOP) { diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RetryPolicy.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RetryPolicy.java new file mode 100644 index 00000000..218e4f00 --- /dev/null +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RetryPolicy.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ + +package com.github.sonus21.rqueue.listener; + +import com.github.sonus21.rqueue.config.RqueueConfig; +import com.github.sonus21.rqueue.core.RqueueMessage; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +final class RetryPolicy { + + static final int UNLIMITED_RETRY_LIMIT = 100_000; + + static int maxRetryCount(RqueueMessage rqueueMessage, QueueDetail queueDetail) { + int maxRetryCount = rqueueMessage.getRetryCount() == null + ? queueDetail.getNumRetry() + : rqueueMessage.getRetryCount(); + if (maxRetryCount == Integer.MAX_VALUE) { + return UNLIMITED_RETRY_LIMIT; + } + return maxRetryCount; + } + + static int remainingRetryCount( + RqueueMessage rqueueMessage, QueueDetail queueDetail, int failureCount) { + int maxRetryCount = maxRetryCount(rqueueMessage, queueDetail); + return Math.max(0, maxRetryCount - failureCount); + } + + static int retryCountForPoll( + RqueueConfig rqueueConfig, + RqueueMessage rqueueMessage, + QueueDetail queueDetail, + int failureCount) { + int remainingRetryCount = remainingRetryCount(rqueueMessage, queueDetail, failureCount); + if (rqueueConfig.getRetryPerPoll() == -1) { + return remainingRetryCount; + } + return Math.min(rqueueConfig.getRetryPerPoll(), remainingRetryCount); + } + + static boolean isExhausted( + RqueueMessage rqueueMessage, QueueDetail queueDetail, int failureCount) { + return remainingRetryCount(rqueueMessage, queueDetail, failureCount) == 0; + } +} diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java index ad75f4d1..d72dfc0f 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java @@ -111,12 +111,6 @@ private void init() { this.failureCount = job.getRqueueMessage().getFailureCount(); } - private int getMaxRetryCount() { - return Objects.isNull(job.getRqueueMessage().getRetryCount()) - ? job.getQueueDetail().getNumRetry() - : job.getRqueueMessage().getRetryCount(); - } - private void updateCounter(boolean fail) { RqueueMetricsCounter counter = beanProvider.getRqueueMetricsCounter(); if (Objects.isNull(counter)) { @@ -179,11 +173,8 @@ private boolean isOldMessage() { } private int getRetryCount() { - int maxRetry = getMaxRetryCount(); - if (beanProvider.getRqueueConfig().getRetryPerPoll() == -1) { - return maxRetry; - } - return Math.min(beanProvider.getRqueueConfig().getRetryPerPoll(), maxRetry); + return RetryPolicy.retryCountForPoll( + beanProvider.getRqueueConfig(), job.getRqueueMessage(), job.getQueueDetail(), failureCount); } private boolean queueInactive() { @@ -283,6 +274,7 @@ private void execute() { private boolean shouldRetry(long maxProcessingTime, int retryCount, int failureCount) { if (retryCount > 0 && ExecutionStatus.FAILED.equals(status) + && !RetryPolicy.isExhausted(rqueueMessage, queueDetail, failureCount) && System.currentTimeMillis() < maxProcessingTime) { boolean doNoRetry = queueDetail.isDoNotRetryError(error); // it should not be retried based on the exception list diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RetryPolicyTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RetryPolicyTest.java new file mode 100644 index 00000000..4d163bdf --- /dev/null +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RetryPolicyTest.java @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ + +package com.github.sonus21.rqueue.listener; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.doReturn; + +import com.github.sonus21.rqueue.CoreUnitTest; +import com.github.sonus21.rqueue.config.RqueueConfig; +import com.github.sonus21.rqueue.core.RqueueMessage; +import com.github.sonus21.rqueue.utils.TestUtils; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +@CoreUnitTest +class RetryPolicyTest { + + private final QueueDetail queueDetail = TestUtils.createQueueDetail("queue", 2, 900000L, null); + + @Test + void retryCountForPollUsesRemainingRetryBudget() { + RqueueConfig rqueueConfig = Mockito.mock(RqueueConfig.class); + doReturn(100).when(rqueueConfig).getRetryPerPoll(); + RqueueMessage rqueueMessage = new RqueueMessage(); + + assertEquals(1, RetryPolicy.retryCountForPoll(rqueueConfig, rqueueMessage, queueDetail, 1)); + } + + @Test + void retryCountForPollKeepsExplicitMessageRetryCount() { + RqueueConfig rqueueConfig = Mockito.mock(RqueueConfig.class); + doReturn(-1).when(rqueueConfig).getRetryPerPoll(); + RqueueMessage rqueueMessage = RqueueMessage.builder().retryCount(1000).build(); + + assertEquals( + 999, RetryPolicy.retryCountForPoll(rqueueConfig, rqueueMessage, queueDetail, 1)); + } + + @Test + void isExhaustedUsesEffectiveRetryCount() { + RqueueMessage rqueueMessage = new RqueueMessage(); + + assertFalse(RetryPolicy.isExhausted(rqueueMessage, queueDetail, 1)); + assertTrue(RetryPolicy.isExhausted(rqueueMessage, queueDetail, 2)); + } + + @Test + void retryForeverSentinelUsesFiniteLimit() { + QueueDetail retryForeverQueue = + TestUtils.createQueueDetail("queue", Integer.MAX_VALUE, 900000L, null); + RqueueMessage rqueueMessage = new RqueueMessage(); + + assertEquals(RetryPolicy.UNLIMITED_RETRY_LIMIT, RetryPolicy.maxRetryCount( + rqueueMessage, retryForeverQueue)); + assertEquals( + 1, + RetryPolicy.remainingRetryCount( + rqueueMessage, retryForeverQueue, RetryPolicy.UNLIMITED_RETRY_LIMIT - 1)); + assertTrue(RetryPolicy.isExhausted( + rqueueMessage, retryForeverQueue, RetryPolicy.UNLIMITED_RETRY_LIMIT)); + } +} diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerBrokerBranchTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerBrokerBranchTest.java index f407ed9e..1e0f1ea5 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerBrokerBranchTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerBrokerBranchTest.java @@ -228,6 +228,24 @@ void redisDefaultsBrokerAlsoUsesNormalStartQueuePath() throws Exception { } } + @Test + void startDoesNotStartQueuesAgainWhenContainerIsAlreadyRunning() throws Exception { + EndpointRegistry.delete(); + CountingBroker broker = new CountingBroker(Capabilities.REDIS_DEFAULTS); + TrackingContainer container = new TrackingContainer(messageHandler); + container.setMessageBroker(broker); + container.afterPropertiesSet(); + try { + container.start(); + container.start(); + + assertEquals(1, container.startQueueCalls.get() + container.startGroupCalls.get()); + } finally { + container.stop(); + container.destroy(); + } + } + @Test void pollerForwardsPollingIntervalAsBrokerFetchWait() throws Exception { EndpointRegistry.delete(); @@ -288,6 +306,8 @@ private class TrackingContainer extends RqueueMessageListenerContainer { final AtomicBoolean startBrokerPollersCalled = new AtomicBoolean(); final AtomicBoolean startQueueCalled = new AtomicBoolean(); final AtomicBoolean startGroupCalled = new AtomicBoolean(); + final AtomicInteger startQueueCalls = new AtomicInteger(); + final AtomicInteger startGroupCalls = new AtomicInteger(); TrackingContainer(RqueueMessageHandler handler) { super(handler, rqueueMessageTemplate); @@ -297,12 +317,14 @@ private class TrackingContainer extends RqueueMessageListenerContainer { @Override protected void startQueue(String pollerKey, QueueDetail queueDetail) { startQueueCalled.set(true); + startQueueCalls.incrementAndGet(); // Do not actually start the poller; it would need a real broker. } @Override protected void startGroup(String groupName, List queueDetails) { startGroupCalled.set(true); + startGroupCalls.incrementAndGet(); } } } diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java index 62b1a80c..5430ffa7 100644 --- a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java +++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java @@ -101,6 +101,15 @@ void getPollWait_usesConfiguredFetchWait() { assertEquals(Duration.ofSeconds(9), f.broker.getPollWait(Duration.ofMillis(137))); } + @Test + void getPollWait_fallsBackToPollingIntervalWhenFetchWaitIsUnset() { + RqueueNatsConfig cfg = RqueueNatsConfig.defaults().setDefaultFetchWait(null); + Fixture f = newFixture(cfg); + Duration pollingInterval = Duration.ofMillis(137); + + assertEquals(pollingInterval, f.broker.getPollWait(pollingInterval)); + } + @Test void enqueueWithPriority_appendsPrioritySuffixToSubject() throws Exception { Fixture f = newFixture(RqueueNatsConfig.defaults()); diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsGlobalRetryLimitE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsGlobalRetryLimitE2EIT.java deleted file mode 100644 index 918cfa61..00000000 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsGlobalRetryLimitE2EIT.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Copyright (c) 2026 Sonu Kumar - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * You may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - * - */ -package com.github.sonus21.rqueue.spring.boot.integration; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.github.sonus21.rqueue.annotation.RqueueListener; -import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer; -import io.nats.client.JetStreamManagement; -import java.time.Duration; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.awaitility.Awaitility; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration; -import org.springframework.boot.data.redis.autoconfigure.DataRedisReactiveAutoConfiguration; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Import; -import org.springframework.stereotype.Component; - -@SpringBootTest( - classes = NatsGlobalRetryLimitE2EIT.TestApp.class, - properties = { - "rqueue.backend=nats", - "rqueue.retry.max=2", - "rqueue.retry.per.poll=1000", - "rqueue.nats.naming.stream-prefix=" + NatsGlobalRetryLimitE2EIT.STREAM_PREFIX, - "rqueue.nats.naming.subject-prefix=" + NatsGlobalRetryLimitE2EIT.SUBJECT_PREFIX - }) -@Tag("nats") -class NatsGlobalRetryLimitE2EIT extends AbstractNatsBootIT { - - static final String QUEUE = "global-retry-nats"; - static final String STREAM_PREFIX = "rqueue-js-globalRetryE2E-"; - static final String SUBJECT_PREFIX = "rqueue.js.globalRetryE2E."; - - @BeforeAll - static void wipeOwnedStreams() { - deleteStreamsWithPrefix(STREAM_PREFIX); - } - - @Autowired - RqueueMessageEnqueuer enqueuer; - - @Autowired - JetStreamManagement jsm; - - @Autowired - FailingListener listener; - - @Test - void globalRetryLimitCapsImplicitRetryForever() throws Exception { - enqueuer.enqueue(QUEUE, "payload"); - - assertThat(listener.twoAttempts.await(20, TimeUnit.SECONDS)).isTrue(); - Awaitility.await() - .during(Duration.ofSeconds(2)) - .atMost(Duration.ofSeconds(3)) - .untilAsserted(() -> assertThat(listener.attempts).hasValue(2)); - - assertThat(jsm.getConsumerInfo(STREAM_PREFIX + QUEUE, QUEUE + "-consumer") - .getConsumerConfiguration() - .getMaxDeliver()) - .isEqualTo(3L); - } - - @SpringBootApplication( - exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class}) - @Import(FailingListener.class) - static class TestApp {} - - @Component - static class FailingListener { - final AtomicInteger attempts = new AtomicInteger(); - final CountDownLatch twoAttempts = new CountDownLatch(2); - - @RqueueListener(value = QUEUE) - void onMessage(String ignored) { - attempts.incrementAndGet(); - twoAttempts.countDown(); - throw new IllegalStateException("force retry"); - } - } -} diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/GlobalRetryLimitE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/GlobalRetryLimitE2EIT.java new file mode 100644 index 00000000..2144e785 --- /dev/null +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/GlobalRetryLimitE2EIT.java @@ -0,0 +1,247 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ +package com.github.sonus21.rqueue.spring.boot.tests.integration; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.github.sonus21.rqueue.annotation.RqueueListener; +import com.github.sonus21.rqueue.config.RqueueConfig; +import com.github.sonus21.rqueue.config.SimpleRqueueListenerContainerFactory; +import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer; +import com.github.sonus21.rqueue.spring.boot.tests.SpringBootIntegrationTest; +import com.github.sonus21.rqueue.test.application.BaseApplication; +import com.github.sonus21.rqueue.utils.backoff.FixedTaskExecutionBackOff; +import io.nats.client.Connection; +import io.nats.client.JetStreamApiException; +import io.nats.client.JetStreamManagement; +import io.nats.client.Nats; +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +@SpringBootTest(classes = GlobalRetryLimitE2EIT.TestApp.class) +@SpringBootIntegrationTest +@Tag("nats") +class GlobalRetryLimitE2EIT { + + private static final Logger log = Logger.getLogger(GlobalRetryLimitE2EIT.class.getName()); + private static final String BACKEND = + System.getProperty( + "rqueue.test.backend", System.getenv().getOrDefault("RQUEUE_TEST_BACKEND", "redis")) + .toLowerCase(Locale.ROOT); + private static final String QUEUE = "global-retry-" + BACKEND; + private static final String STREAM_PREFIX = "rqueue-js-globalRetryE2E-"; + private static final String SUBJECT_PREFIX = "rqueue.js.globalRetryE2E."; + private static final String EXTERNAL_NATS_URL = + System.getenv().getOrDefault("NATS_URL", "nats://127.0.0.1:4222"); + private static final boolean USE_EXTERNAL_NATS = System.getenv("NATS_RUNNING") != null; + + private static GenericContainer nats; + + @Autowired + RqueueMessageEnqueuer enqueuer; + + @Autowired + FailingListener listener; + + @Autowired + RqueueConfig rqueueConfig; + + @Autowired(required = false) + JetStreamManagement jsm; + + @BeforeAll + static void bootstrapBackend() { + if (isNatsBackend()) { + startNats(); + deleteStreamsWithPrefix(STREAM_PREFIX); + } + } + + @DynamicPropertySource + static void properties(DynamicPropertyRegistry registry) { + registry.add("rqueue.backend", () -> BACKEND); + registry.add("rqueue.retry.max", () -> "2"); + registry.add("rqueue.retry.per.poll", () -> "1"); + registry.add("global.retry.limit.queue", () -> QUEUE); + if (isNatsBackend()) { + registry.add("rqueue.nats.connection.url", GlobalRetryLimitE2EIT::activeNatsUrl); + registry.add("rqueue.nats.naming.stream-prefix", () -> STREAM_PREFIX); + registry.add("rqueue.nats.naming.subject-prefix", () -> SUBJECT_PREFIX); + } else { + registry.add("spring.data.redis.port", () -> "8027"); + registry.add("mysql.db.name", () -> "GlobalRetryLimitE2EIT"); + registry.add("use.system.redis", () -> "false"); + } + } + + @BeforeEach + void resetListener() { + listener.reset(); + rqueueConfig.setRetryPerPoll(1); + } + + @Test + void globalRetryLimitCapsSimpleEnqueueWhenRetryPerPollIsOne() throws Exception { + enqueuer.enqueue(QUEUE, "payload"); + + assertTwoAttemptsOnly(); + if (isNatsBackend()) { + assertThat(jsm).isNotNull(); + assertThat(jsm.getConsumerInfo(STREAM_PREFIX + QUEUE, QUEUE + "-consumer") + .getConsumerConfiguration() + .getMaxDeliver()) + .isEqualTo(3L); + } + } + + @Test + void globalRetryLimitCapsSimpleEnqueueWhenRetryPerPollIsHigh() throws Exception { + rqueueConfig.setRetryPerPoll(100); + + enqueuer.enqueue(QUEUE, "payload"); + + assertTwoAttemptsOnly(); + } + + @Test + void globalRetryLimitUsesRemainingRetriesWhenRetryPerPollIncreases() throws Exception { + enqueuer.enqueue(QUEUE, "payload"); + + assertThat(listener.firstAttempt.await(20, TimeUnit.SECONDS)).isTrue(); + rqueueConfig.setRetryPerPoll(100); + + assertTwoAttemptsOnly(); + } + + private void assertTwoAttemptsOnly() throws InterruptedException { + assertThat(listener.twoAttempts.await(20, TimeUnit.SECONDS)).isTrue(); + Awaitility.await() + .during(Duration.ofMillis(600)) + .atMost(Duration.ofSeconds(3)) + .untilAsserted(() -> assertThat(listener.attempts).hasValue(2)); + } + + private static boolean isNatsBackend() { + return "nats".equalsIgnoreCase(BACKEND); + } + + private static void startNats() { + if (!isNatsBackend() || USE_EXTERNAL_NATS || nats != null) { + return; + } + Assumptions.assumeTrue( + DockerClientFactory.instance().isDockerAvailable(), + "Skipping: Docker is not available and NATS_RUNNING is not set"); + nats = new GenericContainer<>(DockerImageName.parse("nats:2.12-alpine")) + .withCommand("-js") + .withExposedPorts(4222) + .waitingFor(Wait.forLogMessage(".*Server is ready.*\\n", 1)); + nats.start(); + Runtime.getRuntime().addShutdownHook(new Thread(nats::stop)); + } + + private static String activeNatsUrl() { + if (USE_EXTERNAL_NATS) { + return EXTERNAL_NATS_URL; + } + startNats(); + return "nats://" + nats.getHost() + ":" + nats.getMappedPort(4222); + } + + private static void deleteStreamsWithPrefix(String prefix) { + try (Connection c = Nats.connect(activeNatsUrl())) { + JetStreamManagement management = c.jetStreamManagement(); + List names = management.getStreamNames(); + if (names == null) { + return; + } + for (String name : names) { + if (name.startsWith(prefix)) { + management.deleteStream(name); + } + } + } catch (IOException | JetStreamApiException e) { + log.log(Level.WARNING, "Failed to clean NATS streams: " + e.getMessage()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.log(Level.WARNING, "Failed to clean NATS streams: " + e.getMessage()); + } + } + + @SpringBootApplication + @Import({FailingListener.class, RedisTestConfig.class}) + static class TestApp { + + @Bean + public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory() { + FixedTaskExecutionBackOff backOff = new FixedTaskExecutionBackOff(); + backOff.setInterval(100); + SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory(); + factory.setTaskExecutionBackOff(backOff); + return factory; + } + } + + @Configuration + @ConditionalOnProperty(name = "rqueue.backend", havingValue = "redis", matchIfMissing = true) + static class RedisTestConfig extends BaseApplication {} + + static class FailingListener { + final AtomicInteger attempts = new AtomicInteger(); + CountDownLatch firstAttempt = new CountDownLatch(1); + CountDownLatch twoAttempts = new CountDownLatch(2); + + void reset() { + attempts.set(0); + firstAttempt = new CountDownLatch(1); + twoAttempts = new CountDownLatch(2); + } + + @RqueueListener(value = "${global.retry.limit.queue}") + void onMessage(String ignored) { + attempts.incrementAndGet(); + firstAttempt.countDown(); + twoAttempts.countDown(); + throw new IllegalStateException("force retry"); + } + } +} diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/RedisGlobalRetryLimitE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/RedisGlobalRetryLimitE2EIT.java deleted file mode 100644 index d0df932e..00000000 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/RedisGlobalRetryLimitE2EIT.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright (c) 2026 Sonu Kumar - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * You may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - * - */ -package com.github.sonus21.rqueue.spring.boot.tests.integration; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.github.sonus21.rqueue.annotation.RqueueListener; -import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer; -import com.github.sonus21.rqueue.spring.boot.tests.SpringBootIntegrationTest; -import com.github.sonus21.rqueue.test.application.BaseApplication; -import java.time.Duration; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.awaitility.Awaitility; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Import; -import org.springframework.test.context.TestPropertySource; - -@SpringBootTest(classes = RedisGlobalRetryLimitE2EIT.TestApp.class) -@TestPropertySource( - properties = { - "rqueue.retry.max=2", - "rqueue.retry.per.poll=1000", - "spring.data.redis.port=8027", - "mysql.db.name=RedisGlobalRetryLimitE2EIT", - "use.system.redis=false" - }) -@SpringBootIntegrationTest -class RedisGlobalRetryLimitE2EIT { - - static final String QUEUE = "global-retry-redis"; - - @Autowired - RqueueMessageEnqueuer enqueuer; - - @Autowired - FailingListener listener; - - @Test - void globalRetryLimitCapsImplicitRetryForever() throws Exception { - enqueuer.enqueue(QUEUE, "payload"); - - assertThat(listener.twoAttempts.await(20, TimeUnit.SECONDS)).isTrue(); - Awaitility.await() - .during(Duration.ofSeconds(2)) - .atMost(Duration.ofSeconds(3)) - .untilAsserted(() -> assertThat(listener.attempts).hasValue(2)); - } - - @SpringBootApplication - @Import(FailingListener.class) - static class TestApp extends BaseApplication {} - - static class FailingListener { - final AtomicInteger attempts = new AtomicInteger(); - final CountDownLatch twoAttempts = new CountDownLatch(2); - - @RqueueListener(value = QUEUE) - void onMessage(String ignored) { - attempts.incrementAndGet(); - twoAttempts.countDown(); - throw new IllegalStateException("force retry"); - } - } -} diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java index 2f572c19..5b8f4686 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java @@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.when; @@ -133,6 +134,41 @@ void rqueueMessageListenerContainer() assertTrue(factory.getAutoStartup()); } + @Test + void rqueueMessageListenerContainerKeepsAutoStartupWhenLifecycleMissing() + throws IllegalAccessException { + SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory(); + factory.setMessageConverterProvider(new DefaultMessageConverterProvider()); + factory.setMessageBroker(messageBroker); + RqueueListenerAutoConfig messageAutoConfig = new RqueueListenerAutoConfig(); + FieldUtils.writeField(messageAutoConfig, "simpleRqueueListenerContainerFactory", factory, true); + + RqueueMessageListenerContainer container = + messageAutoConfig.rqueueMessageListenerContainer(rqueueMessageHandler, messageBroker); + + assertTrue(container.isAutoStartup()); + assertTrue(factory.getAutoStartup()); + assertSame(messageBroker, factory.getMessageBroker()); + } + + @Test + void rqueueMessageListenerContainerRestoresFactoryAutoStartupWhenCreationFails() + throws IllegalAccessException { + FailingListenerContainerFactory factory = new FailingListenerContainerFactory(); + RqueueListenerAutoConfig messageAutoConfig = new RqueueListenerAutoConfig(); + FieldUtils.writeField(messageAutoConfig, "simpleRqueueListenerContainerFactory", factory, true); + FieldUtils.writeField( + messageAutoConfig, "rqueueAutoStartupLifecycle", new RqueueAutoStartupLifecycle(), true); + + IllegalStateException exception = assertThrows( + IllegalStateException.class, + () -> + messageAutoConfig.rqueueMessageListenerContainer(rqueueMessageHandler, messageBroker)); + + assertEquals("boom", exception.getMessage()); + assertTrue(factory.getAutoStartup()); + } + @Test void autoStartupLifecycleDelaysOnlyAutoStartupContainers() { RqueueAutoStartupLifecycle lifecycle = new RqueueAutoStartupLifecycle(); @@ -193,6 +229,15 @@ void rqueueMessageSenderUsesConfiguredMessageConverter() throws IllegalAccessExc assertTrue(converter.hashCode() == messageConverter.hashCode()); } + private static class FailingListenerContainerFactory + extends SimpleRqueueListenerContainerFactory { + + @Override + public RqueueMessageListenerContainer createMessageListenerContainer() { + throw new IllegalStateException("boom"); + } + } + private class TestRqueueMessageListenerContainer extends RqueueMessageListenerContainer { private int startCount; From 83b235bd4f7495270c43a26bc1bd9ac9ee7d96c9 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Sat, 23 May 2026 06:43:15 +0000 Subject: [PATCH 11/16] Apply Palantir Java Format --- .../github/sonus21/rqueue/listener/RetryPolicyTest.java | 8 ++++---- .../boot/tests/integration/GlobalRetryLimitE2EIT.java | 7 +++---- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RetryPolicyTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RetryPolicyTest.java index 4d163bdf..0eb47e6f 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RetryPolicyTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RetryPolicyTest.java @@ -48,8 +48,7 @@ void retryCountForPollKeepsExplicitMessageRetryCount() { doReturn(-1).when(rqueueConfig).getRetryPerPoll(); RqueueMessage rqueueMessage = RqueueMessage.builder().retryCount(1000).build(); - assertEquals( - 999, RetryPolicy.retryCountForPoll(rqueueConfig, rqueueMessage, queueDetail, 1)); + assertEquals(999, RetryPolicy.retryCountForPoll(rqueueConfig, rqueueMessage, queueDetail, 1)); } @Test @@ -66,8 +65,9 @@ void retryForeverSentinelUsesFiniteLimit() { TestUtils.createQueueDetail("queue", Integer.MAX_VALUE, 900000L, null); RqueueMessage rqueueMessage = new RqueueMessage(); - assertEquals(RetryPolicy.UNLIMITED_RETRY_LIMIT, RetryPolicy.maxRetryCount( - rqueueMessage, retryForeverQueue)); + assertEquals( + RetryPolicy.UNLIMITED_RETRY_LIMIT, + RetryPolicy.maxRetryCount(rqueueMessage, retryForeverQueue)); assertEquals( 1, RetryPolicy.remainingRetryCount( diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/GlobalRetryLimitE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/GlobalRetryLimitE2EIT.java index 2144e785..ac86770a 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/GlobalRetryLimitE2EIT.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/GlobalRetryLimitE2EIT.java @@ -63,10 +63,9 @@ class GlobalRetryLimitE2EIT { private static final Logger log = Logger.getLogger(GlobalRetryLimitE2EIT.class.getName()); - private static final String BACKEND = - System.getProperty( - "rqueue.test.backend", System.getenv().getOrDefault("RQUEUE_TEST_BACKEND", "redis")) - .toLowerCase(Locale.ROOT); + private static final String BACKEND = System.getProperty( + "rqueue.test.backend", System.getenv().getOrDefault("RQUEUE_TEST_BACKEND", "redis")) + .toLowerCase(Locale.ROOT); private static final String QUEUE = "global-retry-" + BACKEND; private static final String STREAM_PREFIX = "rqueue-js-globalRetryE2E-"; private static final String SUBJECT_PREFIX = "rqueue.js.globalRetryE2E."; From 99b2e989b06a272168efdf7295077d05c5a1a2e9 Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Sun, 24 May 2026 00:12:36 +0530 Subject: [PATCH 12/16] Consolidate backend contract E2E tests --- .../integration/NatsBackendEndToEndIT.java | 107 ------- .../integration/NatsConcurrencyE2EIT.java | 101 ------- .../integration/NatsPriorityQueuesE2EIT.java | 100 ------- .../integration/NatsReactiveEnqueueE2EIT.java | 99 ------- .../integration/BackendContractE2EIT.java | 278 ++++++++++++++++++ 5 files changed, 278 insertions(+), 407 deletions(-) delete mode 100644 rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsBackendEndToEndIT.java delete mode 100644 rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsConcurrencyE2EIT.java delete mode 100644 rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsPriorityQueuesE2EIT.java delete mode 100644 rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsReactiveEnqueueE2EIT.java create mode 100644 rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/BackendContractE2EIT.java diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsBackendEndToEndIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsBackendEndToEndIT.java deleted file mode 100644 index cadadf01..00000000 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsBackendEndToEndIT.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright (c) 2026 Sonu Kumar - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * You may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - * - */ -package com.github.sonus21.rqueue.spring.boot.integration; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.github.sonus21.rqueue.annotation.RqueueListener; -import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration; -import org.springframework.boot.data.redis.autoconfigure.DataRedisReactiveAutoConfiguration; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Import; -import org.springframework.stereotype.Component; - -/** - * End-to-end integration test wiring a Spring Boot application against a NATS JetStream - * instance via {@code rqueue.backend=nats}, an {@link RqueueListener}, and the default - * {@link RqueueMessageEnqueuer}. It exercises the full intended path: - * - *
- *   Enqueue -> JetStreamMessageBroker.enqueue -> JetStream stream
- *           -> BrokerMessagePoller.pop -> @RqueueListener invocation -> broker.ack
- * 
- * - *

The NATS instance is supplied by {@link AbstractNatsBootIT}: when {@code NATS_RUNNING=true} - * (CI), the test connects to a locally running nats-server; otherwise it falls back to a - * Testcontainers-managed container, which itself skips gracefully without Docker. - * - *

Boots without any Redis at all: every Redis-shaped bean (config DAOs, dashboard controllers, - * pub/sub channel, schedulers) is gated by {@code @Conditional(RedisBackendCondition.class)} and - * stays out of the context when {@code rqueue.backend=nats}. {@code DataRedisAutoConfiguration} - * is excluded so Spring Boot doesn't try to wire a Lettuce client either. - */ -@SpringBootTest( - classes = NatsBackendEndToEndIT.TestApp.class, - properties = { - "rqueue.backend=nats", - "rqueue.nats.naming.stream-prefix=" + NatsBackendEndToEndIT.STREAM_PREFIX, - "rqueue.nats.naming.subject-prefix=" + NatsBackendEndToEndIT.SUBJECT_PREFIX - }) -@Tag("nats") -class NatsBackendEndToEndIT extends AbstractNatsBootIT { - - static final String STREAM_PREFIX = "rqueue-js-backendE2E-"; - static final String SUBJECT_PREFIX = "rqueue.js.backendE2E."; - - @BeforeAll - static void wipeOwnedStreams() { - deleteStreamsWithPrefix(STREAM_PREFIX); - } - - @Autowired - RqueueMessageEnqueuer enqueuer; - - @Autowired - TestListener listener; - - @Test - void enqueueIsReceivedByListener() throws Exception { - for (int i = 0; i < 5; i++) { - enqueuer.enqueue("e2e-test", "payload-" + i); - } - assertThat(listener.latch.await(20, TimeUnit.SECONDS)).isTrue(); - assertThat(listener.received) - .containsExactlyInAnyOrder("payload-0", "payload-1", "payload-2", "payload-3", "payload-4"); - } - - @SpringBootApplication( - exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class}) - @Import(TestListener.class) - static class TestApp {} - - @Component - static class TestListener { - final CountDownLatch latch = new CountDownLatch(5); - final List received = Collections.synchronizedList(new ArrayList<>()); - - @RqueueListener(value = "e2e-test") - void onMessage(String payload) { - received.add(payload); - latch.countDown(); - } - } -} diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsConcurrencyE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsConcurrencyE2EIT.java deleted file mode 100644 index 6076017d..00000000 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsConcurrencyE2EIT.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Copyright (c) 2026 Sonu Kumar - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * You may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - * - */ -package com.github.sonus21.rqueue.spring.boot.integration; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.github.sonus21.rqueue.annotation.RqueueListener; -import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration; -import org.springframework.boot.data.redis.autoconfigure.DataRedisReactiveAutoConfiguration; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Import; -import org.springframework.stereotype.Component; - -/** - * End-to-end test confirming that {@code @RqueueListener(concurrency=...)} actually runs more - * than one handler invocation in parallel against the NATS backend. We don't assert an exact - * parallelism value because JetStream prefetch + thread scheduling makes that flaky; observing - * any parallelism > 1 is enough proof the concurrency knob is wired through to a pull - * subscription with multiple poller threads. - */ -@SpringBootTest( - classes = NatsConcurrencyE2EIT.TestApp.class, - properties = { - "rqueue.backend=nats", - "rqueue.nats.naming.stream-prefix=" + NatsConcurrencyE2EIT.STREAM_PREFIX, - "rqueue.nats.naming.subject-prefix=" + NatsConcurrencyE2EIT.SUBJECT_PREFIX - }) -@Tag("nats") -class NatsConcurrencyE2EIT extends AbstractNatsBootIT { - - static final String STREAM_PREFIX = "rqueue-js-concurrencyE2E-"; - static final String SUBJECT_PREFIX = "rqueue.js.concurrencyE2E."; - - @BeforeAll - static void wipeOwnedStreams() { - deleteStreamsWithPrefix(STREAM_PREFIX); - } - - @Autowired - RqueueMessageEnqueuer enqueuer; - - @Autowired - ConcurrencyListener listener; - - @Test - void parallelInvocationsAreObserved() throws Exception { - for (int i = 0; i < 30; i++) { - enqueuer.enqueue("conc-e2e", "msg-" + i); - } - assertThat(listener.latch.await(45, TimeUnit.SECONDS)).isTrue(); - assertThat(listener.maxParallel.get()) - .as("at least 2 concurrent invocations should have been observed") - .isGreaterThanOrEqualTo(2); - } - - @SpringBootApplication( - exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class}) - @Import(ConcurrencyListener.class) - static class TestApp {} - - @Component - static class ConcurrencyListener { - final CountDownLatch latch = new CountDownLatch(30); - final AtomicInteger active = new AtomicInteger(); - final AtomicInteger maxParallel = new AtomicInteger(); - - @RqueueListener(value = "conc-e2e", concurrency = "3") - void onMessage(String payload) throws InterruptedException { - int now = active.incrementAndGet(); - maxParallel.updateAndGet(curr -> Math.max(curr, now)); - try { - Thread.sleep(200L); - } finally { - active.decrementAndGet(); - latch.countDown(); - } - } - } -} diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsPriorityQueuesE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsPriorityQueuesE2EIT.java deleted file mode 100644 index f4a62f5f..00000000 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsPriorityQueuesE2EIT.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright (c) 2026 Sonu Kumar - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * You may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - * - */ -package com.github.sonus21.rqueue.spring.boot.integration; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.github.sonus21.rqueue.annotation.RqueueListener; -import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration; -import org.springframework.boot.data.redis.autoconfigure.DataRedisReactiveAutoConfiguration; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Import; -import org.springframework.stereotype.Component; - -/** - * Verifies queue-level priority on the NATS backend: a single listener with - * {@code priority="high=10,low=1"} consumes from two internal sub-queues - * ({@code pq_high} and {@code pq_low}) and the producer sends to each via - * {@link RqueueMessageEnqueuer#enqueueWithPriority}. We assert all 10 messages are - * received and that 5 messages with payload prefix "high-" and 5 with "low-" arrive. - */ -@SpringBootTest( - classes = NatsPriorityQueuesE2EIT.TestApp.class, - properties = { - "rqueue.backend=nats", - "rqueue.nats.naming.stream-prefix=" + NatsPriorityQueuesE2EIT.STREAM_PREFIX, - "rqueue.nats.naming.subject-prefix=" + NatsPriorityQueuesE2EIT.SUBJECT_PREFIX - }) -@Tag("nats") -class NatsPriorityQueuesE2EIT extends AbstractNatsBootIT { - - static final String STREAM_PREFIX = "rqueue-js-priorityE2E-"; - static final String SUBJECT_PREFIX = "rqueue.js.priorityE2E."; - - @BeforeAll - static void wipeOwnedStreams() { - deleteStreamsWithPrefix(STREAM_PREFIX); - } - - @Autowired - RqueueMessageEnqueuer enqueuer; - - @Autowired - PriorityListener listener; - - @Test - void messagesEnqueuedAtBothPrioritiesAreReceived() throws Exception { - for (int i = 0; i < 5; i++) { - enqueuer.enqueueWithPriority("pq", "high", "high-" + i); - enqueuer.enqueueWithPriority("pq", "low", "low-" + i); - } - assertThat(listener.latch.await(30, TimeUnit.SECONDS)).isTrue(); - - long highCount = - listener.received.stream().filter(s -> s.startsWith("high-")).count(); - long lowCount = listener.received.stream().filter(s -> s.startsWith("low-")).count(); - assertThat(highCount).isEqualTo(5); - assertThat(lowCount).isEqualTo(5); - } - - @SpringBootApplication( - exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class}) - @Import(PriorityListener.class) - static class TestApp {} - - @Component - static class PriorityListener { - final CountDownLatch latch = new CountDownLatch(10); - final List received = Collections.synchronizedList(new ArrayList<>()); - - @RqueueListener(value = "pq", priority = "high=10,low=1", batchSize = "5", concurrency = "5") - void onMessage(String payload) { - received.add(payload); - latch.countDown(); - } - } -} diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsReactiveEnqueueE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsReactiveEnqueueE2EIT.java deleted file mode 100644 index bb92cca0..00000000 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsReactiveEnqueueE2EIT.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright (c) 2026 Sonu Kumar - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * You may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - * - */ -package com.github.sonus21.rqueue.spring.boot.integration; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.github.sonus21.rqueue.annotation.RqueueListener; -import com.github.sonus21.rqueue.core.ReactiveRqueueMessageEnqueuer; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration; -import org.springframework.boot.data.redis.autoconfigure.DataRedisReactiveAutoConfiguration; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Import; -import org.springframework.stereotype.Component; -import reactor.core.publisher.Flux; - -/** - * Verifies the reactive producer path on the NATS backend: enqueueing 5 messages via - * {@link ReactiveRqueueMessageEnqueuer} (subscribed via {@link Flux#merge}) and confirming a - * synchronous {@code @RqueueListener} on the same queue receives all 5. - */ -@SpringBootTest( - classes = NatsReactiveEnqueueE2EIT.TestApp.class, - properties = { - "rqueue.backend=nats", - "rqueue.reactive.enabled=true", - "rqueue.nats.naming.stream-prefix=" + NatsReactiveEnqueueE2EIT.STREAM_PREFIX, - "rqueue.nats.naming.subject-prefix=" + NatsReactiveEnqueueE2EIT.SUBJECT_PREFIX - }) -@Tag("nats") -class NatsReactiveEnqueueE2EIT extends AbstractNatsBootIT { - - static final String STREAM_PREFIX = "rqueue-js-reactiveEnqE2E-"; - static final String SUBJECT_PREFIX = "rqueue.js.reactiveEnqE2E."; - - @BeforeAll - static void wipeOwnedStreams() { - deleteStreamsWithPrefix(STREAM_PREFIX); - } - - @Autowired - ReactiveRqueueMessageEnqueuer reactiveEnqueuer; - - @Autowired - ReactiveListener listener; - - @Test - void reactivelyEnqueuedMessagesAreReceivedByListener() throws Exception { - List> publishers = new ArrayList<>(); - for (int i = 0; i < 5; i++) { - publishers.add(reactiveEnqueuer.enqueue("reactive-e2e", "rx-" + i)); - } - List ids = Flux.merge(publishers).collectList().block(Duration.ofSeconds(15)); - assertThat(ids).hasSize(5); - - assertThat(listener.latch.await(20, TimeUnit.SECONDS)).isTrue(); - assertThat(listener.received).containsExactlyInAnyOrder("rx-0", "rx-1", "rx-2", "rx-3", "rx-4"); - } - - @SpringBootApplication( - exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class}) - @Import(ReactiveListener.class) - static class TestApp {} - - @Component - static class ReactiveListener { - final CountDownLatch latch = new CountDownLatch(5); - final List received = Collections.synchronizedList(new ArrayList<>()); - - @RqueueListener(value = "reactive-e2e") - void onMessage(String payload) { - received.add(payload); - latch.countDown(); - } - } -} diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/BackendContractE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/BackendContractE2EIT.java new file mode 100644 index 00000000..dec14662 --- /dev/null +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/BackendContractE2EIT.java @@ -0,0 +1,278 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ +package com.github.sonus21.rqueue.spring.boot.tests.integration; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.github.sonus21.rqueue.annotation.RqueueListener; +import com.github.sonus21.rqueue.core.ReactiveRqueueMessageEnqueuer; +import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer; +import com.github.sonus21.rqueue.spring.boot.tests.SpringBootIntegrationTest; +import com.github.sonus21.rqueue.test.application.BaseApplication; +import io.nats.client.Connection; +import io.nats.client.JetStreamApiException; +import io.nats.client.JetStreamManagement; +import io.nats.client.Nats; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; +import reactor.core.publisher.Flux; + +@SpringBootTest(classes = BackendContractE2EIT.TestApp.class) +@SpringBootIntegrationTest +@Tag("nats") +class BackendContractE2EIT { + + private static final Logger log = Logger.getLogger(BackendContractE2EIT.class.getName()); + private static final String BACKEND = + System.getProperty( + "rqueue.test.backend", System.getenv().getOrDefault("RQUEUE_TEST_BACKEND", "redis")) + .toLowerCase(Locale.ROOT); + private static final String STREAM_PREFIX = "rqueue-js-backendContract-"; + private static final String SUBJECT_PREFIX = "rqueue.js.backendContract."; + private static final String EXTERNAL_NATS_URL = + System.getenv().getOrDefault("NATS_URL", "nats://127.0.0.1:4222"); + private static final boolean USE_EXTERNAL_NATS = System.getenv("NATS_RUNNING") != null; + + private static GenericContainer nats; + + @Autowired + RqueueMessageEnqueuer enqueuer; + + @Autowired + ReactiveRqueueMessageEnqueuer reactiveEnqueuer; + + @Autowired + ContractListener listener; + + @BeforeAll + static void bootstrapBackend() { + if (isNatsBackend()) { + startNats(); + deleteStreamsWithPrefix(STREAM_PREFIX); + } + } + + @DynamicPropertySource + static void properties(DynamicPropertyRegistry registry) { + registry.add("rqueue.backend", () -> BACKEND); + registry.add("rqueue.reactive.enabled", () -> "true"); + if (isNatsBackend()) { + registry.add("rqueue.nats.connection.url", BackendContractE2EIT::activeNatsUrl); + registry.add("rqueue.nats.naming.stream-prefix", () -> STREAM_PREFIX); + registry.add("rqueue.nats.naming.subject-prefix", () -> SUBJECT_PREFIX); + } else { + registry.add("spring.data.redis.port", () -> "8028"); + registry.add("mysql.db.name", () -> "BackendContractE2EIT"); + registry.add("use.system.redis", () -> "false"); + } + } + + @BeforeEach + void resetListener() { + listener.reset(); + } + + @Test + void enqueuedMessagesAreReceivedByListener() throws Exception { + for (int i = 0; i < 5; i++) { + enqueuer.enqueue("contract-basic", "payload-" + i); + } + + assertThat(listener.basicLatch.await(20, TimeUnit.SECONDS)).isTrue(); + assertThat(listener.basicReceived) + .containsExactlyInAnyOrder("payload-0", "payload-1", "payload-2", "payload-3", "payload-4"); + } + + @Test + void reactivelyEnqueuedMessagesAreReceivedByListener() throws Exception { + List> publishers = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + publishers.add(reactiveEnqueuer.enqueue("contract-reactive", "rx-" + i)); + } + + List ids = Flux.merge(publishers).collectList().block(Duration.ofSeconds(15)); + assertThat(ids).hasSize(5).doesNotContainNull(); + assertThat(listener.reactiveLatch.await(20, TimeUnit.SECONDS)).isTrue(); + assertThat(listener.reactiveReceived) + .containsExactlyInAnyOrder("rx-0", "rx-1", "rx-2", "rx-3", "rx-4"); + } + + @Test + void concurrentListenerInvocationsAreObserved() throws Exception { + for (int i = 0; i < 30; i++) { + enqueuer.enqueue("contract-concurrency", "msg-" + i); + } + + assertThat(listener.concurrencyLatch.await(45, TimeUnit.SECONDS)).isTrue(); + assertThat(listener.maxParallel.get()).isGreaterThanOrEqualTo(2); + } + + @Test + void messagesEnqueuedAtBothPrioritiesAreReceived() throws Exception { + for (int i = 0; i < 5; i++) { + enqueuer.enqueueWithPriority("contract-priority", "high", "high-" + i); + enqueuer.enqueueWithPriority("contract-priority", "low", "low-" + i); + } + + assertThat(listener.priorityLatch.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(listener.priorityReceived.stream().filter(s -> s.startsWith("high-")).count()) + .isEqualTo(5); + assertThat(listener.priorityReceived.stream().filter(s -> s.startsWith("low-")).count()) + .isEqualTo(5); + } + + private static boolean isNatsBackend() { + return "nats".equalsIgnoreCase(BACKEND); + } + + private static void startNats() { + if (!isNatsBackend() || USE_EXTERNAL_NATS || nats != null) { + return; + } + Assumptions.assumeTrue( + DockerClientFactory.instance().isDockerAvailable(), + "Skipping: Docker is not available and NATS_RUNNING is not set"); + nats = new GenericContainer<>(DockerImageName.parse("nats:2.12-alpine")) + .withCommand("-js") + .withExposedPorts(4222) + .waitingFor(Wait.forLogMessage(".*Server is ready.*\\n", 1)); + nats.start(); + Runtime.getRuntime().addShutdownHook(new Thread(nats::stop)); + } + + private static String activeNatsUrl() { + if (USE_EXTERNAL_NATS) { + return EXTERNAL_NATS_URL; + } + startNats(); + return "nats://" + nats.getHost() + ":" + nats.getMappedPort(4222); + } + + private static void deleteStreamsWithPrefix(String prefix) { + try (Connection c = Nats.connect(activeNatsUrl())) { + JetStreamManagement management = c.jetStreamManagement(); + List names = management.getStreamNames(); + if (names == null) { + return; + } + for (String name : names) { + if (name.startsWith(prefix)) { + management.deleteStream(name); + } + } + } catch (IOException | JetStreamApiException e) { + log.log(Level.WARNING, "Failed to clean NATS streams: " + e.getMessage()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.log(Level.WARNING, "Failed to clean NATS streams: " + e.getMessage()); + } + } + + @SpringBootApplication + @Import({ContractListener.class, RedisTestConfig.class}) + static class TestApp {} + + @Configuration + @ConditionalOnProperty(name = "rqueue.backend", havingValue = "redis", matchIfMissing = true) + static class RedisTestConfig extends BaseApplication {} + + static class ContractListener { + CountDownLatch basicLatch = new CountDownLatch(5); + List basicReceived = Collections.synchronizedList(new ArrayList<>()); + + CountDownLatch reactiveLatch = new CountDownLatch(5); + List reactiveReceived = Collections.synchronizedList(new ArrayList<>()); + + CountDownLatch concurrencyLatch = new CountDownLatch(30); + AtomicInteger active = new AtomicInteger(); + AtomicInteger maxParallel = new AtomicInteger(); + + CountDownLatch priorityLatch = new CountDownLatch(10); + List priorityReceived = Collections.synchronizedList(new ArrayList<>()); + + void reset() { + basicLatch = new CountDownLatch(5); + basicReceived = Collections.synchronizedList(new ArrayList<>()); + reactiveLatch = new CountDownLatch(5); + reactiveReceived = Collections.synchronizedList(new ArrayList<>()); + concurrencyLatch = new CountDownLatch(30); + active = new AtomicInteger(); + maxParallel = new AtomicInteger(); + priorityLatch = new CountDownLatch(10); + priorityReceived = Collections.synchronizedList(new ArrayList<>()); + } + + @RqueueListener(value = "contract-basic") + void onBasic(String payload) { + basicReceived.add(payload); + basicLatch.countDown(); + } + + @RqueueListener(value = "contract-reactive") + void onReactive(String payload) { + reactiveReceived.add(payload); + reactiveLatch.countDown(); + } + + @RqueueListener(value = "contract-concurrency", concurrency = "3") + void onConcurrent(String payload) throws InterruptedException { + int now = active.incrementAndGet(); + maxParallel.updateAndGet(curr -> Math.max(curr, now)); + try { + Thread.sleep(200L); + } finally { + active.decrementAndGet(); + concurrencyLatch.countDown(); + } + } + + @RqueueListener( + value = "contract-priority", + priority = "high=10,low=1", + batchSize = "5", + concurrency = "5") + void onPriority(String payload) { + priorityReceived.add(payload); + priorityLatch.countDown(); + } + } +} From 421ac706b6ce83e9461edf8af58bf08e7a228a00 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Sat, 23 May 2026 18:44:09 +0000 Subject: [PATCH 13/16] Apply Palantir Java Format --- .../tests/integration/BackendContractE2EIT.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/BackendContractE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/BackendContractE2EIT.java index dec14662..d23da3b1 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/BackendContractE2EIT.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/BackendContractE2EIT.java @@ -62,10 +62,9 @@ class BackendContractE2EIT { private static final Logger log = Logger.getLogger(BackendContractE2EIT.class.getName()); - private static final String BACKEND = - System.getProperty( - "rqueue.test.backend", System.getenv().getOrDefault("RQUEUE_TEST_BACKEND", "redis")) - .toLowerCase(Locale.ROOT); + private static final String BACKEND = System.getProperty( + "rqueue.test.backend", System.getenv().getOrDefault("RQUEUE_TEST_BACKEND", "redis")) + .toLowerCase(Locale.ROOT); private static final String STREAM_PREFIX = "rqueue-js-backendContract-"; private static final String SUBJECT_PREFIX = "rqueue.js.backendContract."; private static final String EXTERNAL_NATS_URL = @@ -154,9 +153,12 @@ void messagesEnqueuedAtBothPrioritiesAreReceived() throws Exception { } assertThat(listener.priorityLatch.await(30, TimeUnit.SECONDS)).isTrue(); - assertThat(listener.priorityReceived.stream().filter(s -> s.startsWith("high-")).count()) + assertThat(listener.priorityReceived.stream() + .filter(s -> s.startsWith("high-")) + .count()) .isEqualTo(5); - assertThat(listener.priorityReceived.stream().filter(s -> s.startsWith("low-")).count()) + assertThat( + listener.priorityReceived.stream().filter(s -> s.startsWith("low-")).count()) .isEqualTo(5); } From 914e432fe83d5cf43d57abfd5d34058141fb54ca Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Sun, 24 May 2026 09:12:11 +0530 Subject: [PATCH 14/16] Increase broker unit coverage --- .../spi/MessageBrokerDefaultMethodsTest.java | 189 +++++++++++++++++ .../nats/JetStreamMessageBrokerUnitTest.java | 196 +++++++++++++++++- 2 files changed, 383 insertions(+), 2 deletions(-) create mode 100644 rqueue-core/src/test/java/com/github/sonus21/rqueue/core/spi/MessageBrokerDefaultMethodsTest.java diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/spi/MessageBrokerDefaultMethodsTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/spi/MessageBrokerDefaultMethodsTest.java new file mode 100644 index 00000000..ccc15b9e --- /dev/null +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/spi/MessageBrokerDefaultMethodsTest.java @@ -0,0 +1,189 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + */ +package com.github.sonus21.rqueue.core.spi; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.github.sonus21.rqueue.CoreUnitTest; +import com.github.sonus21.rqueue.core.RqueueMessage; +import com.github.sonus21.rqueue.listener.QueueDetail; +import com.github.sonus21.rqueue.utils.TestUtils; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; +import org.junit.jupiter.api.Test; +import reactor.test.StepVerifier; + +@CoreUnitTest +class MessageBrokerDefaultMethodsTest { + + private final QueueDetail queue = TestUtils.createQueueDetail("queue", 1, 30_000L, null); + private final RqueueMessage oldMessage = + RqueueMessage.builder().id("old").message("old").build(); + private final RqueueMessage updatedMessage = + RqueueMessage.builder().id("updated").message("updated").processAt(1L).build(); + + @Test + void priorityAndReactiveDefaultsDelegateToBlockingOperations() { + RecordingBroker broker = new RecordingBroker(); + + broker.enqueue(queue, "high", updatedMessage); + StepVerifier.create(broker.enqueueReactive(queue, oldMessage)).verifyComplete(); + StepVerifier.create(broker.enqueueWithDelayReactive(queue, updatedMessage, 17L)) + .verifyComplete(); + broker.pop(queue, "high", "consumer", 3, Duration.ofMillis(25L)); + + assertEquals(2, broker.enqueueCalls); + assertEquals(oldMessage, broker.lastEnqueued); + assertEquals(1, broker.delayCalls); + assertEquals(17L, broker.lastDelayMs); + assertEquals(1, broker.popCalls); + assertEquals("consumer", broker.lastConsumerName); + assertEquals(Duration.ofMillis(25L), broker.lastWait); + } + + @Test + void retryDlqAndScheduleDefaultsUseBackendPrimitives() { + RecordingBroker broker = new RecordingBroker(); + + broker.parkForRetry(queue, oldMessage, updatedMessage, 123L); + broker.moveToDlq(queue, "dlq", oldMessage, updatedMessage, 0L); + broker.moveToDlq(queue, "dlq", oldMessage, updatedMessage, 99L); + broker.scheduleNext(queue, "period-key", updatedMessage, 60L); + + assertEquals(1, broker.nackCalls); + assertEquals(updatedMessage, broker.lastNacked); + assertEquals(123L, broker.lastRetryDelayMs); + assertEquals(1, broker.enqueueCalls); + assertEquals(2, broker.delayCalls); + assertEquals(updatedMessage, broker.lastDelayed); + } + + @Test + void dashboardDefaultsExposeRedisLabelsAndSingleSubscriberFallback() { + RecordingBroker broker = new RecordingBroker(); + + assertEquals("Redis", broker.storageKicker()); + assertEquals( + "Underlying Redis structures for the queues visible on this page.", + broker.storageDescription()); + assertNull(broker.storageDisplayName(queue)); + assertNull(broker.dlqStorageDisplayName(queue)); + assertNull(broker.consumerPendingSizes(queue)); + assertNull(broker.dataTypeLabel(null, null)); + assertFalse(broker.isSizeApproximate(queue)); + assertNull(broker.getVisibilityTimeoutScore(queue, oldMessage)); + assertFalse(broker.extendVisibilityTimeout(queue, oldMessage, 1L)); + + List subscribers = broker.subscribers(queue); + + assertEquals(1, subscribers.size()); + assertEquals(queue.resolvedConsumerName(), subscribers.get(0).consumerName()); + assertEquals(42L, subscribers.get(0).pending()); + assertEquals(0L, subscribers.get(0).inFlight()); + assertTrue(subscribers.get(0).pendingShared()); + } + + @Test + void subscribersDefaultFallsBackToZeroWhenSizeFails() { + RecordingBroker broker = new RecordingBroker(); + broker.failSize = true; + + List subscribers = broker.subscribers(queue); + + assertEquals(1, subscribers.size()); + assertEquals(0L, subscribers.get(0).pending()); + } + + private static final class RecordingBroker implements MessageBroker { + + int enqueueCalls; + int delayCalls; + int popCalls; + int nackCalls; + boolean failSize; + String lastConsumerName; + Duration lastWait; + long lastDelayMs; + long lastRetryDelayMs; + RqueueMessage lastEnqueued; + RqueueMessage lastDelayed; + RqueueMessage lastNacked; + + @Override + public void enqueue(QueueDetail q, RqueueMessage m) { + enqueueCalls++; + lastEnqueued = m; + } + + @Override + public void enqueueWithDelay(QueueDetail q, RqueueMessage m, long delayMs) { + delayCalls++; + lastDelayed = m; + lastDelayMs = delayMs; + } + + @Override + public List pop(QueueDetail q, String consumerName, int batch, Duration wait) { + popCalls++; + lastConsumerName = consumerName; + lastWait = wait; + return Collections.emptyList(); + } + + @Override + public boolean ack(QueueDetail q, RqueueMessage m) { + return true; + } + + @Override + public boolean nack(QueueDetail q, RqueueMessage m, long retryDelayMs) { + nackCalls++; + lastNacked = m; + lastRetryDelayMs = retryDelayMs; + return true; + } + + @Override + public long moveExpired(QueueDetail q, long now, int batch) { + return 0; + } + + @Override + public List peek(QueueDetail q, long offset, long count) { + return Collections.emptyList(); + } + + @Override + public long size(QueueDetail q) { + if (failSize) { + throw new IllegalStateException("backend down"); + } + return 42L; + } + + @Override + public AutoCloseable subscribe(String channel, Consumer handler) { + return () -> {}; + } + + @Override + public void publish(String channel, String payload) {} + + @Override + public Capabilities capabilities() { + return Capabilities.REDIS_DEFAULTS; + } + } +} diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java index 5430ffa7..5c1be1b3 100644 --- a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java +++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java @@ -13,9 +13,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -25,7 +27,11 @@ import com.github.sonus21.rqueue.core.RqueueMessage; import com.github.sonus21.rqueue.core.spi.Capabilities; +import com.github.sonus21.rqueue.core.spi.SubscriberView; +import com.github.sonus21.rqueue.enums.QueueType; import com.github.sonus21.rqueue.listener.QueueDetail; +import com.github.sonus21.rqueue.models.enums.DataType; +import com.github.sonus21.rqueue.models.enums.NavTab; import com.github.sonus21.rqueue.nats.internal.NatsProvisioner; import com.github.sonus21.rqueue.nats.js.JetStreamMessageBroker; import com.github.sonus21.rqueue.serdes.RqJacksonSerDes; @@ -35,11 +41,21 @@ import io.nats.client.JetStream; import io.nats.client.JetStreamApiException; import io.nats.client.JetStreamManagement; +import io.nats.client.JetStreamSubscription; +import io.nats.client.Message; import io.nats.client.MessageHandler; +import io.nats.client.PullSubscribeOptions; +import io.nats.client.api.ConsumerInfo; import io.nats.client.api.PublishAck; +import io.nats.client.api.RetentionPolicy; +import io.nats.client.api.SequenceInfo; +import io.nats.client.api.StreamConfiguration; +import io.nats.client.api.StreamInfo; +import io.nats.client.api.StreamState; import io.nats.client.impl.Headers; import java.io.IOException; import java.time.Duration; +import java.util.List; import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -56,9 +72,52 @@ class JetStreamMessageBrokerUnitTest { private static QueueDetail queueNamed(String name) { QueueDetail q = mock(QueueDetail.class); when(q.getName()).thenReturn(name); + when(q.getType()).thenReturn(QueueType.QUEUE); + when(q.resolvedConsumerName()).thenReturn(name + "-consumer"); return q; } + private static QueueDetail queueNamed( + String name, long visibilityTimeout, int numRetry, String consumerName) { + QueueDetail q = queueNamed(name); + when(q.getVisibilityTimeout()).thenReturn(visibilityTimeout); + when(q.getNumRetry()).thenReturn(numRetry); + when(q.resolvedConsumerName()).thenReturn(consumerName); + return q; + } + + private static StreamInfo streamInfo(long firstSeq, long lastSeq, long msgCount) { + return streamInfo(firstSeq, lastSeq, msgCount, RetentionPolicy.WorkQueue); + } + + private static StreamInfo streamInfo( + long firstSeq, long lastSeq, long msgCount, RetentionPolicy retentionPolicy) { + StreamState state = mock(StreamState.class); + when(state.getFirstSequence()).thenReturn(firstSeq); + when(state.getLastSequence()).thenReturn(lastSeq); + when(state.getMsgCount()).thenReturn(msgCount); + StreamConfiguration configuration = mock(StreamConfiguration.class); + when(configuration.getRetentionPolicy()).thenReturn(retentionPolicy); + StreamInfo info = mock(StreamInfo.class); + when(info.getStreamState()).thenReturn(state); + when(info.getConfiguration()).thenReturn(configuration); + return info; + } + + private static ConsumerInfo consumerInfo( + long pending, long ackPending, long ackFloorSequence, long deliveredSequence) { + ConsumerInfo info = mock(ConsumerInfo.class); + when(info.getNumPending()).thenReturn(pending); + when(info.getNumAckPending()).thenReturn(ackPending); + SequenceInfo ackFloor = mock(SequenceInfo.class); + when(ackFloor.getStreamSequence()).thenReturn(ackFloorSequence); + when(info.getAckFloor()).thenReturn(ackFloor); + SequenceInfo delivered = mock(SequenceInfo.class); + when(delivered.getStreamSequence()).thenReturn(deliveredSequence); + when(info.getDelivered()).thenReturn(delivered); + return info; + } + /** Build a broker with all NATS primitives mocked and stream provisioning short-circuited. */ private static Fixture newFixture(RqueueNatsConfig config) { return newFixture(config, false); @@ -80,7 +139,7 @@ private static Fixture newFixture(RqueueNatsConfig config, boolean schedulingSup config, new RqJacksonSerDes(SerializationUtils.getObjectMapper()), provisioner); - return new Fixture(conn, js, jsm, broker); + return new Fixture(conn, js, jsm, provisioner, broker); } @Test @@ -110,6 +169,61 @@ void getPollWait_fallsBackToPollingIntervalWhenFetchWaitIsUnset() { assertEquals(pollingInterval, f.broker.getPollWait(pollingInterval)); } + @Test + void popPriorityBindsConsumerWithQueueRetrySettingsAndTracksInFlightMessage() throws Exception { + Fixture f = newFixture(RqueueNatsConfig.defaults()); + QueueDetail q = queueNamed("orders", 2_500L, 4, "worker-a"); + JetStreamSubscription subscription = mock(JetStreamSubscription.class); + when(f.provisioner.ensureConsumer( + eq("rqueue-js-orders_high"), + eq("worker-a"), + eq("rqueue.js.orders_high"), + eq(Duration.ofMillis(2_500L)), + eq(5L), + anyLong())) + .thenReturn("worker-a"); + when(f.js.subscribe(eq(null), any(PullSubscribeOptions.class))).thenReturn(subscription); + RqueueMessage message = RqueueMessage.builder().id("m1").message("payload").build(); + Message natsMessage = mock(Message.class); + when(natsMessage.getData()) + .thenReturn(new RqJacksonSerDes(SerializationUtils.getObjectMapper()).serialize(message)); + when(subscription.fetch(eq(1), eq(Duration.ofMillis(50L)))).thenReturn(List.of(natsMessage)); + + List messages = f.broker.pop(q, "high", "worker-a", 1, Duration.ZERO); + + assertEquals(1, messages.size()); + assertEquals("m1", messages.get(0).getId()); + assertTrue(f.broker.extendVisibilityTimeout(q, messages.get(0), 1_000L)); + verify(natsMessage, times(1)).inProgress(); + assertTrue(f.broker.nack(q, messages.get(0), -1L)); + verify(natsMessage, times(1)).nakWithDelay(Duration.ZERO); + assertFalse(f.broker.ack(q, messages.get(0))); + } + + @Test + void popNaksUndeserializablePayloadAndReturnsRemainingMessages() throws Exception { + Fixture f = newFixture(RqueueNatsConfig.defaults()); + JetStreamSubscription subscription = mock(JetStreamSubscription.class); + when(f.provisioner.ensureConsumer( + eq("rqueue-js-orders"), + eq("orders-consumer"), + eq("rqueue.js.orders"), + any(Duration.class), + anyLong(), + anyLong())) + .thenReturn("orders-consumer"); + when(f.js.subscribe(eq(null), any(PullSubscribeOptions.class))).thenReturn(subscription); + Message badMessage = mock(Message.class); + when(badMessage.getData()).thenReturn("not-json".getBytes(UTF_8)); + when(subscription.fetch(eq(1), eq(Duration.ofMillis(10L)))).thenReturn(List.of(badMessage)); + + List messages = + f.broker.pop(queueNamed("orders"), "orders-consumer", 1, Duration.ofMillis(10L)); + + assertTrue(messages.isEmpty()); + verify(badMessage, times(1)).nak(); + } + @Test void enqueueWithPriority_appendsPrioritySuffixToSubject() throws Exception { Fixture f = newFixture(RqueueNatsConfig.defaults()); @@ -190,6 +304,77 @@ void subscribe_createsDispatcherAndSubscribesChannel_closeReleasesIt() throws Ex verify(f.conn, times(1)).closeDispatcher(d); } + @Test + void sizeUsesStreamCountForWorkQueueAndApproximatesLimitsFromSlowestConsumer() throws Exception { + Fixture f = newFixture(RqueueNatsConfig.defaults()); + StreamInfo workInfo = streamInfo(1L, 9L, 7L); + when(f.jsm.getStreamInfo("rqueue-js-work")).thenReturn(workInfo); + assertEquals(7L, f.broker.size(queueNamed("work"))); + assertFalse(f.broker.isSizeApproximate(queueNamed("work"))); + + StreamInfo fanoutInfo = streamInfo(1L, 10L, 10L, RetentionPolicy.Limits); + ConsumerInfo fastConsumer = consumerInfo(1L, 0L, 8L, 9L); + ConsumerInfo slowConsumer = consumerInfo(5L, 2L, 3L, 4L); + when(f.jsm.getStreamInfo("rqueue-js-fanout")).thenReturn(fanoutInfo); + when(f.jsm.getConsumerNames("rqueue-js-fanout")).thenReturn(List.of("fast", "slow")); + when(f.jsm.getConsumerInfo("rqueue-js-fanout", "fast")).thenReturn(fastConsumer); + when(f.jsm.getConsumerInfo("rqueue-js-fanout", "slow")).thenReturn(slowConsumer); + + assertEquals(7L, f.broker.size(queueNamed("fanout"))); + assertTrue(f.broker.isSizeApproximate(queueNamed("fanout"))); + } + + @Test + void subscribersExposeSharedPendingForWorkQueueAndPerConsumerPendingForLimits() throws Exception { + Fixture f = newFixture(RqueueNatsConfig.defaults()); + StreamInfo workInfo = streamInfo(1L, 4L, 4L); + ConsumerInfo workConsumer = consumerInfo(12L, 3L, 1L, 1L); + when(f.jsm.getStreamInfo("rqueue-js-work")).thenReturn(workInfo); + when(f.jsm.getConsumerNames("rqueue-js-work")).thenReturn(List.of("a")); + when(f.jsm.getConsumerInfo("rqueue-js-work", "a")).thenReturn(workConsumer); + + List workSubscribers = f.broker.subscribers(queueNamed("work")); + + assertEquals(1, workSubscribers.size()); + assertEquals("a", workSubscribers.get(0).consumerName()); + assertEquals(4L, workSubscribers.get(0).pending()); + assertEquals(3L, workSubscribers.get(0).inFlight()); + assertTrue(workSubscribers.get(0).pendingShared()); + + StreamInfo fanoutInfo = streamInfo(1L, 6L, 6L, RetentionPolicy.Limits); + ConsumerInfo fanoutA = consumerInfo(2L, 1L, 3L, 4L); + ConsumerInfo fanoutB = consumerInfo(5L, 0L, 1L, 1L); + when(f.jsm.getStreamInfo("rqueue-js-fanout")).thenReturn(fanoutInfo); + when(f.jsm.getConsumerNames("rqueue-js-fanout")).thenReturn(List.of("a", "b")); + when(f.jsm.getConsumerInfo("rqueue-js-fanout", "a")).thenReturn(fanoutA); + when(f.jsm.getConsumerInfo("rqueue-js-fanout", "b")).thenReturn(fanoutB); + + List fanoutSubscribers = f.broker.subscribers(queueNamed("fanout")); + + assertEquals(2, fanoutSubscribers.size()); + assertEquals(2L, fanoutSubscribers.get(0).pending()); + assertEquals(1L, fanoutSubscribers.get(0).inFlight()); + assertFalse(fanoutSubscribers.get(0).pendingShared()); + assertEquals(5L, fanoutSubscribers.get(1).pending()); + } + + @Test + void storageDisplayAndDataTypeLabelsUseNatsTerminology() { + Fixture f = newFixture(RqueueNatsConfig.defaults()); + QueueDetail q = queueNamed("orders"); + + assertEquals("NATS", f.broker.storageKicker()); + assertTrue(f.broker.storageDescription().contains("NATS JetStream")); + assertEquals("rqueue-js-orders", f.broker.storageDisplayName(q)); + assertEquals("rqueue-js-orders-dlq", f.broker.dlqStorageDisplayName(q)); + assertEquals("Queue (Stream)", f.broker.dataTypeLabel(NavTab.PENDING, DataType.LIST)); + assertEquals("Dead Letter (Stream)", f.broker.dataTypeLabel(NavTab.DEAD, DataType.ZSET)); + assertEquals("Completed (KV)", f.broker.dataTypeLabel(NavTab.COMPLETED, DataType.SET)); + assertEquals("Stream", f.broker.dataTypeLabel(NavTab.RUNNING, DataType.ZSET)); + assertEquals("Stream", f.broker.dataTypeLabel(null, DataType.LIST)); + assertNull(f.broker.dataTypeLabel(null, null)); + } + @Test void enqueueReactive_completesWhenPublishFutureCompletes() { Fixture f = newFixture(RqueueNatsConfig.defaults()); @@ -362,12 +547,19 @@ private static final class Fixture { final Connection conn; final JetStream js; final JetStreamManagement jsm; + final NatsProvisioner provisioner; final JetStreamMessageBroker broker; - Fixture(Connection conn, JetStream js, JetStreamManagement jsm, JetStreamMessageBroker broker) { + Fixture( + Connection conn, + JetStream js, + JetStreamManagement jsm, + NatsProvisioner provisioner, + JetStreamMessageBroker broker) { this.conn = conn; this.js = js; this.jsm = jsm; + this.provisioner = provisioner; this.broker = broker; } } From 32a8c0f9451e95a61ebc1a5f32eee241cf20eea2 Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Sun, 24 May 2026 09:26:22 +0530 Subject: [PATCH 15/16] chore: bump version --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 4a0d8b21..b5791994 100644 --- a/build.gradle +++ b/build.gradle @@ -84,7 +84,7 @@ ext { subprojects { group = "com.github.sonus21" - version = "4.0.0-RC10" + version = "4.0.0-RC11" dependencies { // https://mvnrepository.com/artifact/org.springframework/spring-messaging From 37a61882f19f29f4512f8c4ca696c63b42b5e8cc Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Sun, 24 May 2026 09:41:00 +0530 Subject: [PATCH 16/16] Update RC10 and RC11 release notes --- docs/CHANGELOG.md | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index ff7dfdc3..b1324221 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -18,6 +18,52 @@ foundational Spring Boot 4 and Jackson 3 migration notes; RC3 for the Java 17 baseline change; RC4–RC6 below for the NATS backend, broker SPI, dashboard work, and middleware additions that build on top. +## Release [4.0.0.RC11] 2026-05-24 + +{: .highlight} +Release candidate. + +### Fixes +* **Delayed listener startup for Spring Boot web apps** — Rqueue listener + containers in servlet and reactive Spring Boot web applications now wait until + `ApplicationReadyEvent` before consuming work. Non-web worker applications keep + the existing `SmartLifecycle` startup behavior. +* **Idempotent listener container startup** — repeated `start()` calls no longer + re-run queue startup, and the container marks itself running only after + startup succeeds. +* **Global retry cap enforcement** — `rqueue.retry.max` now caps the remaining + retry budget even when `rqueue.retry.per.poll` is low or high. The retry logic + is centralized in `RetryPolicy`, preserving explicit message/listener retry + counts while preventing implicit retry-forever jobs from bypassing the global + max. +* **NATS listener polling wait** — NATS pollers now use the backend-configured + fetch wait via the broker SPI, reducing short-poll churn while keeping Redis + behavior unchanged. + +### Build +* **Shared backend contract E2E tests** — Redis and NATS now run the same backend + contract E2E coverage through environment-selected bootstrapping, replacing + duplicated NATS-only E2E classes. +* **Broker coverage** — added focused unit coverage for broker defaults and NATS + JetStream pop, in-flight, size, subscriber, and dashboard-label paths. + +## Release [4.0.0.RC10] 2026-05-21 + +{: .highlight} +Release candidate. + +### Fixes +* **Spring Boot 3.x to 4.x message compatibility** — restored Jackson 2.x + property ordering compatibility in `RqueueRedisSerializer` so messages written + by Rqueue 3.x can be acknowledged or parked for retry after upgrading to + Rqueue 4.x. This prevents stale processing-queue entries caused by byte-exact + Redis `ZSCORE` / `ZREM` lookups using a different serialized property order. + +### Docs +* **Migration guidance** — clarified the 3.x to 4.x upgrade notes around + `rqueue.serialization.property.order` so applications can choose the + compatibility mode intentionally during rolling upgrades. + ## Release [4.0.0.RC9] 2026-05-13 {: .highlight}