Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.util;

import java.util.function.Function;

/**
* A Function-like interface which allows throwing Error.
*
* @param <T> the input type.
* @param <R> the output type.
*
* @author Artem Bilan
*
* @since 6.1
*/
@FunctionalInterface
public interface CheckedFunction<T, R> {

R apply(T t) throws Throwable; // NOSONAR

default Function<T, R> unchecked() {
return t1 -> {
try {
return apply(t1);
}
catch (Throwable t) { // NOSONAR
if (t instanceof RuntimeException runtimeException) {
throw runtimeException;
}
else if (t instanceof Error error) {
throw error;
}
else {
throw new IllegalStateException(t);
}
}
};
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not really comfortable with institutionalizing this JVM abuse. Is there no alternative?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Will it be better if there is no that unchecked() then?
Something like plain:

@FunctionalInterface
public interface CheckedFunction<T, R> {

	R apply(T t) throws Throwable;

}

?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right; that's the bit I was objecting to. You probebly need a // NOSONAR here too.


}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,7 +30,10 @@
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.log.LogAccessor;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.gateway.MessagingGatewaySupport;
import org.springframework.integration.support.DefaultMessageBuilderFactory;
import org.springframework.integration.support.MessageBuilderFactory;
Expand All @@ -42,6 +45,7 @@
import org.springframework.jms.support.converter.SimpleMessageConverter;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.jms.support.destination.DynamicDestinationResolver;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
Expand Down Expand Up @@ -90,12 +94,16 @@ public class ChannelPublishingJmsMessageListener

private DestinationResolver destinationResolver = new DynamicDestinationResolver();

private Expression replyToExpression;

private JmsHeaderMapper headerMapper = new DefaultJmsHeaderMapper();

private BeanFactory beanFactory;

private MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory();

private StandardEvaluationContext evaluationContext;

/**
* Specify whether a JMS reply Message is expected.
* @param expectReply true if a reply is expected.
Expand Down Expand Up @@ -261,6 +269,17 @@ public void setDestinationResolver(DestinationResolver destinationResolver) {
this.destinationResolver = destinationResolver;
}

/**
* Set a SpEL expression to resolve a 'replyTo' destination from a request
* {@link jakarta.jms.Message} as a root evaluation object
* if {@link jakarta.jms.Message#getJMSReplyTo()} is null.
* @param replyToExpression the SpEL expression for 'replyTo' destination.
* @since 6.1
*/
public void setReplyToExpression(Expression replyToExpression) {
this.replyToExpression = replyToExpression;
}

/**
* Provide a {@link MessageConverter} implementation to use when
* converting between JMS Messages and Spring Integration Messages.
Expand Down Expand Up @@ -382,6 +401,7 @@ public void afterPropertiesSet() {
}
this.gatewayDelegate.afterPropertiesSet();
this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(this.beanFactory);
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(this.beanFactory);
}

protected void start() {
Expand Down Expand Up @@ -417,21 +437,22 @@ else if (replyMessage.getJMSCorrelationID() == null) {

/**
* Determine a reply destination for the given message.
* <p> This implementation first checks the boolean 'error' flag which signifies that the reply is an error message.
* If reply is not an error it will first check the JMS Reply-To {@link Destination} of the supplied request message;
* if that is not <code>null</code> it is returned; if it is <code>null</code>, then the configured
* {@link #resolveDefaultReplyDestination default reply destination} is returned; if this too is <code>null</code>,
* It will first check the JMS Reply-To {@link Destination}
* of the supplied request message;
* if that is null, then the configured {@link #replyToExpression} is evaluated
* (if any), then a{@link #resolveDefaultReplyDestination default reply destination}
* is returned; if this too is null,
* then an {@link InvalidDestinationException} is thrown.
* @param request the original incoming JMS message
* @param session the JMS Session to operate on
* @return the reply destination (never <code>null</code>)
* @return the reply destination (never null)
* @throws JMSException if thrown by JMS API methods
* @throws InvalidDestinationException if no {@link Destination} can be determined
* @see #setDefaultReplyDestination
* @see jakarta.jms.Message#getJMSReplyTo()
*/
private Destination getReplyDestination(jakarta.jms.Message request, Session session) throws JMSException {
Destination replyTo = request.getJMSReplyTo();
Destination replyTo = resolveReplyTo(request, session);
if (replyTo == null) {
replyTo = resolveDefaultReplyDestination(session);
if (replyTo == null) {
Expand All @@ -440,6 +461,24 @@ private Destination getReplyDestination(jakarta.jms.Message request, Session ses
}
}
return replyTo;

}

@Nullable
private Destination resolveReplyTo(jakarta.jms.Message request, Session session) throws JMSException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

protected so subclasses can override?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. Then it is probably better to make the whole getReplyDestination() as protected.
And then we will dive to a mess what is protected and what is not and can be accessed from the inheritors.
If you wish, we can revise this in a separate issue.

Destination replyTo = request.getJMSReplyTo();
if (replyTo == null) {
if (this.replyToExpression != null) {
Object replyToValue = this.replyToExpression.getValue(this.evaluationContext, request);
if (replyToValue instanceof Destination destination) {
return destination;
}
else if (replyToValue instanceof String destinationName) {
return this.destinationResolver.resolveDestinationName(session, destinationName, false);
}
}
}
return replyTo;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ public void setShouldTrack(boolean shouldTrack) {
}

/**
* Set to false to prevent listener container shutdown when the endpoint is stopped.
* Set to {@code false} to prevent listener container shutdown when the endpoint is stopped.
* Then, if so configured, any cached consumer(s) in the container will remain.
* Otherwise the shared connection and will be closed and the listener invokers shut
* Otherwise, the shared connection and will be closed and the listener invokers shut
* down; this behavior is new starting with version 5.1. Default: true.
* @param shutdownContainerOnStop false to not shutdown.
* @since 5.1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2022 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,11 +19,15 @@
import java.util.function.Consumer;

import jakarta.jms.Destination;
import jakarta.jms.Message;

import org.springframework.expression.Expression;
import org.springframework.integration.dsl.MessagingGatewaySpec;
import org.springframework.integration.expression.FunctionExpression;
import org.springframework.integration.jms.ChannelPublishingJmsMessageListener;
import org.springframework.integration.jms.JmsHeaderMapper;
import org.springframework.integration.jms.JmsInboundGateway;
import org.springframework.integration.util.CheckedFunction;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.destination.DestinationResolver;
Expand Down Expand Up @@ -137,6 +141,44 @@ public S destinationResolver(DestinationResolver destinationResolver) {
return _this();
}


/**
* Set a SpEL expression to resolve a 'replyTo' destination from a request {@link jakarta.jms.Message}
* as a root evaluation object if {@link jakarta.jms.Message#getJMSReplyTo()} is null.
* @param replyToExpression the SpEL expression for 'replyTo' destination.
* @return the spec.
* @since 6.1
* @see ChannelPublishingJmsMessageListener#setReplyToExpression(Expression)
*/
public S replyToExpression(String replyToExpression) {
return replyToExpression(PARSER.parseExpression(replyToExpression));
}

/**
* Set a function to resolve a 'replyTo' destination from a request {@link jakarta.jms.Message}
* as a root evaluation object if {@link jakarta.jms.Message#getJMSReplyTo()} is null.
* @param replyToFunction the function for 'replyTo' destination.
* @return the spec.
* @since 6.1
* @see ChannelPublishingJmsMessageListener#setReplyToExpression(Expression)
*/
public S replyToFunction(CheckedFunction<Message, ?> replyToFunction) {
return replyToExpression(new FunctionExpression<>(replyToFunction.unchecked()));
}
Comment on lines +165 to +167
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this; why not just

Suggested change
public S replyToFunction(CheckedFunction<Message, ?> replyToFunction) {
return replyToExpression(new FunctionExpression<>(replyToFunction.unchecked()));
}
public S replyToFunction(Function<Message, ?> replyToFunction) {
return replyToExpression(new FunctionExpression<>(replyToFunction));
}

??

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, but then .replyToFunction(message -> message.getStringProperty("myReplyTo")) is not going to be clean and end-user has to handle that JMSException himself which is not runtime one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, you are assuming that the user will use a JMS operation to determine the destination? I guess that's reasonable given that the root object is a Message.

OK.


/**
* Set a SpEL expression to resolve a 'replyTo' destination from a request {@link jakarta.jms.Message}
* as a root evaluation object if {@link jakarta.jms.Message#getJMSReplyTo()} is null.
* @param replyToExpression the SpEL expression for 'replyTo' destination.
* @return the spec.
* @since 6.1
* @see ChannelPublishingJmsMessageListener#setReplyToExpression(Expression)
*/
public S replyToExpression(Expression replyToExpression) {
this.target.getListener().setReplyToExpression(replyToExpression);
return _this();
}

/**
* @param messageConverter the messageConverter.
* @return the spec.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2022 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import jakarta.jms.JMSException;
import jakarta.jms.TextMessage;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.ListableBeanFactory;
Expand Down Expand Up @@ -137,6 +139,9 @@ public class JmsTests extends ActiveMQMultiContextTests {
@Autowired
private CountDownLatch redeliveryLatch;

@Autowired
JmsTemplate jmsTemplate;

@Test
public void testPollingFlow() {
this.controlBus.send("@'integerMessageSource.inboundChannelAdapter'.start()");
Expand Down Expand Up @@ -264,6 +269,19 @@ public void testJmsRedeliveryFlow() throws InterruptedException {
this.jmsMessageDrivenRedeliveryFlowContainer.stop();
}

@Test
public void customReplyToHeader() throws JMSException {
this.jmsTemplate.send("jmsPipelineTest", session -> {
TextMessage message = session.createTextMessage("test data");
message.setStringProperty("myReplyTo", "replyToQueue");
return message;
});

jakarta.jms.Message replyMessage = this.jmsTemplate.receive("replyToQueue");
assertThat(replyMessage).isNotNull();
assertThat(replyMessage.getBody(String.class)).isEqualTo("TEST DATA");
}

@MessagingGateway(defaultRequestChannel = "controlBus.input")
private interface ControlBusGateway {

Expand All @@ -278,7 +296,9 @@ public static class ContextConfiguration {

@Bean
public JmsTemplate jmsTemplate() {
return new JmsTemplate(connectionFactory);
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
jmsTemplate.setReceiveTimeout(10_000);
return jmsTemplate;
}

@Bean(name = PollerMetadata.DEFAULT_POLLER)
Expand Down Expand Up @@ -422,6 +442,7 @@ protected Object handleRequestMessage(Message<?> requestMessage) {

}))
.requestDestination("jmsPipelineTest")
.replyToFunction(message -> message.getStringProperty("myReplyTo"))
.configureListenerContainer(c ->
c.transactionManager(mock(PlatformTransactionManager.class))))
.filter(payload -> !"junk".equals(payload))
Expand Down
25 changes: 23 additions & 2 deletions src/reference/asciidoc/jms.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -411,11 +411,32 @@ Starting with version 5.1, when the endpoint is stopped while the application re
Previously, the connection and consumers remained open.
To revert to the previous behavior, set the `shutdownContainerOnStop` on the `JmsInboundGateway` to `false`.

By default, the `JmsInboundGateway` looks for a `jakarta.jms.Message.getJMSReplyTo()` property in the received message to determine where to send a reply.
Otherwise, it can be configured with a static `defaultReplyDestination`, or `defaultReplyQueueName` or `defaultReplyTopicName`.
In addition, starting with version 6.1, a `replyToExpression` can be configured on a provided `ChannelPublishingJmsMessageListener` to determine the reply destination dynamically, if the standard `JMSReplyTo` property is `null` on the request.
The received `jakarta.jms.Message` is used the root evaluation context object.
The following example demonstrates how to use Java DSL API to configure an inbound JMS gateway with a custom reply destination resolved from the request message:

====
[source,java]
----
@Bean
public IntegrationFlow jmsInboundGatewayFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(
Jms.inboundGateway(connectionFactory)
.requestDestination("requestDestination")
.replyToFunction(message -> message.getStringProperty("myReplyTo")))
.<String, String>transform(String::toUpperCase)
.get();
}
----
====

[[jms-outbound-gateway]]
=== Outbound Gateway

The outbound gateway creates JMS messages from Spring Integration messages and sends them to a 'request-destination'.
It then handles the JMS reply message either by using a selector to receive from the 'reply-destination' that you configure or, if no 'reply-destination' is provided, by creating JMS `TemporaryQueue` (or `TemporaryTopic` if `replyPubSubDomain= true`) instances.
The outbound gateway creates JMS messages from Spring Integration messages and sends them to a `request-destination`.
It then handles the JMS reply message either by using a selector to receive from the `reply-destination` that you configure or, if no `reply-destination` is provided, by creating JMS `TemporaryQueue` (or `TemporaryTopic` if `replyPubSubDomain= true`) instances.

[[jms-outbound-gateway-memory-caution]]
[CAUTION]
Expand Down
7 changes: 6 additions & 1 deletion src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@ See <<./zip.adoc#zip,Zip Support>> for more information.
[[x6.1-general]]
=== General Changes


[[x6.1-web-sockets]]
=== Web Sockets Changes

A `ClientWebSocketContainer` can now be configured with a predefined `URI` instead of a combination of `uriTemplate` and `uriVariables`.
See <<./web-sockets.adoc#web-socket-overview, WebSocket Overview>> for more information.

[[x6.1-jms]]
=== JMS Changes

The `JmsInboundGateway`, via its `ChannelPublishingJmsMessageListener`, can now be configured with a `replyToExpression` to resolve a reply destination against the request message at runtime.
See <<./jms.adoc#jms-inbound-gateway, JMS Inbound Gateway>> for more information.