Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/java-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ jobs:

- name: Run NATS tests
env:
RQUEUE_TEST_BACKEND: nats
NATS_RUNNING: "true"
NATS_URL: nats://127.0.0.1:4222
run: ./gradlew :rqueue-nats:test :rqueue-spring-boot-starter:test :rqueue-spring:test -DincludeTags=nats
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ ext {

subprojects {
group = "com.github.sonus21"
version = "4.0.0-RC10"
version = "4.0.0-RC11"

dependencies {
// https://mvnrepository.com/artifact/org.springframework/spring-messaging
Expand Down
46 changes: 46 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,52 @@ foundational Spring Boot 4 and Jackson 3 migration notes; RC3 for the Java 17
baseline change; RC4–RC6 below for the NATS backend, broker SPI, dashboard
work, and middleware additions that build on top.

## Release [4.0.0.RC11] 2026-05-24

{: .highlight}
Release candidate.

### Fixes
* **Delayed listener startup for Spring Boot web apps** — Rqueue listener
containers in servlet and reactive Spring Boot web applications now wait until
`ApplicationReadyEvent` before consuming work. Non-web worker applications keep
the existing `SmartLifecycle` startup behavior.
* **Idempotent listener container startup** — repeated `start()` calls no longer
re-run queue startup, and the container marks itself running only after
startup succeeds.
* **Global retry cap enforcement** — `rqueue.retry.max` now caps the remaining
retry budget even when `rqueue.retry.per.poll` is low or high. The retry logic
is centralized in `RetryPolicy`, preserving explicit message/listener retry
counts while preventing implicit retry-forever jobs from bypassing the global
max.
* **NATS listener polling wait** — NATS pollers now use the backend-configured
fetch wait via the broker SPI, reducing short-poll churn while keeping Redis
behavior unchanged.

### Build
* **Shared backend contract E2E tests** — Redis and NATS now run the same backend
contract E2E coverage through environment-selected bootstrapping, replacing
duplicated NATS-only E2E classes.
* **Broker coverage** — added focused unit coverage for broker defaults and NATS
JetStream pop, in-flight, size, subscriber, and dashboard-label paths.

## Release [4.0.0.RC10] 2026-05-21

{: .highlight}
Release candidate.

### Fixes
* **Spring Boot 3.x to 4.x message compatibility** — restored Jackson 2.x
property ordering compatibility in `RqueueRedisSerializer` so messages written
by Rqueue 3.x can be acknowledged or parked for retry after upgrading to
Rqueue 4.x. This prevents stale processing-queue entries caused by byte-exact
Redis `ZSCORE` / `ZREM` lookups using a different serialized property order.

### Docs
* **Migration guidance** — clarified the 3.x to 4.x upgrade notes around
`rqueue.serialization.property.order` so applications can choose the
compatibility mode intentionally during rolling upgrades.

## Release [4.0.0.RC9] 2026-05-13

{: .highlight}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,19 @@ default Mono<Void> enqueueWithDelayReactive(QueueDetail q, RqueueMessage m, long
return Mono.fromRunnable(() -> enqueueWithDelay(q, m, delayMs));
}

/**
* Resolve the wait duration that listener pollers should pass to {@link #pop}. The default uses
* the listener container's polling interval, preserving existing Redis behavior where that value
* also controls idle sleeps. Backends with native long-poll controls can override this so their
* fetch wait can be tuned independently.
*
* @param pollingInterval listener container polling interval
* @return wait duration for listener-driven pop calls
*/
default Duration getPollWait(Duration pollingInterval) {
return pollingInterval;
}

List<RqueueMessage> pop(QueueDetail q, String consumerName, int batch, Duration wait);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,18 +225,11 @@ private void handleRetryExceededMessage(JobImpl job, int failureCount, Throwable
}
}

private int getMaxRetryCount(RqueueMessage rqueueMessage, QueueDetail queueDetail) {
return rqueueMessage.getRetryCount() == null
? queueDetail.getNumRetry()
: rqueueMessage.getRetryCount();
}

private void handleFailure(JobImpl job, int failureCount, Throwable throwable) {
if (job.getQueueDetail().isDoNotRetryError(throwable)) {
handleRetryExceededMessage(job, failureCount, throwable);
} else {
int maxRetryCount = getMaxRetryCount(job.getRqueueMessage(), job.getQueueDetail());
if (failureCount < maxRetryCount) {
if (!RetryPolicy.isExhausted(job.getRqueueMessage(), job.getQueueDetail(), failureCount)) {
long delay = taskExecutionBackoff.nextBackOff(
job.getMessage(), job.getRqueueMessage(), failureCount, throwable);
if (delay == TaskExecutionBackOff.STOP) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2026 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
*/

package com.github.sonus21.rqueue.listener;

import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.core.RqueueMessage;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
final class RetryPolicy {

static final int UNLIMITED_RETRY_LIMIT = 100_000;

static int maxRetryCount(RqueueMessage rqueueMessage, QueueDetail queueDetail) {
int maxRetryCount = rqueueMessage.getRetryCount() == null
? queueDetail.getNumRetry()
: rqueueMessage.getRetryCount();
if (maxRetryCount == Integer.MAX_VALUE) {
return UNLIMITED_RETRY_LIMIT;
}
return maxRetryCount;
}

static int remainingRetryCount(
RqueueMessage rqueueMessage, QueueDetail queueDetail, int failureCount) {
int maxRetryCount = maxRetryCount(rqueueMessage, queueDetail);
return Math.max(0, maxRetryCount - failureCount);
}

static int retryCountForPoll(
RqueueConfig rqueueConfig,
RqueueMessage rqueueMessage,
QueueDetail queueDetail,
int failureCount) {
int remainingRetryCount = remainingRetryCount(rqueueMessage, queueDetail, failureCount);
if (rqueueConfig.getRetryPerPoll() == -1) {
return remainingRetryCount;
}
return Math.min(rqueueConfig.getRetryPerPoll(), remainingRetryCount);
}

static boolean isExhausted(
RqueueMessage rqueueMessage, QueueDetail queueDetail, int failureCount) {
return remainingRetryCount(rqueueMessage, queueDetail, failureCount) == 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,6 @@ private void init() {
this.failureCount = job.getRqueueMessage().getFailureCount();
}

private int getMaxRetryCount() {
return Objects.isNull(job.getRqueueMessage().getRetryCount())
? job.getQueueDetail().getNumRetry()
: job.getRqueueMessage().getRetryCount();
}

private void updateCounter(boolean fail) {
RqueueMetricsCounter counter = beanProvider.getRqueueMetricsCounter();
if (Objects.isNull(counter)) {
Expand Down Expand Up @@ -179,11 +173,8 @@ private boolean isOldMessage() {
}

private int getRetryCount() {
int maxRetry = getMaxRetryCount();
if (beanProvider.getRqueueConfig().getRetryPerPoll() == -1) {
return maxRetry;
}
return Math.min(beanProvider.getRqueueConfig().getRetryPerPoll(), maxRetry);
return RetryPolicy.retryCountForPoll(
beanProvider.getRqueueConfig(), job.getRqueueMessage(), job.getQueueDetail(), failureCount);
}

private boolean queueInactive() {
Expand Down Expand Up @@ -283,6 +274,7 @@ private void execute() {
private boolean shouldRetry(long maxProcessingTime, int retryCount, int failureCount) {
if (retryCount > 0
&& ExecutionStatus.FAILED.equals(status)
&& !RetryPolicy.isExhausted(rqueueMessage, queueDetail, failureCount)
&& System.currentTimeMillis() < maxProcessingTime) {
boolean doNoRetry = queueDetail.isDoNotRetryError(error);
// it should not be retried based on the exception list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,10 +517,14 @@ private List<QueueDetail> getQueueDetail(String queue, MappingInformation mappin

@Override
public void start() {
log.info("Starting Rqueue Message container {}", RqueueConfig.getBrokerId());
synchronized (lifecycleMgr) {
running = true;
if (running) {
log.debug("Rqueue Message container {} is already running", RqueueConfig.getBrokerId());
return;
}
log.info("Starting Rqueue Message container {}", RqueueConfig.getBrokerId());
doStart();
running = true;
rqueueBeanProvider
.getApplicationEventPublisher()
.publishEvent(new RqueueBootstrapEvent(EVENT_SOURCE, true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.github.sonus21.rqueue.core.RqueueBeanProvider;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.middleware.Middleware;
import com.github.sonus21.rqueue.core.spi.MessageBroker;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer.QueueStateMgr;
import com.github.sonus21.rqueue.utils.Constants;
import com.github.sonus21.rqueue.utils.QueueThreadPool;
Expand Down Expand Up @@ -59,13 +60,9 @@ abstract class RqueueMessagePoller extends MessageContainerBase {
}

private List<RqueueMessage> getMessages(QueueDetail queueDetail, int count) {
return rqueueBeanProvider
.getMessageBroker()
.pop(
queueDetail,
queueDetail.resolvedConsumerName(),
count,
Duration.ofMillis(pollingInterval));
MessageBroker broker = rqueueBeanProvider.getMessageBroker();
Duration wait = broker.getPollWait(Duration.ofMillis(pollingInterval));
return broker.pop(queueDetail, queueDetail.resolvedConsumerName(), count, wait);
}

private void execute(
Expand Down
Loading
Loading