Skip to content

Commit 4655b4c

Browse files
committed
Kotlin coroutine update.
As the Flow interface is not stable for inheritance in 3rd party libraries. Using AbstractFlow as recommended instead via the sealed MongoAbstractFlow class. JAVA-4950
1 parent 77b549d commit 4655b4c

File tree

10 files changed

+48
-48
lines changed

10 files changed

+48
-48
lines changed

config/detekt/baseline.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<ID>MaxLineLength:MapReduceIterable.kt$MapReduceIterable$*</ID>
1111
<ID>SwallowedException:MockitoHelper.kt$MockitoHelper.DeepReflectionEqMatcher$e: Throwable</ID>
1212
<ID>TooManyFunctions:ClientSession.kt$ClientSession : jClientSession</ID>
13-
<ID>TooManyFunctions:FindFlow.kt$FindFlow&lt;T : Any> : Flow</ID>
13+
<ID>TooManyFunctions:FindFlow.kt$FindFlow&lt;T : Any> : MongoAbstractFlow</ID>
1414
<ID>TooManyFunctions:FindIterable.kt$FindIterable&lt;T : Any> : MongoIterable</ID>
1515
<ID>TooManyFunctions:MongoCollection.kt$MongoCollection&lt;T : Any></ID>
1616
<ID>TooManyFunctions:MongoDatabase.kt$MongoDatabase</ID>

driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/AggregateFlow.kt

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,6 @@ import com.mongodb.ExplainVerbosity
1919
import com.mongodb.client.model.Collation
2020
import com.mongodb.reactivestreams.client.AggregatePublisher
2121
import java.util.concurrent.TimeUnit
22-
import kotlinx.coroutines.flow.Flow
23-
import kotlinx.coroutines.flow.FlowCollector
24-
import kotlinx.coroutines.reactive.asFlow
2522
import kotlinx.coroutines.reactive.awaitFirstOrNull
2623
import kotlinx.coroutines.reactive.awaitSingle
2724
import org.bson.BsonValue
@@ -34,7 +31,7 @@ import org.bson.conversions.Bson
3431
* @param T The type of the result.
3532
* @see [Aggregation command](https://www.mongodb.com/docs/manual/reference/command/aggregate)
3633
*/
37-
public class AggregateFlow<T : Any>(private val wrapped: AggregatePublisher<T>) : Flow<T> {
34+
public class AggregateFlow<T : Any>(private val wrapped: AggregatePublisher<T>) : MongoAbstractFlow<T>(wrapped) {
3835

3936
/**
4037
* Sets the number of documents to return per batch.
@@ -167,7 +164,6 @@ public class AggregateFlow<T : Any>(private val wrapped: AggregatePublisher<T>)
167164
/**
168165
* Explain the execution plan for this operation with the given verbosity level
169166
*
170-
* @param R the type of the document class
171167
* @param verbosity the verbosity of the explanation
172168
* @return the execution plan
173169
* @see [Explain command](https://www.mongodb.com/docs/manual/reference/command/explain/)
@@ -198,6 +194,4 @@ public class AggregateFlow<T : Any>(private val wrapped: AggregatePublisher<T>)
198194
*/
199195
public suspend inline fun <reified R : Any> explain(verbosity: ExplainVerbosity? = null): R =
200196
explain(R::class.java, verbosity)
201-
202-
public override suspend fun collect(collector: FlowCollector<T>): Unit = wrapped.asFlow().collect(collector)
203197
}

driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ChangeStreamFlow.kt

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import com.mongodb.client.model.changestream.FullDocumentBeforeChange
2222
import com.mongodb.reactivestreams.client.ChangeStreamPublisher
2323
import java.util.concurrent.TimeUnit
2424
import kotlinx.coroutines.flow.Flow
25-
import kotlinx.coroutines.flow.FlowCollector
2625
import kotlinx.coroutines.reactive.asFlow
2726
import org.bson.BsonDocument
2827
import org.bson.BsonTimestamp
@@ -37,7 +36,8 @@ import org.bson.BsonValue
3736
*
3837
* @param T The type of the result.
3938
*/
40-
public class ChangeStreamFlow<T : Any>(private val wrapped: ChangeStreamPublisher<T>) : Flow<ChangeStreamDocument<T>> {
39+
public class ChangeStreamFlow<T : Any>(private val wrapped: ChangeStreamPublisher<T>) :
40+
MongoAbstractFlow<ChangeStreamDocument<T>>(wrapped) {
4141

4242
/**
4343
* Sets the fullDocument value.
@@ -173,6 +173,4 @@ public class ChangeStreamFlow<T : Any>(private val wrapped: ChangeStreamPublishe
173173
public fun showExpandedEvents(showExpandedEvents: Boolean): ChangeStreamFlow<T> = apply {
174174
wrapped.showExpandedEvents(showExpandedEvents)
175175
}
176-
public override suspend fun collect(collector: FlowCollector<ChangeStreamDocument<T>>): Unit =
177-
wrapped.asFlow().collect(collector)
178176
}

driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/DistinctFlow.kt

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@ package com.mongodb.kotlin.client.coroutine
1818
import com.mongodb.client.model.Collation
1919
import com.mongodb.reactivestreams.client.DistinctPublisher
2020
import java.util.concurrent.TimeUnit
21-
import kotlinx.coroutines.flow.Flow
22-
import kotlinx.coroutines.flow.FlowCollector
23-
import kotlinx.coroutines.reactive.asFlow
2421
import org.bson.BsonValue
2522
import org.bson.conversions.Bson
2623

@@ -30,7 +27,7 @@ import org.bson.conversions.Bson
3027
* @param T The type of the result.
3128
* @see [Distinct command](https://www.mongodb.com/docs/manual/reference/command/distinct/)
3229
*/
33-
public class DistinctFlow<T : Any>(private val wrapped: DistinctPublisher<T>) : Flow<T> {
30+
public class DistinctFlow<T : Any>(private val wrapped: DistinctPublisher<T>) : MongoAbstractFlow<T>(wrapped) {
3431

3532
/**
3633
* Sets the number of documents to return per batch.
@@ -86,6 +83,4 @@ public class DistinctFlow<T : Any>(private val wrapped: DistinctPublisher<T>) :
8683
* @return this
8784
*/
8885
public fun comment(comment: BsonValue?): DistinctFlow<T> = apply { wrapped.comment(comment) }
89-
90-
public override suspend fun collect(collector: FlowCollector<T>): Unit = wrapped.asFlow().collect(collector)
9186
}

driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/FindFlow.kt

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,6 @@ import com.mongodb.ExplainVerbosity
2020
import com.mongodb.client.model.Collation
2121
import com.mongodb.reactivestreams.client.FindPublisher
2222
import java.util.concurrent.TimeUnit
23-
import kotlinx.coroutines.flow.Flow
24-
import kotlinx.coroutines.flow.FlowCollector
25-
import kotlinx.coroutines.reactive.asFlow
2623
import kotlinx.coroutines.reactive.awaitSingle
2724
import org.bson.BsonValue
2825
import org.bson.Document
@@ -34,7 +31,7 @@ import org.bson.conversions.Bson
3431
* @param T The type of the result.
3532
* @see [Collection filter](https://www.mongodb.com/docs/manual/reference/method/db.collection.find/)
3633
*/
37-
public class FindFlow<T : Any>(private val wrapped: FindPublisher<T>) : Flow<T> {
34+
public class FindFlow<T : Any>(private val wrapped: FindPublisher<T>) : MongoAbstractFlow<T>(wrapped) {
3835

3936
/**
4037
* Sets the number of documents to return per batch.
@@ -292,6 +289,4 @@ public class FindFlow<T : Any>(private val wrapped: FindPublisher<T>) : Flow<T>
292289
*/
293290
public suspend inline fun <reified R : Any> explain(verbosity: ExplainVerbosity? = null): R =
294291
explain(R::class.java, verbosity)
295-
296-
public override suspend fun collect(collector: FlowCollector<T>): Unit = wrapped.asFlow().collect(collector)
297292
}

driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListCollectionsFlow.kt

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@ package com.mongodb.kotlin.client.coroutine
1717

1818
import com.mongodb.reactivestreams.client.ListCollectionsPublisher
1919
import java.util.concurrent.TimeUnit
20-
import kotlinx.coroutines.flow.Flow
21-
import kotlinx.coroutines.flow.FlowCollector
22-
import kotlinx.coroutines.reactive.asFlow
2320
import org.bson.BsonValue
2421
import org.bson.conversions.Bson
2522

@@ -29,7 +26,8 @@ import org.bson.conversions.Bson
2926
* @param T The type of the result.
3027
* @see [List collections](https://www.mongodb.com/docs/manual/reference/command/listCollections/)
3128
*/
32-
public class ListCollectionsFlow<T : Any>(private val wrapped: ListCollectionsPublisher<T>) : Flow<T> {
29+
public class ListCollectionsFlow<T : Any>(private val wrapped: ListCollectionsPublisher<T>) :
30+
MongoAbstractFlow<T>(wrapped) {
3331
/**
3432
* Sets the maximum execution time on the server for this operation.
3533
*
@@ -74,6 +72,4 @@ public class ListCollectionsFlow<T : Any>(private val wrapped: ListCollectionsPu
7472
* @return this
7573
*/
7674
public fun comment(comment: BsonValue?): ListCollectionsFlow<T> = apply { wrapped.comment(comment) }
77-
78-
public override suspend fun collect(collector: FlowCollector<T>): Unit = wrapped.asFlow().collect(collector)
7975
}

driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListDatabasesFlow.kt

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@ package com.mongodb.kotlin.client.coroutine
1717

1818
import com.mongodb.reactivestreams.client.ListDatabasesPublisher
1919
import java.util.concurrent.TimeUnit
20-
import kotlinx.coroutines.flow.Flow
21-
import kotlinx.coroutines.flow.FlowCollector
22-
import kotlinx.coroutines.reactive.asFlow
2320
import org.bson.BsonValue
2421
import org.bson.conversions.Bson
2522

@@ -29,7 +26,8 @@ import org.bson.conversions.Bson
2926
* @param T The type of the result.
3027
* @see [List databases](https://www.mongodb.com/docs/manual/reference/command/listDatabases/)
3128
*/
32-
public class ListDatabasesFlow<T : Any>(private val wrapped: ListDatabasesPublisher<T>) : Flow<T> {
29+
public class ListDatabasesFlow<T : Any>(private val wrapped: ListDatabasesPublisher<T>) :
30+
MongoAbstractFlow<T>(wrapped) {
3331
/**
3432
* Sets the maximum execution time on the server for this operation.
3533
*
@@ -93,6 +91,4 @@ public class ListDatabasesFlow<T : Any>(private val wrapped: ListDatabasesPublis
9391
* @return this
9492
*/
9593
public fun comment(comment: BsonValue?): ListDatabasesFlow<T> = apply { wrapped.comment(comment) }
96-
97-
public override suspend fun collect(collector: FlowCollector<T>): Unit = wrapped.asFlow().collect(collector)
9894
}

driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/ListIndexesFlow.kt

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@ package com.mongodb.kotlin.client.coroutine
1717

1818
import com.mongodb.reactivestreams.client.ListIndexesPublisher
1919
import java.util.concurrent.TimeUnit
20-
import kotlinx.coroutines.flow.Flow
21-
import kotlinx.coroutines.flow.FlowCollector
22-
import kotlinx.coroutines.reactive.asFlow
2320
import org.bson.BsonValue
2421

2522
/**
@@ -28,7 +25,7 @@ import org.bson.BsonValue
2825
* @param T The type of the result.
2926
* @see [List indexes](https://www.mongodb.com/docs/manual/reference/command/listIndexes/)
3027
*/
31-
public class ListIndexesFlow<T : Any>(private val wrapped: ListIndexesPublisher<T>) : Flow<T> {
28+
public class ListIndexesFlow<T : Any>(private val wrapped: ListIndexesPublisher<T>) : MongoAbstractFlow<T>(wrapped) {
3229
/**
3330
* Sets the maximum execution time on the server for this operation.
3431
*
@@ -65,6 +62,4 @@ public class ListIndexesFlow<T : Any>(private val wrapped: ListIndexesPublisher<
6562
* @return this
6663
*/
6764
public fun comment(comment: BsonValue?): ListIndexesFlow<T> = apply { wrapped.comment(comment) }
68-
69-
public override suspend fun collect(collector: FlowCollector<T>): Unit = wrapped.asFlow().collect(collector)
7065
}

driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/MapReduceFlow.kt

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,6 @@ import com.mongodb.client.model.Collation
2121
import com.mongodb.client.model.MapReduceAction
2222
import com.mongodb.reactivestreams.client.MapReducePublisher
2323
import java.util.concurrent.TimeUnit
24-
import kotlinx.coroutines.flow.Flow
25-
import kotlinx.coroutines.flow.FlowCollector
26-
import kotlinx.coroutines.reactive.asFlow
2724
import kotlinx.coroutines.reactive.awaitFirstOrNull
2825
import org.bson.conversions.Bson
2926

@@ -36,7 +33,7 @@ import org.bson.conversions.Bson
3633
* @see [Map Reduce](https://www.mongodb.com/docs/manual/reference/command/mapReduce/)
3734
*/
3835
@Deprecated("Map Reduce has been deprecated. Use Aggregation instead", replaceWith = ReplaceWith(""))
39-
public class MapReduceFlow<T : Any>(private val wrapped: MapReducePublisher<T>) : Flow<T> {
36+
public class MapReduceFlow<T : Any>(private val wrapped: MapReducePublisher<T>) : MongoAbstractFlow<T>(wrapped) {
4037
/**
4138
* Sets the number of documents to return per batch.
4239
*
@@ -209,6 +206,4 @@ public class MapReduceFlow<T : Any>(private val wrapped: MapReducePublisher<T>)
209206
* @return this
210207
*/
211208
public fun collation(collation: Collation?): MapReduceFlow<T> = apply { wrapped.collation(collation) }
212-
213-
public override suspend fun collect(collector: FlowCollector<T>): Unit = wrapped.asFlow().collect(collector)
214209
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.mongodb.kotlin.client.coroutine
17+
18+
import kotlinx.coroutines.FlowPreview
19+
import kotlinx.coroutines.flow.AbstractFlow
20+
import kotlinx.coroutines.flow.FlowCollector
21+
import kotlinx.coroutines.reactive.asFlow
22+
import org.reactivestreams.Publisher
23+
24+
/**
25+
* The Mongo Abstract Flow implementation
26+
*
27+
* @param T The type of the result.
28+
* @param wrapped the underlying publisher
29+
*/
30+
@OptIn(FlowPreview::class)
31+
public sealed class MongoAbstractFlow<T : Any>(private val wrapped: Publisher<T>) : AbstractFlow<T>() {
32+
33+
override suspend fun collectSafely(collector: FlowCollector<T>) {
34+
wrapped.asFlow().collect(collector)
35+
}
36+
}

0 commit comments

Comments
 (0)