Skip to content

Commit dee0358

Browse files
authored
Add $documents stage (#1061)
JAVA-4654
1 parent 0c174ae commit dee0358

File tree

5 files changed

+154
-22
lines changed

5 files changed

+154
-22
lines changed

driver-core/src/main/com/mongodb/client/model/Aggregates.java

Lines changed: 56 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -299,9 +299,14 @@ public static Bson lookup(final String from, final String localField, final Stri
299299
}
300300

301301
/**
302-
* Creates a $lookup pipeline stage, joining the current collection with the one specified in from using the given pipeline
303-
*
304-
* @param from the name of the collection in the same database to perform the join with.
302+
* Creates a $lookup pipeline stage, joining the current collection with the
303+
* one specified in from using the given pipeline. If the first stage in the
304+
* pipeline is a {@link Aggregates#documents(List) $documents} stage, then
305+
* the {@code from} collection is ignored.
306+
*
307+
* @param from the name of the collection in the same database to
308+
* perform the join with. May be {$code null} if the
309+
* first pipeline stage is $documents.
305310
* @param pipeline the pipeline to run on the joined collection.
306311
* @param as the name of the new array field to add to the input documents.
307312
* @return the $lookup pipeline stage
@@ -310,15 +315,20 @@ public static Bson lookup(final String from, final String localField, final Stri
310315
* @since 3.7
311316
*
312317
*/
313-
public static Bson lookup(final String from, final List<? extends Bson> pipeline, final String as) {
318+
public static Bson lookup(@Nullable final String from, final List<? extends Bson> pipeline, final String as) {
314319
return lookup(from, null, pipeline, as);
315320
}
316321

317322
/**
318-
* Creates a $lookup pipeline stage, joining the current collection with the one specified in from using the given pipeline
323+
* Creates a $lookup pipeline stage, joining the current collection with the
324+
* one specified in from using the given pipeline. If the first stage in the
325+
* pipeline is a {@link Aggregates#documents(List) $documents} stage, then
326+
* the {@code from} collection is ignored.
319327
*
320328
* @param <TExpression> the Variable value expression type
321-
* @param from the name of the collection in the same database to perform the join with.
329+
* @param from the name of the collection in the same database to
330+
* perform the join with. May be {$code null} if the
331+
* first pipeline stage is $documents.
322332
* @param let the variables to use in the pipeline field stages.
323333
* @param pipeline the pipeline to run on the joined collection.
324334
* @param as the name of the new array field to add to the input documents.
@@ -327,7 +337,7 @@ public static Bson lookup(final String from, final List<? extends Bson> pipeline
327337
* @mongodb.server.release 3.6
328338
* @since 3.7
329339
*/
330-
public static <TExpression> Bson lookup(final String from, @Nullable final List<Variable<TExpression>> let,
340+
public static <TExpression> Bson lookup(@Nullable final String from, @Nullable final List<Variable<TExpression>> let,
331341
final List<? extends Bson> pipeline, final String as) {
332342
return new LookupStage<>(from, let, pipeline, as);
333343
}
@@ -928,7 +938,7 @@ public static Bson searchMeta(final SearchCollector collector, final SearchOptio
928938
*
929939
* @param fields the fields to exclude. May use dot notation.
930940
* @return the $unset pipeline stage
931-
* @mongodb.driver.manual reference/operator/aggregation/project/ $unset
941+
* @mongodb.driver.manual reference/operator/aggregation/unset/ $unset
932942
* @mongodb.server.release 4.2
933943
* @since 4.8
934944
*/
@@ -941,7 +951,7 @@ public static Bson unset(final String... fields) {
941951
*
942952
* @param fields the fields to exclude. May use dot notation.
943953
* @return the $unset pipeline stage
944-
* @mongodb.driver.manual reference/operator/aggregation/project/ $unset
954+
* @mongodb.driver.manual reference/operator/aggregation/unset/ $unset
945955
* @mongodb.server.release 4.2
946956
* @since 4.8
947957
*/
@@ -962,7 +972,7 @@ public static Bson unset(final List<String> fields) {
962972
* To specify a field within an embedded document, use dot notation.
963973
* @param options {@link GeoNearOptions}
964974
* @return the $geoNear pipeline stage
965-
* @mongodb.driver.manual reference/operator/aggregation/project/ $geoNear
975+
* @mongodb.driver.manual reference/operator/aggregation/geoNear/ $geoNear
966976
* @since 4.8
967977
*/
968978
public static Bson geoNear(
@@ -1012,7 +1022,7 @@ public String toString() {
10121022
* @param distanceField The output field that contains the calculated distance.
10131023
* To specify a field within an embedded document, use dot notation.
10141024
* @return the $geoNear pipeline stage
1015-
* @mongodb.driver.manual reference/operator/aggregation/project/ $geoNear
1025+
* @mongodb.driver.manual reference/operator/aggregation/geoNear/ $geoNear
10161026
* @since 4.8
10171027
*/
10181028
public static Bson geoNear(
@@ -1021,6 +1031,33 @@ public static Bson geoNear(
10211031
return geoNear(near, distanceField, geoNearOptions());
10221032
}
10231033

1034+
/**
1035+
* Creates a $documents pipeline stage.
1036+
*
1037+
* @param documents the documents.
1038+
* @return the $documents pipeline stage.
1039+
* @mongodb.driver.manual reference/operator/aggregation/documents/ $documents
1040+
* @mongodb.server.release 5.1
1041+
* @since 4.9
1042+
*/
1043+
public static Bson documents(final List<? extends Bson> documents) {
1044+
notNull("documents", documents);
1045+
return new Bson() {
1046+
@Override
1047+
public <TDocument> BsonDocument toBsonDocument(final Class<TDocument> documentClass, final CodecRegistry codecRegistry) {
1048+
BsonDocumentWriter writer = new BsonDocumentWriter(new BsonDocument());
1049+
writer.writeStartDocument();
1050+
writer.writeStartArray("$documents");
1051+
for (Bson bson : documents) {
1052+
BuildersHelper.encodeValue(writer, bson, codecRegistry);
1053+
}
1054+
writer.writeEndArray();
1055+
writer.writeEndDocument();
1056+
return writer.getDocument();
1057+
}
1058+
};
1059+
}
1060+
10241061
static void writeBucketOutput(final CodecRegistry codecRegistry, final BsonDocumentWriter writer,
10251062
@Nullable final List<BsonField> output) {
10261063
if (output != null) {
@@ -1242,8 +1279,11 @@ private static final class LookupStage<TExpression> implements Bson {
12421279
private final List<? extends Bson> pipeline;
12431280
private final String as;
12441281

1245-
private LookupStage(final String from, @Nullable final List<Variable<TExpression>> let, final List<? extends Bson> pipeline,
1246-
final String as) {
1282+
private LookupStage(
1283+
@Nullable final String from,
1284+
@Nullable final List<Variable<TExpression>> let,
1285+
final List<? extends Bson> pipeline,
1286+
final String as) {
12471287
this.from = from;
12481288
this.let = let;
12491289
this.pipeline = pipeline;
@@ -1258,7 +1298,9 @@ public <TDocument> BsonDocument toBsonDocument(final Class<TDocument> tDocumentC
12581298

12591299
writer.writeStartDocument("$lookup");
12601300

1261-
writer.writeString("from", from);
1301+
if (from != null) {
1302+
writer.writeString("from", from);
1303+
}
12621304

12631305
if (let != null) {
12641306
writer.writeStartDocument("let");

driver-core/src/test/functional/com/mongodb/client/model/AggregatesTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.bson.BsonDocument;
2222
import org.bson.Document;
2323

24+
import java.util.Arrays;
2425
import java.util.Collections;
2526
import java.util.List;
2627

@@ -160,4 +161,53 @@ public void testGeoNear() {
160161
+ " }\n"
161162
+ "}]");
162163
}
164+
165+
@Test
166+
public void testDocuments() {
167+
assumeTrue(serverVersionAtLeast(5, 1));
168+
Bson stage = Aggregates.documents(asList(
169+
Document.parse("{a: 1, b: {$add: [1, 1]} }"),
170+
BsonDocument.parse("{a: 3, b: 4}")));
171+
assertPipeline(
172+
"{$documents: [{a: 1, b: {$add: [1, 1]}}, {a: 3, b: 4}]}",
173+
stage);
174+
175+
List<Bson> pipeline = Arrays.asList(stage);
176+
getCollectionHelper().aggregateDb(pipeline);
177+
178+
assertEquals(
179+
parseToList("[{a: 1, b: 2}, {a: 3, b: 4}]"),
180+
getCollectionHelper().aggregateDb(pipeline));
181+
182+
// accepts lists of Documents and BsonDocuments
183+
List<BsonDocument> documents = Arrays.asList(BsonDocument.parse("{a: 1, b: 2}"));
184+
assertPipeline("{$documents: [{a: 1, b: 2}]}", Aggregates.documents(documents));
185+
List<BsonDocument> bsonDocuments = Arrays.asList(BsonDocument.parse("{a: 1, b: 2}"));
186+
assertPipeline("{$documents: [{a: 1, b: 2}]}", Aggregates.documents(bsonDocuments));
187+
}
188+
189+
@Test
190+
public void testDocumentsLookup() {
191+
assumeTrue(serverVersionAtLeast(5, 1));
192+
193+
getCollectionHelper().insertDocuments("[{_id: 1, a: 8}, {_id: 2, a: 9}]");
194+
Bson documentsStage = Aggregates.documents(asList(Document.parse("{a: 5}")));
195+
196+
Bson lookupStage = Aggregates.lookup("ignored", Arrays.asList(documentsStage), "added");
197+
assertPipeline(
198+
"{'$lookup': {'from': 'ignored', 'pipeline': [{'$documents': [{'a': 5}]}], 'as': 'added'}}",
199+
lookupStage);
200+
assertEquals(
201+
parseToList("[{_id:1, a:8, added: [{a: 5}]}, {_id:2, a:9, added: [{a: 5}]}]"),
202+
getCollectionHelper().aggregate(Arrays.asList(lookupStage)));
203+
204+
// null variant
205+
Bson lookupStageNull = Aggregates.lookup(null, Arrays.asList(documentsStage), "added");
206+
assertPipeline(
207+
"{'$lookup': {'pipeline': [{'$documents': [{'a': 5}]}], 'as': 'added'}}",
208+
lookupStageNull);
209+
assertEquals(
210+
parseToList("[{_id:1, a:8, added: [{a: 5}]}, {_id:2, a:9, added: [{a: 5}]}]"),
211+
getCollectionHelper().aggregate(Arrays.asList(lookupStageNull)));
212+
}
163213
}

driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.mongodb.internal.bulk.InsertRequest;
3434
import com.mongodb.internal.bulk.UpdateRequest;
3535
import com.mongodb.internal.bulk.WriteRequest;
36+
import com.mongodb.internal.client.model.AggregationLevel;
3637
import com.mongodb.internal.operation.AggregateOperation;
3738
import com.mongodb.internal.operation.BatchCursor;
3839
import com.mongodb.internal.operation.CommandReadOperation;
@@ -281,12 +282,20 @@ public List<T> aggregate(final List<Bson> pipeline) {
281282
}
282283

283284
public <D> List<D> aggregate(final List<Bson> pipeline, final Decoder<D> decoder) {
284-
List<BsonDocument> bsonDocumentPipeline = new ArrayList<>();
285+
return aggregate(pipeline, decoder, AggregationLevel.COLLECTION);
286+
}
287+
288+
public List<T> aggregateDb(final List<Bson> pipeline) {
289+
return aggregate(pipeline, codec, AggregationLevel.DATABASE);
290+
}
291+
292+
private <D> List<D> aggregate(final List<Bson> pipeline, final Decoder<D> decoder, final AggregationLevel level) {
293+
List<BsonDocument> bsonDocumentPipeline = new ArrayList<BsonDocument>();
285294
for (Bson cur : pipeline) {
286295
bsonDocumentPipeline.add(cur.toBsonDocument(Document.class, registry));
287296
}
288-
BatchCursor<D> cursor = new AggregateOperation<>(namespace, bsonDocumentPipeline, decoder)
289-
.execute(getBinding());
297+
BatchCursor<D> cursor = new AggregateOperation<D>(namespace, bsonDocumentPipeline, decoder, level)
298+
.execute(getBinding());
290299
List<D> results = new ArrayList<>();
291300
while (cursor.hasNext()) {
292301
results.addAll(cursor.next());

driver-scala/src/main/scala/org/mongodb/scala/model/Aggregates.scala

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -324,9 +324,14 @@ object Aggregates {
324324
JAggregates.lookup(from, localField, foreignField, as)
325325

326326
/**
327-
* Creates a `\$lookup` pipeline stage, joining the current collection with the one specified in from using the given pipeline
328-
*
329-
* @param from the name of the collection in the same database to perform the join with.
327+
* Creates a `\$lookup` pipeline stage, joining the current collection with
328+
* the one specified in from using the given pipeline. If the first stage in
329+
* the pipeline is a `\$documents` stage, then the "from" collection is
330+
* ignored.
331+
*
332+
* @param from the name of the collection in the same database to
333+
* perform the join with. May be null if the
334+
* first pipeline stage is `\$documents`.
330335
* @param pipeline the pipeline to run on the joined collection.
331336
* @param as the name of the new array field to add to the input documents.
332337
* @return the `\$lookup` pipeline stage:
@@ -338,9 +343,14 @@ object Aggregates {
338343
JAggregates.lookup(from, pipeline.asJava, as)
339344

340345
/**
341-
* Creates a `\$lookup` pipeline stage, joining the current collection with the one specified in from using the given pipeline
346+
* Creates a `\$lookup` pipeline stage, joining the current collection with
347+
* the one specified in from using the given pipeline. If the first stage in
348+
* the pipeline is a `\$documents` stage, then the "from" collection is
349+
* ignored.
342350
*
343-
* @param from the name of the collection in the same database to perform the join with.
351+
* @param from the name of the collection in the same database to
352+
* perform the join with. May be null if the
353+
* first pipeline stage is `\$documents`.
344354
* @param let the variables to use in the pipeline field stages.
345355
* @param pipeline the pipeline to run on the joined collection.
346356
* @param as the name of the new array field to add to the input documents.
@@ -746,4 +756,14 @@ object Aggregates {
746756
*/
747757
def geoNear(near: Point, distanceField: String): Bson =
748758
JAggregates.geoNear(near, distanceField)
759+
760+
/**
761+
* Creates a `\$documents` pipeline stage.
762+
*
763+
* @param documents the documents.
764+
* @return the `\$documents` pipeline stage
765+
* @see [[https://www.mongodb.com/docs/manual/reference/operator/aggregation/documents/ \$documents]]
766+
* @since 4.9
767+
*/
768+
def documents(documents: Bson*): Bson = JAggregates.documents(documents.asJava)
749769
}

driver-scala/src/test/scala/org/mongodb/scala/model/AggregatesSpec.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -805,4 +805,15 @@ class AggregatesSpec extends BaseSpec {
805805
|}""".stripMargin)
806806
)
807807
}
808+
809+
it should "render $documents" in {
810+
toBson(
811+
Aggregates.documents(
812+
org.mongodb.scala.bson.BsonDocument("""{a: 1, b: {$add: [1, 1]} }"""),
813+
Document("""{a: 3, b: 4}""")
814+
)
815+
) should equal(
816+
Document("""{$documents: [{a: 1, b: {$add: [1, 1]}}, {a: 3, b: 4}]}""")
817+
)
818+
}
808819
}

0 commit comments

Comments
 (0)