Skip to content

Commit 1464df9

Browse files
committed
#2 - Add transactional support.
We now support transaction hosting and transaction management via TransactionalDatabaseClient. TransactionalDatabaseClient databaseClient = TransactionalDatabaseClient.create(connectionFactory); Flux<Integer> integerFlux = databaseClient.inTransaction(db -> { return db.execute().sql("INSERT INTO legoset (id, name, manual) VALUES($1, $2, $3)") // .bind(0, 42055) // .bind(1, "SCHAUFELRADBAGGER") // .bindNull("$3") // .fetch().rowsUpdated(); });
1 parent 853b3fb commit 1464df9

13 files changed

+1357
-6
lines changed

src/main/java/org/springframework/data/r2dbc/function/DefaultDatabaseClientBuilder.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@
3333
*/
3434
class DefaultDatabaseClientBuilder implements DatabaseClient.Builder {
3535

36-
private @Nullable ConnectionFactory connector;
37-
private @Nullable R2dbcExceptionTranslator exceptionTranslator;
38-
private ReactiveDataAccessStrategy accessStrategy = new DefaultReactiveDataAccessStrategy();
36+
@Nullable ConnectionFactory connector;
37+
@Nullable R2dbcExceptionTranslator exceptionTranslator;
38+
ReactiveDataAccessStrategy accessStrategy = new DefaultReactiveDataAccessStrategy();
3939

4040
DefaultDatabaseClientBuilder() {}
4141

@@ -44,7 +44,7 @@ class DefaultDatabaseClientBuilder implements DatabaseClient.Builder {
4444
Assert.notNull(other, "DefaultDatabaseClientBuilder must not be null!");
4545

4646
this.connector = other.connector;
47-
this.exceptionTranslator = exceptionTranslator;
47+
this.exceptionTranslator = other.exceptionTranslator;
4848
}
4949

5050
@Override
@@ -83,8 +83,12 @@ public DatabaseClient build() {
8383
exceptionTranslator = new SqlErrorCodeR2dbcExceptionTranslator(connector);
8484
}
8585

86-
return new DefaultDatabaseClient(this.connector, exceptionTranslator, accessStrategy,
87-
new DefaultDatabaseClientBuilder(this));
86+
return doBuild(this.connector, exceptionTranslator, this.accessStrategy, new DefaultDatabaseClientBuilder(this));
87+
}
88+
89+
protected DatabaseClient doBuild(ConnectionFactory connector, R2dbcExceptionTranslator exceptionTranslator,
90+
ReactiveDataAccessStrategy accessStrategy, DefaultDatabaseClientBuilder builder) {
91+
return new DefaultDatabaseClient(connector, exceptionTranslator, accessStrategy, builder);
8892
}
8993

9094
@Override
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.r2dbc.function;
17+
18+
import io.r2dbc.spi.Connection;
19+
import io.r2dbc.spi.ConnectionFactory;
20+
import reactor.core.publisher.Flux;
21+
import reactor.core.publisher.Mono;
22+
import reactor.util.context.Context;
23+
import reactor.util.function.Tuple2;
24+
25+
import java.util.function.Function;
26+
27+
import org.reactivestreams.Publisher;
28+
import org.springframework.data.r2dbc.function.connectionfactory.ConnectionFactoryUtils;
29+
import org.springframework.data.r2dbc.function.connectionfactory.ReactiveTransactionSynchronization;
30+
import org.springframework.data.r2dbc.function.connectionfactory.TransactionResources;
31+
import org.springframework.data.r2dbc.support.R2dbcExceptionTranslator;
32+
import org.springframework.transaction.NoTransactionException;
33+
34+
/**
35+
* Default implementation of a {@link TransactionalDatabaseClient}.
36+
*
37+
* @author Mark Paluch
38+
*/
39+
class DefaultTransactionalDatabaseClient extends DefaultDatabaseClient implements TransactionalDatabaseClient {
40+
41+
DefaultTransactionalDatabaseClient(ConnectionFactory connector, R2dbcExceptionTranslator exceptionTranslator,
42+
ReactiveDataAccessStrategy dataAccessStrategy, DefaultDatabaseClientBuilder builder) {
43+
super(connector, exceptionTranslator, dataAccessStrategy, builder);
44+
}
45+
46+
@Override
47+
public TransactionalDatabaseClient.Builder mutate() {
48+
return (TransactionalDatabaseClient.Builder) super.mutate();
49+
}
50+
51+
/* (non-Javadoc)
52+
* @see org.springframework.data.r2dbc.function.TransactionalDatabaseClient#beginTransaction()
53+
*/
54+
@Override
55+
public Mono<Void> beginTransaction() {
56+
57+
Mono<TransactionResources> transactional = ConnectionFactoryUtils.currentReactiveTransactionSynchronization() //
58+
.map(synchronization -> {
59+
60+
TransactionResources transactionResources = TransactionResources.create();
61+
// TODO: This Tx management code creating a TransactionContext. Find a better place.
62+
synchronization.registerTransaction(transactionResources);
63+
return transactionResources;
64+
});
65+
66+
return transactional.flatMap(it -> {
67+
return ConnectionFactoryUtils.doGetConnection(obtainConnectionFactory());
68+
}).flatMap(it -> Mono.from(it.getT1().beginTransaction()));
69+
}
70+
71+
/* (non-Javadoc)
72+
* @see org.springframework.data.r2dbc.function.TransactionalDatabaseClient#commitTransaction()
73+
*/
74+
@Override
75+
public Mono<Void> commitTransaction() {
76+
return cleanup(Connection::commitTransaction);
77+
}
78+
79+
/* (non-Javadoc)
80+
* @see org.springframework.data.r2dbc.function.TransactionalDatabaseClient#rollbackTransaction()
81+
*/
82+
@Override
83+
public Mono<Void> rollbackTransaction() {
84+
return cleanup(Connection::rollbackTransaction);
85+
}
86+
87+
/* (non-Javadoc)
88+
* @see org.springframework.data.r2dbc.function.TransactionalDatabaseClient#inTransaction(java.util.function.Function)
89+
*/
90+
@Override
91+
public <T> Flux<T> inTransaction(Function<DatabaseClient, ? extends Publisher<? extends T>> callback) {
92+
93+
return Flux.usingWhen(beginTransaction().thenReturn(this), callback, //
94+
DefaultTransactionalDatabaseClient::commitTransaction, //
95+
DefaultTransactionalDatabaseClient::rollbackTransaction, //
96+
DefaultTransactionalDatabaseClient::rollbackTransaction) //
97+
.subscriberContext(DefaultTransactionalDatabaseClient::withTransactionSynchronization);
98+
}
99+
100+
/* (non-Javadoc)
101+
* @see org.springframework.data.r2dbc.function.DefaultDatabaseClient#getConnection()
102+
*/
103+
@Override
104+
protected Mono<Connection> getConnection() {
105+
return ConnectionFactoryUtils.getConnection(obtainConnectionFactory()).map(Tuple2::getT1);
106+
}
107+
108+
/* (non-Javadoc)
109+
* @see org.springframework.data.r2dbc.function.DefaultDatabaseClient#closeConnection(io.r2dbc.spi.Connection)
110+
*/
111+
@Override
112+
protected Publisher<Void> closeConnection(Connection connection) {
113+
114+
return Mono.subscriberContext().flatMap(context -> {
115+
116+
if (context.hasKey(ReactiveTransactionSynchronization.class)) {
117+
118+
return ConnectionFactoryUtils.currentConnectionFactory()
119+
.flatMap(it -> ConnectionFactoryUtils.releaseConnection(connection, it));
120+
}
121+
122+
return Mono.from(connection.close());
123+
});
124+
}
125+
126+
/**
127+
* Execute a transactional cleanup. Also, deregister the current {@link TransactionResources synchronization} element.
128+
*/
129+
private static Mono<Void> cleanup(Function<Connection, ? extends Publisher<Void>> callback) {
130+
131+
return ConnectionFactoryUtils.currentActiveReactiveTransactionSynchronization() //
132+
.flatMap(synchronization -> {
133+
134+
TransactionResources currentSynchronization = synchronization.getCurrentTransaction();
135+
136+
ConnectionFactory connectionFactory = currentSynchronization.getResource(ConnectionFactory.class);
137+
138+
if (connectionFactory == null) {
139+
throw new NoTransactionException("No ConnectionFactory attached");
140+
}
141+
142+
return Mono.from(connectionFactory.create())
143+
.flatMap(connection -> Mono.from(callback.apply(connection))
144+
.then(ConnectionFactoryUtils.releaseConnection(connection, connectionFactory))
145+
.then(ConnectionFactoryUtils.closeConnection(connection, connectionFactory))) // TODO: Is this rather
146+
// related to
147+
// TransactionContext
148+
// cleanup?
149+
.doFinally(s -> synchronization.unregisterTransaction(currentSynchronization));
150+
});
151+
}
152+
153+
/**
154+
* Potentially register a {@link ReactiveTransactionSynchronization} in the {@link Context} if no synchronization
155+
* object is registered.
156+
*
157+
* @param context the subscriber context.
158+
* @return subscriber context with a registered synchronization.
159+
*/
160+
static Context withTransactionSynchronization(Context context) {
161+
162+
// associate synchronizer object to host transactional resources.
163+
// TODO: Should be moved to a better place.
164+
return context.put(ReactiveTransactionSynchronization.class,
165+
context.getOrDefault(ReactiveTransactionSynchronization.class, new ReactiveTransactionSynchronization()));
166+
}
167+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.r2dbc.function;
17+
18+
import io.r2dbc.spi.ConnectionFactory;
19+
20+
import java.util.function.Consumer;
21+
22+
import org.springframework.data.r2dbc.function.DatabaseClient.Builder;
23+
import org.springframework.data.r2dbc.support.R2dbcExceptionTranslator;
24+
import org.springframework.util.Assert;
25+
26+
/**
27+
* @author Mark Paluch
28+
*/
29+
class DefaultTransactionalDatabaseClientBuilder extends DefaultDatabaseClientBuilder
30+
implements TransactionalDatabaseClient.Builder {
31+
32+
DefaultTransactionalDatabaseClientBuilder() {}
33+
34+
DefaultTransactionalDatabaseClientBuilder(DefaultDatabaseClientBuilder other) {
35+
36+
Assert.notNull(other, "DefaultDatabaseClientBuilder must not be null!");
37+
38+
this.connector = other.connector;
39+
this.exceptionTranslator = other.exceptionTranslator;
40+
}
41+
42+
@Override
43+
public DatabaseClient.Builder clone() {
44+
return new DefaultTransactionalDatabaseClientBuilder(this);
45+
}
46+
47+
/* (non-Javadoc)
48+
* @see org.springframework.data.r2dbc.function.DefaultDatabaseClientBuilder#connectionFactory(io.r2dbc.spi.ConnectionFactory)
49+
*/
50+
@Override
51+
public TransactionalDatabaseClient.Builder connectionFactory(ConnectionFactory factory) {
52+
super.connectionFactory(factory);
53+
return this;
54+
}
55+
56+
/* (non-Javadoc)
57+
* @see org.springframework.data.r2dbc.function.DefaultDatabaseClientBuilder#exceptionTranslator(org.springframework.data.r2dbc.support.R2dbcExceptionTranslator)
58+
*/
59+
@Override
60+
public TransactionalDatabaseClient.Builder exceptionTranslator(R2dbcExceptionTranslator exceptionTranslator) {
61+
super.exceptionTranslator(exceptionTranslator);
62+
return this;
63+
}
64+
65+
/* (non-Javadoc)
66+
* @see org.springframework.data.r2dbc.function.DefaultDatabaseClientBuilder#dataAccessStrategy(org.springframework.data.r2dbc.function.ReactiveDataAccessStrategy)
67+
*/
68+
@Override
69+
public TransactionalDatabaseClient.Builder dataAccessStrategy(ReactiveDataAccessStrategy accessStrategy) {
70+
super.dataAccessStrategy(accessStrategy);
71+
return this;
72+
}
73+
74+
/* (non-Javadoc)
75+
* @see org.springframework.data.r2dbc.function.DefaultDatabaseClientBuilder#apply(java.util.function.Consumer)
76+
*/
77+
@Override
78+
public TransactionalDatabaseClient.Builder apply(Consumer<Builder> builderConsumer) {
79+
super.apply(builderConsumer);
80+
return this;
81+
}
82+
83+
/* (non-Javadoc)
84+
* @see org.springframework.data.r2dbc.function.DefaultDatabaseClientBuilder#build()
85+
*/
86+
@Override
87+
public TransactionalDatabaseClient build() {
88+
return (TransactionalDatabaseClient) super.build();
89+
}
90+
91+
/* (non-Javadoc)
92+
* @see org.springframework.data.r2dbc.function.DefaultDatabaseClientBuilder#doBuild(io.r2dbc.spi.ConnectionFactory, org.springframework.data.r2dbc.support.R2dbcExceptionTranslator, org.springframework.data.r2dbc.function.ReactiveDataAccessStrategy, org.springframework.data.r2dbc.function.DefaultDatabaseClientBuilder)
93+
*/
94+
@Override
95+
protected DatabaseClient doBuild(ConnectionFactory connector, R2dbcExceptionTranslator exceptionTranslator,
96+
ReactiveDataAccessStrategy accessStrategy, DefaultDatabaseClientBuilder builder) {
97+
return new DefaultTransactionalDatabaseClient(connector, exceptionTranslator, accessStrategy, builder);
98+
}
99+
}

0 commit comments

Comments
 (0)