Skip to content

Commit fe8d42f

Browse files
committed
Add JdkHttpClientResourceFactory
See gh-23432
1 parent b3b50f8 commit fe8d42f

File tree

4 files changed

+146
-9
lines changed

4 files changed

+146
-9
lines changed

spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpConnector.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.nio.ByteBuffer;
2424
import java.util.List;
2525
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.Executor;
2627
import java.util.concurrent.Flow;
2728
import java.util.function.Function;
2829

@@ -31,6 +32,8 @@
3132
import org.springframework.core.io.buffer.DataBufferFactory;
3233
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
3334
import org.springframework.http.HttpMethod;
35+
import org.springframework.lang.Nullable;
36+
import org.springframework.util.Assert;
3437

3538
/**
3639
* {@link ClientHttpConnector} for the Java {@link HttpClient}.
@@ -44,21 +47,47 @@ public class JdkClientHttpConnector implements ClientHttpConnector {
4447

4548
private final HttpClient httpClient;
4649

47-
private final DataBufferFactory bufferFactory;
50+
private DataBufferFactory bufferFactory = DefaultDataBufferFactory.sharedInstance;
4851

4952

5053
/**
5154
* Default constructor that uses {@link HttpClient#newHttpClient()}.
5255
*/
5356
public JdkClientHttpConnector() {
54-
this(HttpClient.newHttpClient(), new DefaultDataBufferFactory());
57+
this(HttpClient.newHttpClient());
5558
}
5659

