diff --git a/build.gradle b/build.gradle index 87093c40..4a0d8b21 100644 --- a/build.gradle +++ b/build.gradle @@ -84,7 +84,7 @@ ext { subprojects { group = "com.github.sonus21" - version = "4.0.0-RC9" + version = "4.0.0-RC10" dependencies { // https://mvnrepository.com/artifact/org.springframework/spring-messaging diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index 49676387..c4bb83c3 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -274,6 +274,19 @@ whenever Rqueue generates the message ID internally. ## Additional Configuration +- **`rqueue.serialization.property.order`**: Controls the JSON property ordering used + when serialising `RqueueMessage` to Redis. Accepted values: + - `ALPHABETICAL` *(default)* — alphabetical order, the Jackson 3.x native default. + This is the standard setting for RQueue 4.x deployments. + - `DECLARATION` — declaration order, matching the Jackson 2.x behaviour used by RQueue 3.x. + Use this when upgrading from 3.x with messages still present in Redis queues. + + {: .warning} + Switching between values while messages are present in the processing queue will cause + those in-flight messages to be unexpectedly retried — the visibility-timeout rescue + path preserves raw bytes verbatim, so the serialisation mismatch persists across + re-deliveries. Drain the processing queue before changing this setting. + - **`rqueue.retry.per.poll`**: Determines how many times a polled message is retried immediately if processing fails, before it is moved back to the queue for a subsequent poll. The default value is `1`. If increased to `N`, the message will diff --git a/docs/migrations.md b/docs/migrations.md index 75aac352..b0924ace 100644 --- a/docs/migrations.md +++ b/docs/migrations.md @@ -73,3 +73,28 @@ ZCARD rqueue-processing:: If all commands return **0**, your queues are empty and you can proceed with the migration without additional configuration. + +--- + +## Upgrading from 3.x to 4.x + +RQueue 4.x switched from Jackson 2.x (`com.fasterxml.jackson`) to Jackson 3.x +(`tools.jackson`). Jackson 3.x defaults to **alphabetical** JSON property ordering, +while Jackson 2.x used **declaration order**. Messages enqueued by 3.x and messages +enqueued by 4.x therefore have different byte representations in Redis. + +The processing queue uses byte-exact lookups (ZSCORE/ZREM) to move or acknowledge +messages. If stored bytes do not match the re-serialised bytes, the lookup silently +fails and the message is repeatedly re-delivered via the visibility-timeout rescue path. + +**If you are upgrading with messages still present in Redis**, set the following +property to keep using declaration order (matching what 3.x stored): + +```properties +rqueue.serialization.property.order=DECLARATION +``` + +{: .warning} +Changing `rqueue.serialization.property.order` while messages are present in the +processing queue will cause those messages to be unexpectedly retried. Drain the processing +queue before switching values. diff --git a/rqueue-core/build.gradle b/rqueue-core/build.gradle index b941f9c1..e37f1b20 100644 --- a/rqueue-core/build.gradle +++ b/rqueue-core/build.gradle @@ -52,6 +52,7 @@ dependencies { api "org.apache.commons:commons-collections4:${apacheCommonCollectionVerion}" // https://mvnrepository.com/artifact/io.micrometer/micrometer-core api "io.micrometer:micrometer-core:${microMeterVersion}" + testImplementation "com.fasterxml.jackson.core:jackson-databind:2.21.2" testImplementation "io.lettuce:lettuce-core:${lettuceVersion}" testImplementation "io.projectreactor:reactor-test:${projectReactorReactorTestVersion}" testImplementation project(":rqueue-test-util") diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java index 9131421c..891df1cf 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java @@ -16,6 +16,7 @@ package com.github.sonus21.rqueue.config; +import com.github.sonus21.rqueue.converter.RqueueRedisSerializer; import com.github.sonus21.rqueue.models.enums.RqueueMode; import com.github.sonus21.rqueue.utils.Constants; import com.github.sonus21.rqueue.utils.StringUtils; @@ -166,6 +167,9 @@ private static String generateBrokerId() { @Value("${rqueue.worker.registry.enabled:true}") private boolean workerRegistryEnabled; + @Value("${rqueue.serialization.property.order:ALPHABETICAL}") + private RqueueRedisSerializer.PropertyOrder serializationPropertyOrder; + @Value("${rqueue.worker.registry.worker.ttl:300}") private long workerRegistryWorkerTtlInSeconds; diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueListenerBaseConfig.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueListenerBaseConfig.java index 2bd43887..30ebfc2c 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueListenerBaseConfig.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueListenerBaseConfig.java @@ -18,6 +18,7 @@ import com.github.sonus21.rqueue.common.RqueueRedisTemplate; import com.github.sonus21.rqueue.converter.MessageConverterProvider; +import com.github.sonus21.rqueue.converter.RqueueRedisSerializer; import com.github.sonus21.rqueue.core.RqueueBeanProvider; import com.github.sonus21.rqueue.core.RqueueMessageIdGenerator; import com.github.sonus21.rqueue.core.RqueueMessageTemplate; @@ -37,6 +38,9 @@ import org.springframework.context.annotation.Conditional; import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.serializer.RedisSerializationContext; +import org.springframework.data.redis.serializer.StringRedisSerializer; /** * This is a base configuration class for Rqueue, that is used in Spring and Spring boot Rqueue libs @@ -60,6 +64,9 @@ public abstract class RqueueListenerBaseConfig { @Value("${rqueue.reactive.enabled:false}") protected boolean reactiveEnabled; + @Value("${rqueue.serialization.property.order:ALPHABETICAL}") + private RqueueRedisSerializer.PropertyOrder serializationPropertyOrder; + @Value( "${rqueue.message.converter.provider.class:com.github.sonus21.rqueue.converter.DefaultMessageConverterProvider}") private String messageConverterProviderClass; @@ -109,6 +116,31 @@ public RqueueConfig rqueueConfig( @Value("${rqueue.backend:REDIS}") Backend backend, @Value("${rqueue.version.key:__rq::version}") String versionKey, @Value("${rqueue.db.version:}") Integer dbVersion) { + if (serializationPropertyOrder == RqueueRedisSerializer.PropertyOrder.DECLARATION) { + RqueueRedisSerializer serializer = + new RqueueRedisSerializer(RqueueRedisSerializer.PropertyOrder.DECLARATION); + StringRedisSerializer keySerializer = new StringRedisSerializer(); + RedisUtils.redisTemplateProvider = new RedisUtils.RedisTemplateProvider() { + @Override + public RedisTemplate getRedisTemplate( + RedisConnectionFactory redisConnectionFactory) { + RedisTemplate template = new RedisTemplate<>(); + template.setConnectionFactory(redisConnectionFactory); + template.setKeySerializer(keySerializer); + template.setValueSerializer(serializer); + template.setHashKeySerializer(keySerializer); + template.setHashValueSerializer(serializer); + return template; + } + }; + RedisUtils.redisSerializationContextProvider = + () -> RedisSerializationContext.newSerializationContext() + .key(keySerializer) + .value(serializer) + .hashKey(keySerializer) + .hashValue(serializer) + .build(); + } boolean sharedConnection = false; RedisConnectionFactory connectionFactory = simpleRqueueListenerContainerFactory.getRedisConnectionFactory(); diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/RqueueRedisSerializer.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/RqueueRedisSerializer.java index 245fe181..20796958 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/RqueueRedisSerializer.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/RqueueRedisSerializer.java @@ -25,6 +25,7 @@ import tools.jackson.core.JacksonException; import tools.jackson.core.JsonGenerator; import tools.jackson.databind.DefaultTyping; +import tools.jackson.databind.MapperFeature; import tools.jackson.databind.ObjectMapper; import tools.jackson.databind.SerializationContext; import tools.jackson.databind.jsontype.BasicPolymorphicTypeValidator; @@ -34,14 +35,35 @@ @Slf4j public class RqueueRedisSerializer implements RedisSerializer { + /** + * Controls JSON property ordering for {@link com.github.sonus21.rqueue.core.RqueueMessage} + * serialisation. Configure via {@code rqueue.serialization.property.order}. + * + *
    + *
  • {@link #ALPHABETICAL} — alphabetical order, Jackson 3.x native behaviour. This is the + * default for RQueue 4.x. + *
  • {@link #DECLARATION} — declaration order, matching Jackson 2.x / RQueue 3.x. Use when + * upgrading from 3.x with messages still present in Redis queues. + *
+ */ + public enum PropertyOrder { + ALPHABETICAL, + DECLARATION + } + private final RedisSerializer serializer; public RqueueRedisSerializer(RedisSerializer redisSerializer) { this.serializer = redisSerializer; } + /** Creates a serialiser using {@link PropertyOrder#ALPHABETICAL} (Jackson 3.x default). */ public RqueueRedisSerializer() { - this(new RqueueRedisSerDes()); + this(PropertyOrder.ALPHABETICAL); + } + + public RqueueRedisSerializer(PropertyOrder order) { + this(new RqueueRedisSerDes(order)); } @Override @@ -66,8 +88,8 @@ public Object deserialize(byte[] bytes) throws SerializationException { private static class RqueueRedisSerDes implements RedisSerializer { private final ObjectMapper mapper; - RqueueRedisSerDes() { - this.mapper = SerializationUtils.getObjectMapper() + RqueueRedisSerDes(PropertyOrder order) { + var builder = SerializationUtils.getObjectMapper() .rebuild() .addModule(new SimpleModule().addSerializer(new NullValueSerializer())) .activateDefaultTyping( @@ -75,8 +97,11 @@ private static class RqueueRedisSerDes implements RedisSerializer { .allowIfSubType(Object.class) .build(), DefaultTyping.NON_FINAL, - As.PROPERTY) - .build(); + As.PROPERTY); + if (order == PropertyOrder.DECLARATION) { + builder = builder.disable(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY); + } + this.mapper = builder.build(); } @Override diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueInternalPubSubChannel.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueInternalPubSubChannel.java index ac2517c9..e8cae8d1 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueInternalPubSubChannel.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueInternalPubSubChannel.java @@ -57,7 +57,8 @@ public RqueueInternalPubSubChannel( this.rqueueConfig = rqueueConfig; this.stringRqueueRedisTemplate = stringRqueueRedisTemplate; this.rqueueBeanProvider = rqueueBeanProvider; - this.rqueueRedisSerializer = new RqueueRedisSerializer(); + this.rqueueRedisSerializer = + new RqueueRedisSerializer(rqueueConfig.getSerializationPropertyOrder()); } @Override diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/spi/redis/RedisMessageBrokerSerializationOrderTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/spi/redis/RedisMessageBrokerSerializationOrderTest.java new file mode 100644 index 00000000..f303ad7e --- /dev/null +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/spi/redis/RedisMessageBrokerSerializationOrderTest.java @@ -0,0 +1,390 @@ +package com.github.sonus21.rqueue.core.spi.redis; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.github.sonus21.rqueue.converter.RqueueRedisSerializer; +import com.github.sonus21.rqueue.converter.RqueueRedisSerializer.PropertyOrder; +import com.github.sonus21.rqueue.core.RqueueMessage; +import com.github.sonus21.rqueue.core.impl.RqueueMessageTemplateImpl; +import com.github.sonus21.rqueue.listener.QueueDetail; +import com.github.sonus21.rqueue.utils.RedisUtils; +import com.github.sonus21.rqueue.utils.TestUtils; +import java.io.IOException; +import java.net.ServerSocket; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.springframework.data.redis.connection.RedisStandaloneConfiguration; +import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.serializer.RedisSerializer; +import org.springframework.data.redis.serializer.StringRedisSerializer; +import redis.embedded.RedisServer; + +@Tag("core") +class RedisMessageBrokerSerializationOrderTest { + + private static RedisServer redisServer; + private static int redisPort; + + @BeforeAll + static void startRedis() throws IOException { + try (ServerSocket socket = new ServerSocket(0)) { + redisPort = socket.getLocalPort(); + } + redisServer = new RedisServer(redisPort); + redisServer.start(); + } + + @AfterAll + static void stopRedis() throws IOException { + if (redisServer != null) { + redisServer.stop(); + } + } + + private LettuceConnectionFactory connectionFactory; + private RqueueMessageTemplateImpl messageTemplate; + private RedisMessageBroker broker; + // Passthrough serialiser: inserts raw bytes into the ZSET without re-serialising, mirroring + // how dequeue_message.lua byte-copies messages from q-queue into the processing queue. + private RedisTemplate rawTemplate; + + @BeforeEach + void setUp() { + connectionFactory = + new LettuceConnectionFactory(new RedisStandaloneConfiguration("localhost", redisPort)); + connectionFactory.afterPropertiesSet(); + messageTemplate = new RqueueMessageTemplateImpl(connectionFactory, null); + broker = new RedisMessageBroker(messageTemplate); + + rawTemplate = new RedisTemplate<>(); + rawTemplate.setConnectionFactory(connectionFactory); + rawTemplate.setKeySerializer(new StringRedisSerializer()); + rawTemplate.setValueSerializer(RedisSerializer.byteArray()); + rawTemplate.afterPropertiesSet(); + } + + @AfterEach + void tearDown() { + QueueDetail queueDetail = TestUtils.createQueueDetail("test-queue"); + messageTemplate.getRedisTemplate().delete(queueDetail.getProcessingQueueName()); + messageTemplate.getRedisTemplate().delete(queueDetail.getScheduledQueueName()); + connectionFactory.destroy(); + } + + // --- property order: ALPHABETICAL (Jackson 3.x default, RQueue 4.x default) --- + + @Test + void parkForRetry_alphabeticalOrder_4xMessage_isMovedToScheduledQueue() { + QueueDetail queueDetail = TestUtils.createQueueDetail("test-queue"); + + RqueueMessage original = RqueueMessage.builder() + .id("test-msg-alpha-4x-001") + .queueName("test-queue") + .message("{\"payload\":\"test\"}") + .processAt(1_000_000L) + .queuedTime(2_000_000L) + .build(); + + messageTemplate.addToZset( + queueDetail.getProcessingQueueName(), original, System.currentTimeMillis()); + + RqueueMessage updated = original.toBuilder().failureCount(1).build().updateReEnqueuedAt(); + broker.parkForRetry(queueDetail, original, updated, 60_000L); + + assertEquals( + 0L, + messageTemplate.getRedisTemplate().opsForZSet().size(queueDetail.getProcessingQueueName()), + "Processing queue must be empty after parkForRetry"); + assertEquals( + 1L, + messageTemplate.getRedisTemplate().opsForZSet().size(queueDetail.getScheduledQueueName()), + "Scheduled queue must contain the rescheduled message"); + } + + @Test + void ack_alphabeticalOrder_4xMessage_isRemovedFromProcessingQueue() { + QueueDetail queueDetail = TestUtils.createQueueDetail("test-queue"); + + RqueueMessage original = RqueueMessage.builder() + .id("test-msg-alpha-4x-ack-001") + .queueName("test-queue") + .message("{\"payload\":\"test\"}") + .processAt(1_000_000L) + .queuedTime(2_000_000L) + .build(); + + messageTemplate.addToZset( + queueDetail.getProcessingQueueName(), original, System.currentTimeMillis()); + + broker.ack(queueDetail, original); + + assertEquals( + 0L, + messageTemplate.getRedisTemplate().opsForZSet().size(queueDetail.getProcessingQueueName()), + "Processing queue must be empty after ack"); + } + + @Test + void parkForRetry_alphabeticalOrder_v3Message_strandsMessage() throws Exception { + QueueDetail queueDetail = TestUtils.createQueueDetail("test-queue"); + + rawTemplate + .opsForZSet() + .add( + queueDetail.getProcessingQueueName(), + v3Bytes(RqueueMessage.builder() + .id("test-msg-alpha-v3-001") + .queueName("test-queue") + .message("{\"payload\":\"test\"}") + .processAt(1_000_000L) + .queuedTime(2_000_000L) + .build()), + System.currentTimeMillis()); + + RqueueMessage original = RqueueMessage.builder() + .id("test-msg-alpha-v3-001") + .queueName("test-queue") + .message("{\"payload\":\"test\"}") + .processAt(1_000_000L) + .queuedTime(2_000_000L) + .build(); + RqueueMessage updated = original.toBuilder().failureCount(1).build().updateReEnqueuedAt(); + broker.parkForRetry(queueDetail, original, updated, 60_000L); + + assertEquals( + 1L, + messageTemplate.getRedisTemplate().opsForZSet().size(queueDetail.getProcessingQueueName()), + "3.x message must remain stranded under ALPHABETICAL order"); + assertEquals( + 0L, + messageTemplate.getRedisTemplate().opsForZSet().size(queueDetail.getScheduledQueueName()), + "Scheduled queue must be empty: ZSCORE missed due to alphabetical vs declaration mismatch"); + } + + @Test + void ack_alphabeticalOrder_v3Message_strandsMessage() throws Exception { + QueueDetail queueDetail = TestUtils.createQueueDetail("test-queue"); + + RqueueMessage original = RqueueMessage.builder() + .id("test-msg-alpha-v3-ack-001") + .queueName("test-queue") + .message("{\"payload\":\"test\"}") + .processAt(1_000_000L) + .queuedTime(2_000_000L) + .build(); + + rawTemplate + .opsForZSet() + .add(queueDetail.getProcessingQueueName(), v3Bytes(original), System.currentTimeMillis()); + + broker.ack(queueDetail, original); + + assertEquals( + 1L, + messageTemplate.getRedisTemplate().opsForZSet().size(queueDetail.getProcessingQueueName()), + "3.x message must remain stranded under ALPHABETICAL order"); + } + + // --- property order: DECLARATION (declaration order, matching RQueue 3.x) --- + + @Test + void parkForRetry_declarationOrder_v3Message_isMovedToScheduledQueue() throws Exception { + QueueDetail queueDetail = TestUtils.createQueueDetail("test-queue"); + RedisUtils.RedisTemplateProvider saved = RedisUtils.redisTemplateProvider; + try { + RedisMessageBroker declarationBroker = brokerWithOrder(PropertyOrder.DECLARATION); + RqueueMessageTemplateImpl declarationTemplate = templateWithOrder(PropertyOrder.DECLARATION); + + RqueueMessage original = RqueueMessage.builder() + .id("test-msg-v3-001") + .queueName("test-queue") + .message("{\"payload\":\"test\"}") + .processAt(1_000_000L) + .queuedTime(2_000_000L) + .build(); + + rawTemplate + .opsForZSet() + .add(queueDetail.getProcessingQueueName(), v3Bytes(original), System.currentTimeMillis()); + + RqueueMessage updated = original.toBuilder().failureCount(1).build().updateReEnqueuedAt(); + declarationBroker.parkForRetry(queueDetail, original, updated, 60_000L); + + assertEquals( + 0L, + declarationTemplate + .getRedisTemplate() + .opsForZSet() + .size(queueDetail.getProcessingQueueName()), + "Processing queue must be empty after parkForRetry"); + assertEquals( + 1L, + declarationTemplate + .getRedisTemplate() + .opsForZSet() + .size(queueDetail.getScheduledQueueName()), + "Scheduled queue must contain the rescheduled message"); + } finally { + RedisUtils.redisTemplateProvider = saved; + } + } + + @Test + void ack_declarationOrder_v3Message_isRemovedFromProcessingQueue() throws Exception { + QueueDetail queueDetail = TestUtils.createQueueDetail("test-queue"); + RedisUtils.RedisTemplateProvider saved = RedisUtils.redisTemplateProvider; + try { + RedisMessageBroker declarationBroker = brokerWithOrder(PropertyOrder.DECLARATION); + RqueueMessageTemplateImpl declarationTemplate = templateWithOrder(PropertyOrder.DECLARATION); + + RqueueMessage original = RqueueMessage.builder() + .id("test-msg-v3-002") + .queueName("test-queue") + .message("{\"payload\":\"test\"}") + .processAt(1_000_000L) + .queuedTime(2_000_000L) + .build(); + + rawTemplate + .opsForZSet() + .add(queueDetail.getProcessingQueueName(), v3Bytes(original), System.currentTimeMillis()); + + declarationBroker.ack(queueDetail, original); + + assertEquals( + 0L, + declarationTemplate + .getRedisTemplate() + .opsForZSet() + .size(queueDetail.getProcessingQueueName()), + "Processing queue must be empty after ack"); + } finally { + RedisUtils.redisTemplateProvider = saved; + } + } + + @Test + void parkForRetry_declarationOrder_4xAlphabeticalMessage_strandsMessage() { + QueueDetail queueDetail = TestUtils.createQueueDetail("test-queue"); + RedisUtils.RedisTemplateProvider saved = RedisUtils.redisTemplateProvider; + try { + RedisMessageBroker declarationBroker = brokerWithOrder(PropertyOrder.DECLARATION); + RqueueMessageTemplateImpl declarationTemplate = templateWithOrder(PropertyOrder.DECLARATION); + + RqueueMessage original = RqueueMessage.builder() + .id("test-msg-legacy-4x-001") + .queueName("test-queue") + .message("{\"payload\":\"test\"}") + .processAt(1_000_000L) + .queuedTime(2_000_000L) + .build(); + + byte[] v4AlphaBytes = + new RqueueRedisSerializer(PropertyOrder.ALPHABETICAL).serialize(original); + rawTemplate + .opsForZSet() + .add(queueDetail.getProcessingQueueName(), v4AlphaBytes, System.currentTimeMillis()); + + RqueueMessage updated = original.toBuilder().failureCount(1).build().updateReEnqueuedAt(); + declarationBroker.parkForRetry(queueDetail, original, updated, 60_000L); + + assertEquals( + 1L, + declarationTemplate + .getRedisTemplate() + .opsForZSet() + .size(queueDetail.getProcessingQueueName()), + "4.x alphabetical message must remain stranded under DECLARATION order"); + assertEquals( + 0L, + declarationTemplate + .getRedisTemplate() + .opsForZSet() + .size(queueDetail.getScheduledQueueName()), + "Scheduled queue must be empty: ZSCORE missed due to declaration vs alphabetical" + + " mismatch"); + } finally { + RedisUtils.redisTemplateProvider = saved; + } + } + + @Test + void ack_declarationOrder_4xAlphabeticalMessage_strandsMessage() { + QueueDetail queueDetail = TestUtils.createQueueDetail("test-queue"); + RedisUtils.RedisTemplateProvider saved = RedisUtils.redisTemplateProvider; + try { + RedisMessageBroker declarationBroker = brokerWithOrder(PropertyOrder.DECLARATION); + RqueueMessageTemplateImpl declarationTemplate = templateWithOrder(PropertyOrder.DECLARATION); + + RqueueMessage original = RqueueMessage.builder() + .id("test-msg-legacy-4x-ack-001") + .queueName("test-queue") + .message("{\"payload\":\"test\"}") + .processAt(1_000_000L) + .queuedTime(2_000_000L) + .build(); + + byte[] v4AlphaBytes = + new RqueueRedisSerializer(PropertyOrder.ALPHABETICAL).serialize(original); + rawTemplate + .opsForZSet() + .add(queueDetail.getProcessingQueueName(), v4AlphaBytes, System.currentTimeMillis()); + + declarationBroker.ack(queueDetail, original); + + assertEquals( + 1L, + declarationTemplate + .getRedisTemplate() + .opsForZSet() + .size(queueDetail.getProcessingQueueName()), + "4.x alphabetical message must remain stranded under DECLARATION order"); + } finally { + RedisUtils.redisTemplateProvider = saved; + } + } + + private RqueueMessageTemplateImpl templateWithOrder(PropertyOrder order) { + RqueueRedisSerializer serializer = new RqueueRedisSerializer(order); + StringRedisSerializer key = new StringRedisSerializer(); + RedisUtils.redisTemplateProvider = new RedisUtils.RedisTemplateProvider() { + @Override + public org.springframework.data.redis.core.RedisTemplate getRedisTemplate( + org.springframework.data.redis.connection.RedisConnectionFactory factory) { + org.springframework.data.redis.core.RedisTemplate t = + new org.springframework.data.redis.core.RedisTemplate<>(); + t.setConnectionFactory(factory); + t.setKeySerializer(key); + t.setValueSerializer(serializer); + t.setHashKeySerializer(key); + t.setHashValueSerializer(serializer); + return t; + } + }; + return new RqueueMessageTemplateImpl(connectionFactory, null); + } + + private RedisMessageBroker brokerWithOrder(PropertyOrder order) { + return new RedisMessageBroker(templateWithOrder(order)); + } + + // Reproduce the bytes RQueue 3.x would have stored using the actual Jackson 2.x ObjectMapper + // so the serialised form stays in sync with RqueueMessage field changes automatically. + private static byte[] v3Bytes(RqueueMessage message) throws Exception { + com.fasterxml.jackson.databind.ObjectMapper jackson2 = + new com.fasterxml.jackson.databind.ObjectMapper() + .configure( + com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, + false); + //noinspection deprecation — mirrors the API RQueue 3.x RqueueRedisSerDes used + jackson2.enableDefaultTyping( + com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping.NON_FINAL, + com.fasterxml.jackson.annotation.JsonTypeInfo.As.PROPERTY); + return jackson2.writeValueAsBytes(message); + } +}