Skip to content

Commit ddc587e

Browse files
committed
#2 - Introduce ConnectionAccessor.
Declare interface exposing inConnection(…) methods to encapsulate types implementing functionality that is applies within a connection scope. Move FetchSpec and SqlResult implementations to top-level types.
1 parent b8375f5 commit ddc587e

File tree

8 files changed

+333
-155
lines changed

8 files changed

+333
-155
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.jdbc.core.function;
17+
18+
import io.r2dbc.spi.Connection;
19+
import reactor.core.publisher.Flux;
20+
import reactor.core.publisher.Mono;
21+
22+
import java.util.function.Function;
23+
24+
import org.springframework.dao.DataAccessException;
25+
26+
/**
27+
* Interface declaring methods that accept callback {@link Function} to operate within the scope of a
28+
* {@link Connection}. Callback functions operate on a provided connection and must not close the connection as the
29+
* connections may be pooled or be subject to other kinds of resource management.
30+
* <p/>
31+
* Callback functions are responsible for creating a {@link org.reactivestreams.Publisher} that defines the scope of how
32+
* long the allocated {@link Connection} is valid. Connections are released after the publisher terminates.
33+
*
34+
* @author Mark Paluch
35+
*/
36+
public interface ConnectionAccessor {
37+
38+
/**
39+
* Execute a callback {@link Function} within a {@link Connection} scope. The function is responsible for creating a
40+
* {@link Mono}. The connection is released after the {@link Mono} terminates (or the subscription is cancelled).
41+
* Connection resources must not be passed outside of the {@link Function} closure, otherwise resources may get
42+
* defunct.
43+
*
44+
* @param action must not be {@literal null}.
45+
* @return the resulting {@link Mono}.
46+
* @throws DataAccessException
47+
*/
48+
<T> Mono<T> inConnection(Function<Connection, Mono<T>> action) throws DataAccessException;
49+
50+
/**
51+
* Execute a callback {@link Function} within a {@link Connection} scope. The function is responsible for creating a
52+
* {@link Flux}. The connection is released after the {@link Flux} terminates (or the subscription is cancelled).
53+
* Connection resources must not be passed outside of the {@link Function} closure, otherwise resources may get
54+
* defunct.
55+
*
56+
* @param action must not be {@literal null}.
57+
* @return the resulting {@link Flux}.
58+
* @throws DataAccessException
59+
*/
60+
<T> Flux<T> inConnectionMany(Function<Connection, Flux<T>> action) throws DataAccessException;
61+
62+
}

src/main/java/org/springframework/data/jdbc/core/function/DatabaseClient.java

Lines changed: 3 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
*/
1616
package org.springframework.data.jdbc.core.function;
1717

18-
import reactor.core.publisher.Flux;
18+
import io.r2dbc.spi.ConnectionFactory;
19+
import io.r2dbc.spi.Row;
20+
import io.r2dbc.spi.RowMetadata;
1921
import reactor.core.publisher.Mono;
2022

2123
import java.util.Map;
@@ -28,10 +30,6 @@
2830
import org.springframework.data.domain.Sort;
2931
import org.springframework.jdbc.support.SQLExceptionTranslator;
3032

31-
import io.r2dbc.spi.ConnectionFactory;
32-
import io.r2dbc.spi.Row;
33-
import io.r2dbc.spi.RowMetadata;
34-
3533
/**
3634
* A non-blocking, reactive client for performing database calls requests with Reactive Streams back pressure. Provides
3735
* a higher level, common API over R2DBC client libraries.
@@ -412,40 +410,4 @@ interface BindSpec<S extends BindSpec<S>> {
412410
*/
413411
S bind(Object bean);
414412
}
415-
416-
/**
417-
* Contract for fetching results.
418-
*/
419-
interface FetchSpec<T> {
420-
421-
/**
422-
* Get exactly zero or one result.
423-
*
424-
* @return {@link Mono#empty()} if no match found. Never {@literal null}.
425-
* @throws org.springframework.dao.IncorrectResultSizeDataAccessException if more than one match found.
426-
*/
427-
Mono<T> one();
428-
429-
/**
430-
* Get the first or no result.
431-
*
432-
* @return {@link Mono#empty()} if no match found. Never {@literal null}.
433-
*/
434-
Mono<T> first();
435-
436-
/**
437-
* Get all matching elements.
438-
*
439-
* @return never {@literal null}.
440-
*/
441-
Flux<T> all();
442-
443-
/**
444-
* Get the number of updated rows.
445-
*
446-
* @return {@link Mono} emitting the number of updated rows. Never {@literal null}.
447-
*/
448-
Mono<Integer> rowsUpdated();
449-
}
450-
451413
}

src/main/java/org/springframework/data/jdbc/core/function/DefaultDatabaseClient.java