5760
/**
5861
* Constructor with an initialized {@link HttpClient} and a {@link DataBufferFactory}.
5962
*/
60-
public JdkClientHttpConnector(HttpClient httpClient, DataBufferFactory bufferFactory) {
63+
public JdkClientHttpConnector(HttpClient httpClient) {
6164
this.httpClient = httpClient;
65+
}
66+
67+
/**
68+
* Constructor with a {@link JdkHttpClientResourceFactory} that provides
69+
* shared resources.
70+
* @param clientBuilder a pre-initialized builder for the client that will
71+
* be further initialized with the shared resources to use
72+
* @param resourceFactory the {@link JdkHttpClientResourceFactory} to use
73+
*/
74+
public JdkClientHttpConnector(
75+
HttpClient.Builder clientBuilder, @Nullable JdkHttpClientResourceFactory resourceFactory) {
76+
77+
if (resourceFactory != null) {
78+
Executor executor = resourceFactory.getExecutor();
79+
clientBuilder.executor(executor);
80+
}
81+
this.httpClient = clientBuilder.build();
82+
}
83+
84+
85+
/**
86+
* Set the buffer factory to use.
87+
* <p>By default, this is {@link DefaultDataBufferFactory#sharedInstance}.
88+
*/
89+
public void setBufferFactory(DataBufferFactory bufferFactory) {
90+
Assert.notNull(bufferFactory, "DataBufferFactory is required");
6291
this.bufferFactory = bufferFactory;
6392
}
6493

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright 2002-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.http.client.reactive;
18+
19+
import java.net.http.HttpClient;
20+
import java.util.concurrent.Executor;
21+
import java.util.concurrent.Executors;
22+
import java.util.concurrent.ThreadFactory;
23+
24+
import org.eclipse.jetty.util.component.LifeCycle;
25+
import org.eclipse.jetty.util.thread.QueuedThreadPool;
26+
27+
import org.springframework.beans.factory.DisposableBean;
28+
import org.springframework.beans.factory.InitializingBean;
29+
import org.springframework.lang.Nullable;
30+
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
31+
import org.springframework.util.Assert;
32+
33+
/**
34+
* Factory to manage JDK HttpClient resources such as a shared {@link Executor}
35+
* within the lifecycle of a Spring {@code ApplicationContext}.
36+
*
37+
* <p>This factory implements {@link InitializingBean} and {@link DisposableBean}
38+
* and is expected typically to be declared as a Spring-managed bean.
39+
*
40+
* @author Rossen Stoyanchev
41+
* @since 6.0
42+
* @see JdkClientHttpConnector#JdkClientHttpConnector(HttpClient.Builder, JdkHttpClientResourceFactory)
43+
*/
44+
public class JdkHttpClientResourceFactory implements InitializingBean, DisposableBean {
45+
46+
@Nullable
47+
private Executor executor;
48+
49+
private String threadPrefix = "jdk-http";
50+
51+
52+
/**
53+
* Configure the {@link Executor} to use for {@link HttpClient} exchanges.
54+
* The given executor is started and stopped via {@link InitializingBean}
55+
* and {@link DisposableBean}.
56+
* <p>By default, this is set to {@link Executors#newCachedThreadPool(ThreadFactory)},
57+
* which mirrors {@link HttpClient.Builder#executor(Executor)}.
58+
* @param executor the executor to use
59+
*/
60+
public void setExecutor(@Nullable Executor executor) {
61+
this.executor = executor;
62+
}
63+
64+
/**
65+
* Return the configured {@link Executor}.
66+
*/
67+
@Nullable
68+
public Executor getExecutor() {
69+
return this.executor;
70+
}
71+
72+
/**
73+
* Configure the thread prefix to initialize {@link QueuedThreadPool} executor with. This
74+
* is used only when a {@link Executor} instance isn't
75+
* {@link #setExecutor(Executor) provided}.
76+
* <p>By default set to "jetty-http".
77+
* @param threadPrefix the thread prefix to use
78+
*/
79+
public void setThreadPrefix(String threadPrefix) {
80+
Assert.notNull(threadPrefix, "Thread prefix is required");
81+
this.threadPrefix = threadPrefix;
82+
}
83+
84+
85+
@Override
86+
public void afterPropertiesSet() throws Exception {
87+
if (this.executor == null) {
88+
String name = this.threadPrefix + "@" + Integer.toHexString(hashCode());
89+
this.executor = Executors.newCachedThreadPool(new CustomizableThreadFactory(name));
90+
}
91+
if (this.executor instanceof LifeCycle) {
92+
((LifeCycle)this.executor).start();
93+
}
94+
}
95+
96+
@Override
97+
public void destroy() throws Exception {
98+
try {
99+
if (this.executor instanceof LifeCycle) {
100+
((LifeCycle)this.executor).stop();
101+
}
102+
}
103+
catch (Throwable ex) {
104+
// ignore
105+
}
106+
}
107+
108+
}

spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,9 @@ class WebClientIntegrationTests {
9999
static Stream<ClientHttpConnector> arguments() {
100100
return Stream.of(
101101
new ReactorClientHttpConnector(),
102+
new JdkClientHttpConnector(),
102103
new JettyClientHttpConnector(),
103-
new HttpComponentsClientHttpConnector(),
104-
new JdkClientHttpConnector()
104+
new HttpComponentsClientHttpConnector()
105105
);
106106
}
107107

spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,21 +75,21 @@ class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
7575
static Object[][] arguments() {
7676
return new Object[][] {
7777
{new JettyHttpServer(), new ReactorClientHttpConnector()},
78+
{new JettyHttpServer(), new JdkClientHttpConnector()},
7879
{new JettyHttpServer(), new JettyClientHttpConnector()},
7980
{new JettyHttpServer(), new HttpComponentsClientHttpConnector()},
80-
{new JettyHttpServer(), new JdkClientHttpConnector()},
8181
{new ReactorHttpServer(), new ReactorClientHttpConnector()},
82+
{new ReactorHttpServer(), new JdkClientHttpConnector()},
8283
{new ReactorHttpServer(), new JettyClientHttpConnector()},
8384
{new ReactorHttpServer(), new HttpComponentsClientHttpConnector()},
84-
{new ReactorHttpServer(), new JdkClientHttpConnector()},
8585
{new TomcatHttpServer(), new ReactorClientHttpConnector()},
86+
{new TomcatHttpServer(), new JdkClientHttpConnector()},
8687
{new TomcatHttpServer(), new JettyClientHttpConnector()},
8788
{new TomcatHttpServer(), new HttpComponentsClientHttpConnector()},
88-
{new TomcatHttpServer(), new JdkClientHttpConnector()},
8989
{new UndertowHttpServer(), new ReactorClientHttpConnector()},
90+
{new UndertowHttpServer(), new JdkClientHttpConnector()},
9091
{new UndertowHttpServer(), new JettyClientHttpConnector()},
9192
{new UndertowHttpServer(), new HttpComponentsClientHttpConnector()},
92-
{new UndertowHttpServer(), new JdkClientHttpConnector()},
9393
};
9494
}
9595

0 commit comments

Comments
 (0)