diff --git a/spring-integration-core/src/main/java/org/springframework/integration/util/CheckedFunction.java b/spring-integration-core/src/main/java/org/springframework/integration/util/CheckedFunction.java new file mode 100644 index 00000000000..d6572becdc2 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/util/CheckedFunction.java @@ -0,0 +1,55 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.util; + +import java.util.function.Function; + +/** + * A Function-like interface which allows throwing Error. + * + * @param the input type. + * @param the output type. + * + * @author Artem Bilan + * + * @since 6.1 + */ +@FunctionalInterface +public interface CheckedFunction { + + R apply(T t) throws Throwable; // NOSONAR + + default Function unchecked() { + return t1 -> { + try { + return apply(t1); + } + catch (Throwable t) { // NOSONAR + if (t instanceof RuntimeException runtimeException) { + throw runtimeException; + } + else if (t instanceof Error error) { + throw error; + } + else { + throw new IllegalStateException(t); + } + } + }; + } + +} diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/ChannelPublishingJmsMessageListener.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/ChannelPublishingJmsMessageListener.java index ef427eb2f76..6788346dbef 100644 --- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/ChannelPublishingJmsMessageListener.java +++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/ChannelPublishingJmsMessageListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,7 +30,10 @@ import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.InitializingBean; import org.springframework.core.log.LogAccessor; +import org.springframework.expression.Expression; +import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.integration.core.MessagingTemplate; +import org.springframework.integration.expression.ExpressionUtils; import org.springframework.integration.gateway.MessagingGatewaySupport; import org.springframework.integration.support.DefaultMessageBuilderFactory; import org.springframework.integration.support.MessageBuilderFactory; @@ -42,6 +45,7 @@ import org.springframework.jms.support.converter.SimpleMessageConverter; import org.springframework.jms.support.destination.DestinationResolver; import org.springframework.jms.support.destination.DynamicDestinationResolver; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessagingException; @@ -90,12 +94,16 @@ public class ChannelPublishingJmsMessageListener private DestinationResolver destinationResolver = new DynamicDestinationResolver(); + private Expression replyToExpression; + private JmsHeaderMapper headerMapper = new DefaultJmsHeaderMapper(); private BeanFactory beanFactory; private MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory(); + private StandardEvaluationContext evaluationContext; + /** * Specify whether a JMS reply Message is expected. * @param expectReply true if a reply is expected. @@ -261,6 +269,17 @@ public void setDestinationResolver(DestinationResolver destinationResolver) { this.destinationResolver = destinationResolver; } + /** + * Set a SpEL expression to resolve a 'replyTo' destination from a request + * {@link jakarta.jms.Message} as a root evaluation object + * if {@link jakarta.jms.Message#getJMSReplyTo()} is null. + * @param replyToExpression the SpEL expression for 'replyTo' destination. + * @since 6.1 + */ + public void setReplyToExpression(Expression replyToExpression) { + this.replyToExpression = replyToExpression; + } + /** * Provide a {@link MessageConverter} implementation to use when * converting between JMS Messages and Spring Integration Messages. @@ -382,6 +401,7 @@ public void afterPropertiesSet() { } this.gatewayDelegate.afterPropertiesSet(); this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(this.beanFactory); + this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(this.beanFactory); } protected void start() { @@ -417,21 +437,22 @@ else if (replyMessage.getJMSCorrelationID() == null) { /** * Determine a reply destination for the given message. - *

This implementation first checks the boolean 'error' flag which signifies that the reply is an error message. - * If reply is not an error it will first check the JMS Reply-To {@link Destination} of the supplied request message; - * if that is not null it is returned; if it is null, then the configured - * {@link #resolveDefaultReplyDestination default reply destination} is returned; if this too is null, + * It will first check the JMS Reply-To {@link Destination} + * of the supplied request message; + * if that is null, then the configured {@link #replyToExpression} is evaluated + * (if any), then a{@link #resolveDefaultReplyDestination default reply destination} + * is returned; if this too is null, * then an {@link InvalidDestinationException} is thrown. * @param request the original incoming JMS message * @param session the JMS Session to operate on - * @return the reply destination (never null) + * @return the reply destination (never null) * @throws JMSException if thrown by JMS API methods * @throws InvalidDestinationException if no {@link Destination} can be determined * @see #setDefaultReplyDestination * @see jakarta.jms.Message#getJMSReplyTo() */ private Destination getReplyDestination(jakarta.jms.Message request, Session session) throws JMSException { - Destination replyTo = request.getJMSReplyTo(); + Destination replyTo = resolveReplyTo(request, session); if (replyTo == null) { replyTo = resolveDefaultReplyDestination(session); if (replyTo == null) { @@ -440,6 +461,24 @@ private Destination getReplyDestination(jakarta.jms.Message request, Session ses } } return replyTo; + + } + + @Nullable + private Destination resolveReplyTo(jakarta.jms.Message request, Session session) throws JMSException { + Destination replyTo = request.getJMSReplyTo(); + if (replyTo == null) { + if (this.replyToExpression != null) { + Object replyToValue = this.replyToExpression.getValue(this.evaluationContext, request); + if (replyToValue instanceof Destination destination) { + return destination; + } + else if (replyToValue instanceof String destinationName) { + return this.destinationResolver.resolveDestinationName(session, destinationName, false); + } + } + } + return replyTo; } /** diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java index 0fbd88d92ee..4ade7d7976b 100644 --- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java +++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java @@ -103,9 +103,9 @@ public void setShouldTrack(boolean shouldTrack) { } /** - * Set to false to prevent listener container shutdown when the endpoint is stopped. + * Set to {@code false} to prevent listener container shutdown when the endpoint is stopped. * Then, if so configured, any cached consumer(s) in the container will remain. - * Otherwise the shared connection and will be closed and the listener invokers shut + * Otherwise, the shared connection and will be closed and the listener invokers shut * down; this behavior is new starting with version 5.1. Default: true. * @param shutdownContainerOnStop false to not shutdown. * @since 5.1 diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsInboundGatewaySpec.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsInboundGatewaySpec.java index 935e4a8d079..f9539f4bbc1 100644 --- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsInboundGatewaySpec.java +++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsInboundGatewaySpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,11 +19,15 @@ import java.util.function.Consumer; import jakarta.jms.Destination; +import jakarta.jms.Message; +import org.springframework.expression.Expression; import org.springframework.integration.dsl.MessagingGatewaySpec; +import org.springframework.integration.expression.FunctionExpression; import org.springframework.integration.jms.ChannelPublishingJmsMessageListener; import org.springframework.integration.jms.JmsHeaderMapper; import org.springframework.integration.jms.JmsInboundGateway; +import org.springframework.integration.util.CheckedFunction; import org.springframework.jms.listener.AbstractMessageListenerContainer; import org.springframework.jms.support.converter.MessageConverter; import org.springframework.jms.support.destination.DestinationResolver; @@ -137,6 +141,44 @@ public S destinationResolver(DestinationResolver destinationResolver) { return _this(); } + + /** + * Set a SpEL expression to resolve a 'replyTo' destination from a request {@link jakarta.jms.Message} + * as a root evaluation object if {@link jakarta.jms.Message#getJMSReplyTo()} is null. + * @param replyToExpression the SpEL expression for 'replyTo' destination. + * @return the spec. + * @since 6.1 + * @see ChannelPublishingJmsMessageListener#setReplyToExpression(Expression) + */ + public S replyToExpression(String replyToExpression) { + return replyToExpression(PARSER.parseExpression(replyToExpression)); + } + + /** + * Set a function to resolve a 'replyTo' destination from a request {@link jakarta.jms.Message} + * as a root evaluation object if {@link jakarta.jms.Message#getJMSReplyTo()} is null. + * @param replyToFunction the function for 'replyTo' destination. + * @return the spec. + * @since 6.1 + * @see ChannelPublishingJmsMessageListener#setReplyToExpression(Expression) + */ + public S replyToFunction(CheckedFunction replyToFunction) { + return replyToExpression(new FunctionExpression<>(replyToFunction.unchecked())); + } + + /** + * Set a SpEL expression to resolve a 'replyTo' destination from a request {@link jakarta.jms.Message} + * as a root evaluation object if {@link jakarta.jms.Message#getJMSReplyTo()} is null. + * @param replyToExpression the SpEL expression for 'replyTo' destination. + * @return the spec. + * @since 6.1 + * @see ChannelPublishingJmsMessageListener#setReplyToExpression(Expression) + */ + public S replyToExpression(Expression replyToExpression) { + this.target.getListener().setReplyToExpression(replyToExpression); + return _this(); + } + /** * @param messageConverter the messageConverter. * @return the spec. diff --git a/spring-integration-jms/src/test/java/org/springframework/integration/jms/dsl/JmsTests.java b/spring-integration-jms/src/test/java/org/springframework/integration/jms/dsl/JmsTests.java index c394c4f9670..e50d4c44506 100644 --- a/spring-integration-jms/src/test/java/org/springframework/integration/jms/dsl/JmsTests.java +++ b/spring-integration-jms/src/test/java/org/springframework/integration/jms/dsl/JmsTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import jakarta.jms.JMSException; +import jakarta.jms.TextMessage; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.ListableBeanFactory; @@ -137,6 +139,9 @@ public class JmsTests extends ActiveMQMultiContextTests { @Autowired private CountDownLatch redeliveryLatch; + @Autowired + JmsTemplate jmsTemplate; + @Test public void testPollingFlow() { this.controlBus.send("@'integerMessageSource.inboundChannelAdapter'.start()"); @@ -264,6 +269,19 @@ public void testJmsRedeliveryFlow() throws InterruptedException { this.jmsMessageDrivenRedeliveryFlowContainer.stop(); } + @Test + public void customReplyToHeader() throws JMSException { + this.jmsTemplate.send("jmsPipelineTest", session -> { + TextMessage message = session.createTextMessage("test data"); + message.setStringProperty("myReplyTo", "replyToQueue"); + return message; + }); + + jakarta.jms.Message replyMessage = this.jmsTemplate.receive("replyToQueue"); + assertThat(replyMessage).isNotNull(); + assertThat(replyMessage.getBody(String.class)).isEqualTo("TEST DATA"); + } + @MessagingGateway(defaultRequestChannel = "controlBus.input") private interface ControlBusGateway { @@ -278,7 +296,9 @@ public static class ContextConfiguration { @Bean public JmsTemplate jmsTemplate() { - return new JmsTemplate(connectionFactory); + JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); + jmsTemplate.setReceiveTimeout(10_000); + return jmsTemplate; } @Bean(name = PollerMetadata.DEFAULT_POLLER) @@ -422,6 +442,7 @@ protected Object handleRequestMessage(Message requestMessage) { })) .requestDestination("jmsPipelineTest") + .replyToFunction(message -> message.getStringProperty("myReplyTo")) .configureListenerContainer(c -> c.transactionManager(mock(PlatformTransactionManager.class)))) .filter(payload -> !"junk".equals(payload)) diff --git a/src/reference/asciidoc/jms.adoc b/src/reference/asciidoc/jms.adoc index 137da02e922..58c07fd1d22 100644 --- a/src/reference/asciidoc/jms.adoc +++ b/src/reference/asciidoc/jms.adoc @@ -411,11 +411,32 @@ Starting with version 5.1, when the endpoint is stopped while the application re Previously, the connection and consumers remained open. To revert to the previous behavior, set the `shutdownContainerOnStop` on the `JmsInboundGateway` to `false`. +By default, the `JmsInboundGateway` looks for a `jakarta.jms.Message.getJMSReplyTo()` property in the received message to determine where to send a reply. +Otherwise, it can be configured with a static `defaultReplyDestination`, or `defaultReplyQueueName` or `defaultReplyTopicName`. +In addition, starting with version 6.1, a `replyToExpression` can be configured on a provided `ChannelPublishingJmsMessageListener` to determine the reply destination dynamically, if the standard `JMSReplyTo` property is `null` on the request. +The received `jakarta.jms.Message` is used the root evaluation context object. +The following example demonstrates how to use Java DSL API to configure an inbound JMS gateway with a custom reply destination resolved from the request message: + +==== +[source,java] +---- +@Bean +public IntegrationFlow jmsInboundGatewayFlow(ConnectionFactory connectionFactory) { + return IntegrationFlow.from( + Jms.inboundGateway(connectionFactory) + .requestDestination("requestDestination") + .replyToFunction(message -> message.getStringProperty("myReplyTo"))) + .transform(String::toUpperCase) + .get(); +} +---- +==== + [[jms-outbound-gateway]] === Outbound Gateway -The outbound gateway creates JMS messages from Spring Integration messages and sends them to a 'request-destination'. -It then handles the JMS reply message either by using a selector to receive from the 'reply-destination' that you configure or, if no 'reply-destination' is provided, by creating JMS `TemporaryQueue` (or `TemporaryTopic` if `replyPubSubDomain= true`) instances. +The outbound gateway creates JMS messages from Spring Integration messages and sends them to a `request-destination`. +It then handles the JMS reply message either by using a selector to receive from the `reply-destination` that you configure or, if no `reply-destination` is provided, by creating JMS `TemporaryQueue` (or `TemporaryTopic` if `replyPubSubDomain= true`) instances. [[jms-outbound-gateway-memory-caution]] [CAUTION] diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 3ed5ce789c5..9caae84fe3b 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -27,9 +27,14 @@ See <<./zip.adoc#zip,Zip Support>> for more information. [[x6.1-general]] === General Changes - [[x6.1-web-sockets]] === Web Sockets Changes A `ClientWebSocketContainer` can now be configured with a predefined `URI` instead of a combination of `uriTemplate` and `uriVariables`. See <<./web-sockets.adoc#web-socket-overview, WebSocket Overview>> for more information. + +[[x6.1-jms]] +=== JMS Changes + +The `JmsInboundGateway`, via its `ChannelPublishingJmsMessageListener`, can now be configured with a `replyToExpression` to resolve a reply destination against the request message at runtime. +See <<./jms.adoc#jms-inbound-gateway, JMS Inbound Gateway>> for more information.