Lines changed: 21 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import org.apache.commons.logging.LogFactory;
4747
import org.reactivestreams.Publisher;
4848
import org.springframework.dao.DataAccessException;
49-
import org.springframework.dao.IncorrectResultSizeDataAccessException;
5049
import org.springframework.dao.UncategorizedDataAccessException;
5150
import org.springframework.data.jdbc.core.function.connectionfactory.ConnectionProxy;
5251
import org.springframework.data.util.Pair;
@@ -60,7 +59,7 @@
6059
*
6160
* @author Mark Paluch
6261
*/
63-
class DefaultDatabaseClient implements DatabaseClient {
62+
class DefaultDatabaseClient implements DatabaseClient, ConnectionAccessor {
6463

6564
/** Logger available to subclasses */
6665
protected final Log logger = LogFactory.getLog(getClass());
@@ -104,54 +103,56 @@ public InsertIntoSpec insert() {
104103

105104
/**
106105
* Execute a callback {@link Function} within a {@link Connection} scope. The function is responsible for creating a
107-
* {@link Flux}. The connection is released after the {@link Flux} terminates (or the subscription is cancelled).
106+
* {@link Mono}. The connection is released after the {@link Mono} terminates (or the subscription is cancelled).
108107
* Connection resources must not be passed outside of the {@link Function} closure, otherwise resources may get
109108
* defunct.
110109
*
111110
* @param action must not be {@literal null}.
112-
* @return the resulting {@link Flux}.
111+
* @return the resulting {@link Mono}.
113112
* @throws DataAccessException
114113
*/
115-
public <T> Flux<T> executeMany(Function<Connection, Flux<T>> action) throws DataAccessException {
114+
@Override
115+
public <T> Mono<T> inConnection(Function<Connection, Mono<T>> action) throws DataAccessException {
116116

117117
Assert.notNull(action, "Callback object must not be null");
118118

119119
Mono<Connection> connectionMono = getConnection();
120120
// Create close-suppressing Connection proxy, also preparing returned Statements.
121121

122-
return Flux.usingWhen(connectionMono, it -> {
122+
return Mono.usingWhen(connectionMono, it -> {
123123

124124
Connection connectionToUse = createConnectionProxy(it);
125125

126-
return doInConnectionMany(connectionToUse, action);
126+
return doInConnection(connectionToUse, action);
127127
}, this::closeConnection, this::closeConnection, this::closeConnection) //
128-
.onErrorMap(SQLException.class, ex -> translateException("executeMany", getSql(action), ex));
128+
.onErrorMap(SQLException.class, ex -> translateException("execute", getSql(action), ex));
129129
}
130130

131131
/**
132132
* Execute a callback {@link Function} within a {@link Connection} scope. The function is responsible for creating a
133-
* {@link Mono}. The connection is released after the {@link Mono} terminates (or the subscription is cancelled).
133+
* {@link Flux}. The connection is released after the {@link Flux} terminates (or the subscription is cancelled).
134134
* Connection resources must not be passed outside of the {@link Function} closure, otherwise resources may get
135135
* defunct.
136136
*
137137
* @param action must not be {@literal null}.
138-
* @return the resulting {@link Mono}.
138+
* @return the resulting {@link Flux}.
139139
* @throws DataAccessException
140140
*/
141-
public <T> Mono<T> execute(Function<Connection, Mono<T>> action) throws DataAccessException {
141+
@Override
142+
public <T> Flux<T> inConnectionMany(Function<Connection, Flux<T>> action) throws DataAccessException {
142143

143144
Assert.notNull(action, "Callback object must not be null");
144145

145146
Mono<Connection> connectionMono = getConnection();
146147
// Create close-suppressing Connection proxy, also preparing returned Statements.
147148

148-
return Mono.usingWhen(connectionMono, it -> {
149+
return Flux.usingWhen(connectionMono, it -> {
149150

150151
Connection connectionToUse = createConnectionProxy(it);
151152

152-
return doInConnection(connectionToUse, action);
153+
return doInConnectionMany(connectionToUse, action);
153154
}, this::closeConnection, this::closeConnection, this::closeConnection) //
154-
.onErrorMap(SQLException.class, ex -> translateException("execute", getSql(action), ex));
155+
.onErrorMap(SQLException.class, ex -> translateException("executeMany", getSql(action), ex));
155156
}
156157

157158
/**
@@ -292,7 +293,8 @@ protected <T> SqlResult<T> exchange(String sql, BiFunction<Row, RowMetadata, T>
292293

293294
Function<Connection, Flux<Result>> resultFunction = it -> Flux.from(executeFunction.apply(it).execute());
294295

295-
return new DefaultSqlResultFunctions<>(sql, //
296+
return new DefaultSqlResult<>(DefaultDatabaseClient.this, //
297+
sql, //
296298
resultFunction, //
297299
it -> resultFunction.apply(it).flatMap(Result::getRowsUpdated).next(), //
298300
mappingFunction);
@@ -564,7 +566,8 @@ private <T> SqlResult<T> exchange(BiFunction<Row, RowMetadata, T> mappingFunctio
564566
Function<Connection, Flux<Result>> resultFunction = it -> Flux
565567
.from(insertFunction.apply(it).executeReturningGeneratedKeys());
566568

567-
return new DefaultSqlResultFunctions<>(sql, //
569+
return new DefaultSqlResult<>(DefaultDatabaseClient.this, //
570+
sql, //
568571
resultFunction, //
569572
it -> resultFunction.apply(it).flatMap(Result::getRowsUpdated).next(), //
570573
mappingFunction);
@@ -680,102 +683,14 @@ private <R> SqlResult<R> exchange(Object toInsert, BiFunction<Row, RowMetadata,
680683
Function<Connection, Flux<Result>> resultFunction = it -> Flux
681684
.from(insertFunction.apply(it).executeReturningGeneratedKeys());
682685

683-
return new DefaultSqlResultFunctions<>(sql, //
686+
return new DefaultSqlResult<>(DefaultDatabaseClient.this, //
687+
sql, //
684688
resultFunction, //
685689
it -> resultFunction.apply(it).flatMap(Result::getRowsUpdated).next(), //
686690
mappingFunction);
687691
}
688692
}
689693

690-
/**
691-
* Default {@link org.springframework.data.jdbc.core.function.SqlResult} implementation.
692-
*/
693-
class DefaultSqlResultFunctions<T> implements SqlResult<T> {
694-
695-
private final String sql;
696-
private final Function<Connection, Flux<Result>> resultFunction;
697-
private final Function<Connection, Mono<Integer>> updatedRowsFunction;
698-
private final FetchSpec<T> fetchSpec;
699-
700-
DefaultSqlResultFunctions(String sql, Function<Connection, Flux<Result>> resultFunction,
701-
Function<Connection, Mono<Integer>> updatedRowsFunction, BiFunction<Row, RowMetadata, T> mappingFunction) {
702-
703-
this.sql = sql;
704-
this.resultFunction = resultFunction;
705-
this.updatedRowsFunction = updatedRowsFunction;
706-
707-
this.fetchSpec = new DefaultFetchFunctions<>(sql,
708-
it -> resultFunction.apply(it).flatMap(result -> result.map(mappingFunction)), updatedRowsFunction);
709-
}
710-
711-
@Override
712-
public <R> SqlResult<R> extract(BiFunction<Row, RowMetadata, R> mappingFunction) {
713-
return new DefaultSqlResultFunctions<>(sql, resultFunction, updatedRowsFunction, mappingFunction);
714-
}
715-
716-
@Override
717-
public Mono<T> one() {
718-
return fetchSpec.one();
719-
}
720-
721-
@Override
722-
public Mono<T> first() {
723-
return fetchSpec.first();
724-
}
725-
726-
@Override
727-
public Flux<T> all() {
728-
return fetchSpec.all();
729-
}
730-
731-
@Override
732-
public Mono<Integer> rowsUpdated() {
733-
return fetchSpec.rowsUpdated();
734-
}
735-
}
736-
737-
@RequiredArgsConstructor
738-
class DefaultFetchFunctions<T> implements FetchSpec<T> {
739-
740-
private final String sql;
741-
private final Function<Connection, Flux<T>> resultFunction;
742-
private final Function<Connection, Mono<Integer>> updatedRowsFunction;
743-
744-
@Override
745-
public Mono<T> one() {
746-
747-
return all().buffer(2) //
748-
.flatMap(it -> {
749-
750-
if (it.isEmpty()) {
751-
return Mono.empty();
752-
}
753-
754-
if (it.size() > 1) {
755-
return Mono.error(new IncorrectResultSizeDataAccessException(
756-
String.format("Query [%s] returned non unique result.", this.sql), 1));
757-
}
758-
759-
return Mono.just(it.get(0));
760-
}).next();
761-
}
762-
763-
@Override
764-
public Mono<T> first() {
765-
return all().next();
766-
}
767-
768-
@Override
769-
public Flux<T> all() {
770-
return executeMany(resultFunction);
771-
}
772-
773-
@Override
774-
public Mono<Integer> rowsUpdated() {
775-
return execute(updatedRowsFunction);
776-
}
777-
}
778-
779694
private static <T> Flux<T> doInConnectionMany(Connection connection, Function<Connection, Flux<T>> action) {
780695

781696
try {

0 commit comments

Comments
 (0)