diff --git a/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java b/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java index eba31fb895..f999acc0a3 100644 --- a/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java +++ b/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,8 @@ import java.util.List; import java.util.Map; +import org.springframework.util.Assert; + /** * Message Properties for an AMQP message. * @@ -33,6 +35,7 @@ * @author Dmitry Chernyshov * @author Artem Bilan * @author Csaba Soti + * @author Raylax Grey */ public class MessageProperties implements Serializable { @@ -66,6 +69,12 @@ public class MessageProperties implements Serializable { public static final Integer DEFAULT_PRIORITY = 0; + /** + * The maximum value of x-delay header. + * @since 3.1.2 + */ + public static final long X_DELAY_MAX = 0xffffffffL; + private final Map headers = new HashMap<>(); private Date timestamp; @@ -118,7 +127,7 @@ public class MessageProperties implements Serializable { private String consumerQueue; - private Integer receivedDelay; + private Long receivedDelay; private MessageDeliveryMode receivedDeliveryMode; @@ -352,10 +361,13 @@ public String getReceivedRoutingKey() { * received message contains the delay. * @return the received delay. * @since 1.6 + * @deprecated in favor of {@link #getReceivedDelayLong()} * @see #getDelay() */ + @Deprecated(since = "3.1.2", forRemoval = true) public Integer getReceivedDelay() { - return this.receivedDelay; + Long receivedDelay = getReceivedDelayLong(); + return receivedDelay != null ? Math.toIntExact(receivedDelay) : null; } /** @@ -363,8 +375,32 @@ public Integer getReceivedDelay() { * received message contains the delay. * @param receivedDelay the received delay. * @since 1.6 + * @deprecated in favor of {@link #setReceivedDelayLong(Long)} */ + @Deprecated(since = "3.1.2", forRemoval = true) public void setReceivedDelay(Integer receivedDelay) { + setReceivedDelayLong(receivedDelay != null ? receivedDelay.longValue() : null); + } + + /** + * When a delayed message exchange is used the x-delay header on a + * received message contains the delay. + * @return the received delay. + * @since 3.1.2 + * @see #getDelayLong() + */ + public Long getReceivedDelayLong() { + return this.receivedDelay; + } + + /** + * When a delayed message exchange is used the x-delay header on a + * received message contains the delay. + * @param receivedDelay the received delay. + * @since 3.1.2 + * @see #setDelayLong(Long) + */ + public void setReceivedDelayLong(Long receivedDelay) { this.receivedDelay = receivedDelay; } @@ -434,12 +470,35 @@ public void setConsumerQueue(String consumerQueue) { * The x-delay header (outbound). * @return the delay. * @since 1.6 + * @deprecated in favor of {@link #getDelayLong()} * @see #getReceivedDelay() */ + @Deprecated(since = "3.1.2", forRemoval = true) public Integer getDelay() { + Long delay = getDelayLong(); + return delay != null ? Math.toIntExact(delay) : null; + } + + /** + * Set the x-delay header. + * @param delay the delay. + * @since 1.6 + * @deprecated in favor of {@link #setDelayLong(Long)} + */ + @Deprecated(since = "3.1.2", forRemoval = true) + public void setDelay(Integer delay) { + setDelayLong(delay != null ? delay.longValue() : null); + } + + /** + * Get the x-delay header long value. + * @return the delay. + * @since 3.1.2 + */ + public Long getDelayLong() { Object delay = this.headers.get(X_DELAY); - if (delay instanceof Integer) { - return (Integer) delay; + if (delay instanceof Long) { + return (Long) delay; } else { return null; @@ -447,17 +506,18 @@ public Integer getDelay() { } /** - * Set the x-delay header. + * Set the x-delay header to a long value. * @param delay the delay. - * @since 1.6 + * @since 3.1.2 */ - public void setDelay(Integer delay) { + public void setDelayLong(Long delay) { if (delay == null || delay < 0) { this.headers.remove(X_DELAY); + return; } - else { - this.headers.put(X_DELAY, delay); - } + + Assert.isTrue(delay <= X_DELAY_MAX, "Delay cannot exceed " + X_DELAY_MAX); + this.headers.put(X_DELAY, delay); } public boolean isFinalRetryForMessageWithNoId() { diff --git a/spring-amqp/src/main/java/org/springframework/amqp/support/SimpleAmqpHeaderMapper.java b/spring-amqp/src/main/java/org/springframework/amqp/support/SimpleAmqpHeaderMapper.java index 3031d3d87c..51af201fa8 100644 --- a/spring-amqp/src/main/java/org/springframework/amqp/support/SimpleAmqpHeaderMapper.java +++ b/spring-amqp/src/main/java/org/springframework/amqp/support/SimpleAmqpHeaderMapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,6 +48,7 @@ * @author Gary Russell * @author Artem Bilan * @author Stephane Nicoll + * @author Raylax Grey * @since 1.4 */ public class SimpleAmqpHeaderMapper extends AbstractHeaderMapper implements AmqpHeaderMapper { @@ -69,8 +70,8 @@ public void fromHeaders(MessageHeaders headers, MessageProperties amqpMessagePro amqpMessageProperties.setCorrelationId((String) correlationId); } javaUtils - .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELAY, Integer.class), - amqpMessageProperties::setDelay) + .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELAY, Long.class), + amqpMessageProperties::setDelayLong) .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELIVERY_MODE, MessageDeliveryMode.class), amqpMessageProperties::setDeliveryMode) .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELIVERY_TAG, Long.class), @@ -150,7 +151,7 @@ public MessageHeaders toHeaders(MessageProperties amqpMessageProperties) { javaUtils .acceptIfCondition(priority != null && priority > 0, AmqpMessageHeaderAccessor.PRIORITY, priority, putObject) - .acceptIfNotNull(AmqpHeaders.RECEIVED_DELAY, amqpMessageProperties.getReceivedDelay(), putObject) + .acceptIfNotNull(AmqpHeaders.RECEIVED_DELAY, amqpMessageProperties.getReceivedDelayLong(), putObject) .acceptIfHasText(AmqpHeaders.RECEIVED_EXCHANGE, amqpMessageProperties.getReceivedExchange(), putString) .acceptIfHasText(AmqpHeaders.RECEIVED_ROUTING_KEY, amqpMessageProperties.getReceivedRoutingKey(), diff --git a/spring-amqp/src/test/java/org/springframework/amqp/core/MessagePropertiesTests.java b/spring-amqp/src/test/java/org/springframework/amqp/core/MessagePropertiesTests.java index 6a1df9d995..2d080e8ea4 100644 --- a/spring-amqp/src/test/java/org/springframework/amqp/core/MessagePropertiesTests.java +++ b/spring-amqp/src/test/java/org/springframework/amqp/core/MessagePropertiesTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ * @author Artem Bilan * @author Gary Russell * @author Csaba Soti + * @author Raylax Grey * */ public class MessagePropertiesTests { @@ -53,10 +54,10 @@ public void testReplyToNullByDefault() { @Test public void testDelayHeader() { MessageProperties properties = new MessageProperties(); - Integer delay = 100; - properties.setDelay(delay); + Long delay = 100L; + properties.setDelayLong(delay); assertThat(properties.getHeaders().get(MessageProperties.X_DELAY)).isEqualTo(delay); - properties.setDelay(null); + properties.setDelayLong(null); assertThat(properties.getHeaders().containsKey(MessageProperties.X_DELAY)).isFalse(); } diff --git a/spring-amqp/src/test/java/org/springframework/amqp/support/SimpleAmqpHeaderMapperTests.java b/spring-amqp/src/test/java/org/springframework/amqp/support/SimpleAmqpHeaderMapperTests.java index 2e9c768279..20967652f3 100644 --- a/spring-amqp/src/test/java/org/springframework/amqp/support/SimpleAmqpHeaderMapperTests.java +++ b/spring-amqp/src/test/java/org/springframework/amqp/support/SimpleAmqpHeaderMapperTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.amqp.support; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.fail; import java.util.Date; @@ -37,6 +38,7 @@ * @author Mark Fisher * @author Gary Russell * @author Oleg Zhurakousky + * @author Raylax Grey */ public class SimpleAmqpHeaderMapperTests { @@ -51,7 +53,7 @@ public void fromHeaders() { headerMap.put(AmqpHeaders.CONTENT_TYPE, "test.contentType"); String testCorrelationId = "foo"; headerMap.put(AmqpHeaders.CORRELATION_ID, testCorrelationId); - headerMap.put(AmqpHeaders.DELAY, 1234); + headerMap.put(AmqpHeaders.DELAY, 1234L); headerMap.put(AmqpHeaders.DELIVERY_MODE, MessageDeliveryMode.NON_PERSISTENT); headerMap.put(AmqpHeaders.DELIVERY_TAG, 1234L); headerMap.put(AmqpHeaders.EXPIRATION, "test.expiration"); @@ -92,9 +94,35 @@ public void fromHeaders() { assertThat(amqpProperties.getTimestamp()).isEqualTo(testTimestamp); assertThat(amqpProperties.getType()).isEqualTo("test.type"); assertThat(amqpProperties.getUserId()).isEqualTo("test.userId"); - assertThat(amqpProperties.getDelay()).isEqualTo(Integer.valueOf(1234)); + assertThat(amqpProperties.getDelayLong()).isEqualTo(Long.valueOf(1234)); } + @Test + public void fromHeadersWithLongDelay() { + SimpleAmqpHeaderMapper headerMapper = new SimpleAmqpHeaderMapper(); + Map headerMap = new HashMap<>(); + headerMap.put(AmqpHeaders.DELAY, 1234L); + MessageHeaders messageHeaders = new MessageHeaders(headerMap); + MessageProperties amqpProperties = new MessageProperties(); + headerMapper.fromHeaders(messageHeaders, amqpProperties); + assertThat(amqpProperties.getDelayLong()).isEqualTo(Long.valueOf(1234)); + + amqpProperties.setDelayLong(5678L); + assertThat(amqpProperties.getDelayLong()).isEqualTo(Long.valueOf(5678)); + + amqpProperties.setDelayLong(null); + assertThat(amqpProperties.getHeaders().containsKey(AmqpHeaders.DELAY)).isFalse(); + + amqpProperties.setDelayLong(MessageProperties.X_DELAY_MAX); + assertThat(amqpProperties.getDelayLong()).isEqualTo(Long.valueOf(MessageProperties.X_DELAY_MAX)); + + assertThatThrownBy(() -> amqpProperties.setDelayLong(MessageProperties.X_DELAY_MAX + 1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Delay cannot exceed"); + + } + + @Test public void fromHeadersWithContentTypeAsMediaType() { SimpleAmqpHeaderMapper headerMapper = new SimpleAmqpHeaderMapper(); @@ -126,7 +154,7 @@ public void toHeaders() { amqpProperties.setMessageCount(42); amqpProperties.setMessageId("test.messageId"); amqpProperties.setPriority(22); - amqpProperties.setReceivedDelay(1234); + amqpProperties.setReceivedDelayLong(1234L); amqpProperties.setReceivedExchange("test.receivedExchange"); amqpProperties.setReceivedRoutingKey("test.receivedRoutingKey"); amqpProperties.setRedelivered(true); @@ -151,7 +179,7 @@ public void toHeaders() { assertThat(headerMap.get(AmqpHeaders.EXPIRATION)).isEqualTo("test.expiration"); assertThat(headerMap.get(AmqpHeaders.MESSAGE_COUNT)).isEqualTo(42); assertThat(headerMap.get(AmqpHeaders.MESSAGE_ID)).isEqualTo("test.messageId"); - assertThat(headerMap.get(AmqpHeaders.RECEIVED_DELAY)).isEqualTo(1234); + assertThat(headerMap.get(AmqpHeaders.RECEIVED_DELAY)).isEqualTo(1234L); assertThat(headerMap.get(AmqpHeaders.RECEIVED_EXCHANGE)).isEqualTo("test.receivedExchange"); assertThat(headerMap.get(AmqpHeaders.RECEIVED_ROUTING_KEY)).isEqualTo("test.receivedRoutingKey"); assertThat(headerMap.get(AmqpHeaders.REPLY_TO)).isEqualTo("test.replyTo"); diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java index 72053d2846..84c29dfeb7 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,6 +40,7 @@ * @author Mark Fisher * @author Gary Russell * @author Soeren Unruh + * @author Raylax Grey * @since 1.0 */ public class DefaultMessagePropertiesConverter implements MessagePropertiesConverter { @@ -92,8 +93,14 @@ public MessageProperties toMessageProperties(final BasicProperties source, final String key = entry.getKey(); if (MessageProperties.X_DELAY.equals(key)) { Object value = entry.getValue(); - if (value instanceof Integer integ) { - target.setReceivedDelay(integ); + if (value instanceof Integer intValue) { + long receivedDelayLongValue = intValue.longValue(); + target.setReceivedDelayLong(receivedDelayLongValue); + target.setHeader(key, receivedDelayLongValue); + } + else if (value instanceof Long longVal) { + target.setReceivedDelayLong(longVal); + target.setHeader(key, longVal); } } else { diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java index abf8f76cb3..71d9792a2e 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -59,6 +59,7 @@ * @author Gary Russell * @author Gunnar Hillert * @author Artem Bilan + * @author Raylax Grey */ @RabbitAvailable(management = true) public class RabbitAdminIntegrationTests extends NeedsManagementTests { @@ -397,20 +398,20 @@ public void testDeclareDelayedExchange() throws Exception { RabbitTemplate template = new RabbitTemplate(this.connectionFactory); template.setReceiveTimeout(10000); template.convertAndSend(exchangeName, queue.getName(), "foo", message -> { - message.getMessageProperties().setDelay(1000); + message.getMessageProperties().setDelayLong(1000L); return message; }); MessageProperties properties = new MessageProperties(); - properties.setDelay(500); + properties.setDelayLong(500L); template.send(exchangeName, queue.getName(), MessageBuilder.withBody("foo".getBytes()).andProperties(properties).build()); long t1 = System.currentTimeMillis(); Message received = template.receive(queue.getName()); assertThat(received).isNotNull(); - assertThat(received.getMessageProperties().getReceivedDelay()).isEqualTo(Integer.valueOf(500)); + assertThat(received.getMessageProperties().getDelayLong()).isEqualTo(Long.valueOf(500L)); received = template.receive(queue.getName()); assertThat(received).isNotNull(); - assertThat(received.getMessageProperties().getReceivedDelay()).isEqualTo(Integer.valueOf(1000)); + assertThat(received.getMessageProperties().getDelayLong()).isEqualTo(Long.valueOf(1000L)); assertThat(System.currentTimeMillis() - t1).isGreaterThan(950L); Map exchange2 = getExchange(exchangeName);