From 5c30dfd90ed0f340b9f6c9099d37743cd2224d7d Mon Sep 17 00:00:00 2001 From: jyemin Date: Thu, 27 Oct 2022 22:20:21 -0400 Subject: [PATCH] Add AsyncOperations to match SyncOperations JAVA-4795 --- .../internal/operation/AsyncOperations.java | 274 ++++++++++++++++++ .../internal/operation/Operations.java | 84 +++--- .../internal/operation/SyncOperations.java | 2 +- .../internal/AggregatePublisherImpl.java | 4 +- .../client/internal/BatchCursorPublisher.java | 4 +- .../client/internal/FindPublisherImpl.java | 4 +- .../internal/MongoOperationPublisher.java | 8 +- .../client/internal/TestHelper.java | 3 - 8 files changed, 326 insertions(+), 57 deletions(-) create mode 100644 driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java new file mode 100644 index 00000000000..8d1e2643025 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java @@ -0,0 +1,274 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.operation; + +import com.mongodb.AutoEncryptionSettings; +import com.mongodb.MongoNamespace; +import com.mongodb.ReadConcern; +import com.mongodb.ReadPreference; +import com.mongodb.WriteConcern; +import com.mongodb.bulk.BulkWriteResult; +import com.mongodb.client.model.BulkWriteOptions; +import com.mongodb.client.model.Collation; +import com.mongodb.client.model.CountOptions; +import com.mongodb.client.model.CreateIndexOptions; +import com.mongodb.client.model.DeleteOptions; +import com.mongodb.client.model.DropCollectionOptions; +import com.mongodb.client.model.DropIndexOptions; +import com.mongodb.client.model.EstimatedDocumentCountOptions; +import com.mongodb.client.model.FindOneAndDeleteOptions; +import com.mongodb.client.model.FindOneAndReplaceOptions; +import com.mongodb.client.model.FindOneAndUpdateOptions; +import com.mongodb.client.model.IndexModel; +import com.mongodb.client.model.InsertManyOptions; +import com.mongodb.client.model.InsertOneOptions; +import com.mongodb.client.model.RenameCollectionOptions; +import com.mongodb.client.model.ReplaceOptions; +import com.mongodb.client.model.UpdateOptions; +import com.mongodb.client.model.WriteModel; +import com.mongodb.internal.async.AsyncBatchCursor; +import com.mongodb.internal.client.model.AggregationLevel; +import com.mongodb.internal.client.model.FindOptions; +import org.bson.BsonValue; +import org.bson.codecs.configuration.CodecRegistry; +import org.bson.conversions.Bson; + +import java.util.List; + +/** + *

This class is not part of the public API and may be removed or changed at any time

+ */ +public final class AsyncOperations { + private final Operations operations; + + public AsyncOperations(final MongoNamespace namespace, final Class documentClass, final ReadPreference readPreference, + final CodecRegistry codecRegistry, final ReadConcern readConcern, final WriteConcern writeConcern, + final boolean retryWrites, final boolean retryReads) { + this.operations = new Operations<>(namespace, documentClass, readPreference, codecRegistry, readConcern, writeConcern, + retryWrites, retryReads); + } + + public MongoNamespace getNamespace() { + return operations.getNamespace(); + } + + public Class getDocumentClass() { + return operations.getDocumentClass(); + } + + public ReadPreference getReadPreference() { + return operations.getReadPreference(); + } + + public CodecRegistry getCodecRegistry() { + return operations.getCodecRegistry(); + } + + public ReadConcern getReadConcern() { + return operations.getReadConcern(); + } + + public WriteConcern getWriteConcern() { + return operations.getWriteConcern(); + } + + public boolean isRetryWrites() { + return operations.isRetryWrites(); + } + + public boolean isRetryReads() { + return operations.isRetryReads(); + } + + public AsyncReadOperation countDocuments(final Bson filter, final CountOptions options) { + return operations.countDocuments(filter, options); + } + + public AsyncReadOperation estimatedDocumentCount(final EstimatedDocumentCountOptions options) { + return operations.estimatedDocumentCount(options); + } + + public AsyncReadOperation> findFirst(final Bson filter, final Class resultClass, + final FindOptions options) { + return operations.findFirst(filter, resultClass, options); + } + + public AsyncExplainableReadOperation> find(final Bson filter, final Class resultClass, + final FindOptions options) { + return operations.find(filter, resultClass, options); + } + + public AsyncReadOperation> find(final MongoNamespace findNamespace, final Bson filter, + final Class resultClass, final FindOptions options) { + return operations.find(findNamespace, filter, resultClass, options); + } + + public AsyncReadOperation> distinct(final String fieldName, final Bson filter, + final Class resultClass, final long maxTimeMS, + final Collation collation, final BsonValue comment) { + return operations.distinct(fieldName, filter, resultClass, maxTimeMS, collation, comment); + } + + public AsyncExplainableReadOperation> aggregate(final List pipeline, + final Class resultClass, + final long maxTimeMS, final long maxAwaitTimeMS, + final Integer batchSize, + final Collation collation, final Bson hint, + final String hintString, + final BsonValue comment, + final Bson variables, + final Boolean allowDiskUse, + final AggregationLevel aggregationLevel) { + return operations.aggregate(pipeline, resultClass, maxTimeMS, maxAwaitTimeMS, batchSize, collation, hint, hintString, comment, + variables, allowDiskUse, aggregationLevel); + } + + public AsyncReadOperation aggregateToCollection(final List pipeline, final long maxTimeMS, + final Boolean allowDiskUse, final Boolean bypassDocumentValidation, + final Collation collation, final Bson hint, final String hintString, final BsonValue comment, + final Bson variables, final AggregationLevel aggregationLevel) { + return operations.aggregateToCollection(pipeline, maxTimeMS, allowDiskUse, bypassDocumentValidation, collation, hint, hintString, + comment, variables, aggregationLevel); + } + + @SuppressWarnings("deprecation") + public AsyncWriteOperation mapReduceToCollection(final String databaseName, final String collectionName, + final String mapFunction, final String reduceFunction, + final String finalizeFunction, final Bson filter, final int limit, + final long maxTimeMS, final boolean jsMode, final Bson scope, + final Bson sort, final boolean verbose, + final com.mongodb.client.model.MapReduceAction action, + final boolean nonAtomic, final boolean sharded, + final Boolean bypassDocumentValidation, final Collation collation) { + return operations.mapReduceToCollection(databaseName, collectionName, mapFunction, reduceFunction, finalizeFunction, filter, limit, + maxTimeMS, jsMode, scope, sort, verbose, action, nonAtomic, sharded, bypassDocumentValidation, collation); + } + + public AsyncReadOperation> mapReduce(final String mapFunction, final String reduceFunction, + final String finalizeFunction, final Class resultClass, + final Bson filter, final int limit, + final long maxTimeMS, final boolean jsMode, final Bson scope, + final Bson sort, final boolean verbose, + final Collation collation) { + return operations.mapReduce(mapFunction, reduceFunction, finalizeFunction, resultClass, filter, limit, maxTimeMS, jsMode, scope, + sort, verbose, collation); + } + + public AsyncWriteOperation findOneAndDelete(final Bson filter, final FindOneAndDeleteOptions options) { + return operations.findOneAndDelete(filter, options); + } + + public AsyncWriteOperation findOneAndReplace(final Bson filter, final TDocument replacement, + final FindOneAndReplaceOptions options) { + return operations.findOneAndReplace(filter, replacement, options); + } + + public AsyncWriteOperation findOneAndUpdate(final Bson filter, final Bson update, final FindOneAndUpdateOptions options) { + return operations.findOneAndUpdate(filter, update, options); + } + + public AsyncWriteOperation findOneAndUpdate(final Bson filter, final List update, + final FindOneAndUpdateOptions options) { + return operations.findOneAndUpdate(filter, update, options); + } + + public AsyncWriteOperation insertOne(final TDocument document, final InsertOneOptions options) { + return operations.insertOne(document, options); + } + + + public AsyncWriteOperation replaceOne(final Bson filter, final TDocument replacement, final ReplaceOptions options) { + return operations.replaceOne(filter, replacement, options); + } + + public AsyncWriteOperation deleteOne(final Bson filter, final DeleteOptions options) { + return operations.deleteOne(filter, options); + } + + public AsyncWriteOperation deleteMany(final Bson filter, final DeleteOptions options) { + return operations.deleteMany(filter, options); + } + + public AsyncWriteOperation updateOne(final Bson filter, final Bson update, final UpdateOptions updateOptions) { + return operations.updateOne(filter, update, updateOptions); + } + + public AsyncWriteOperation updateOne(final Bson filter, final List update, + final UpdateOptions updateOptions) { + return operations.updateOne(filter, update, updateOptions); + } + + public AsyncWriteOperation updateMany(final Bson filter, final Bson update, final UpdateOptions updateOptions) { + return operations.updateMany(filter, update, updateOptions); + } + + public AsyncWriteOperation updateMany(final Bson filter, final List update, + final UpdateOptions updateOptions) { + return operations.updateMany(filter, update, updateOptions); + } + + public AsyncWriteOperation insertMany(final List documents, + final InsertManyOptions options) { + return operations.insertMany(documents, options); + } + + public AsyncWriteOperation bulkWrite(final List> requests, + final BulkWriteOptions options) { + return operations.bulkWrite(requests, options); + } + + + public AsyncWriteOperation dropCollection(final DropCollectionOptions dropCollectionOptions, + final AutoEncryptionSettings autoEncryptionSettings) { + return operations.dropCollection(dropCollectionOptions, autoEncryptionSettings); + } + + public AsyncWriteOperation renameCollection(final MongoNamespace newCollectionNamespace, + final RenameCollectionOptions options) { + return operations.renameCollection(newCollectionNamespace, options); + } + + public AsyncWriteOperation createIndexes(final List indexes, final CreateIndexOptions options) { + return operations.createIndexes(indexes, options); + } + + public AsyncWriteOperation dropIndex(final String indexName, final DropIndexOptions options) { + return operations.dropIndex(indexName, options); + } + + public AsyncWriteOperation dropIndex(final Bson keys, final DropIndexOptions options) { + return operations.dropIndex(keys, options); + } + + public AsyncReadOperation> listCollections(final String databaseName, final Class resultClass, + final Bson filter, final boolean collectionNamesOnly, + final Integer batchSize, final long maxTimeMS, + final BsonValue comment) { + return operations.listCollections(databaseName, resultClass, filter, collectionNamesOnly, batchSize, maxTimeMS, comment); + } + + public AsyncReadOperation> listDatabases(final Class resultClass, final Bson filter, + final Boolean nameOnly, final long maxTimeMS, + final Boolean authorizedDatabases, final BsonValue comment) { + return operations.listDatabases(resultClass, filter, nameOnly, maxTimeMS, authorizedDatabases, comment); + } + + public AsyncReadOperation> listIndexes(final Class resultClass, final Integer batchSize, + final long maxTimeMS, final BsonValue comment) { + return operations.listIndexes(resultClass, batchSize, maxTimeMS, comment); + } +} diff --git a/driver-core/src/main/com/mongodb/internal/operation/Operations.java b/driver-core/src/main/com/mongodb/internal/operation/Operations.java index 1c22c49fbe9..acf0b9b5b17 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/Operations.java +++ b/driver-core/src/main/com/mongodb/internal/operation/Operations.java @@ -74,7 +74,7 @@ import static java.util.Collections.singletonList; import static java.util.concurrent.TimeUnit.MILLISECONDS; -public final class Operations { +final class Operations { private final MongoNamespace namespace; private final Class documentClass; private final ReadPreference readPreference; @@ -84,7 +84,7 @@ public final class Operations { private final boolean retryWrites; private final boolean retryReads; - public Operations(final MongoNamespace namespace, final Class documentClass, final ReadPreference readPreference, + Operations(final MongoNamespace namespace, final Class documentClass, final ReadPreference readPreference, final CodecRegistry codecRegistry, final ReadConcern readConcern, final WriteConcern writeConcern, final boolean retryWrites, final boolean retryReads) { this.namespace = namespace; @@ -97,39 +97,39 @@ public Operations(final MongoNamespace namespace, final Class documen this.retryReads = retryReads; } - public MongoNamespace getNamespace() { + MongoNamespace getNamespace() { return namespace; } - public Class getDocumentClass() { + Class getDocumentClass() { return documentClass; } - public ReadPreference getReadPreference() { + ReadPreference getReadPreference() { return readPreference; } - public CodecRegistry getCodecRegistry() { + CodecRegistry getCodecRegistry() { return codecRegistry; } - public ReadConcern getReadConcern() { + ReadConcern getReadConcern() { return readConcern; } - public WriteConcern getWriteConcern() { + WriteConcern getWriteConcern() { return writeConcern; } - public boolean isRetryWrites() { + boolean isRetryWrites() { return retryWrites; } - public boolean isRetryReads() { + boolean isRetryReads() { return retryReads; } - public CountDocumentsOperation countDocuments(final Bson filter, final CountOptions options) { + CountDocumentsOperation countDocuments(final Bson filter, final CountOptions options) { CountDocumentsOperation operation = new CountDocumentsOperation(namespace) .retryReads(retryReads) .filter(toBsonDocument(filter)) @@ -146,24 +146,24 @@ public CountDocumentsOperation countDocuments(final Bson filter, final CountOpti return operation; } - public EstimatedDocumentCountOperation estimatedDocumentCount(final EstimatedDocumentCountOptions options) { + EstimatedDocumentCountOperation estimatedDocumentCount(final EstimatedDocumentCountOptions options) { return new EstimatedDocumentCountOperation(namespace) .retryReads(retryReads) .maxTime(options.getMaxTime(MILLISECONDS), MILLISECONDS) .comment(options.getComment()); } - public FindOperation findFirst(final Bson filter, final Class resultClass, + FindOperation findFirst(final Bson filter, final Class resultClass, final FindOptions options) { return createFindOperation(namespace, filter, resultClass, options).batchSize(0).limit(-1); } - public FindOperation find(final Bson filter, final Class resultClass, + FindOperation find(final Bson filter, final Class resultClass, final FindOptions options) { return createFindOperation(namespace, filter, resultClass, options); } - public FindOperation find(final MongoNamespace findNamespace, final Bson filter, + FindOperation find(final MongoNamespace findNamespace, final Bson filter, final Class resultClass, final FindOptions options) { return createFindOperation(findNamespace, filter, resultClass, options); } @@ -201,7 +201,7 @@ private FindOperation createFindOperation(final MongoNamespac return operation; } - public DistinctOperation distinct(final String fieldName, final Bson filter, + DistinctOperation distinct(final String fieldName, final Bson filter, final Class resultClass, final long maxTimeMS, final Collation collation, final BsonValue comment) { return new DistinctOperation<>(namespace, fieldName, codecRegistry.get(resultClass)) @@ -213,7 +213,7 @@ public DistinctOperation distinct(final String fieldName, fin } - public AggregateOperation aggregate(final List pipeline, final Class resultClass, + AggregateOperation aggregate(final List pipeline, final Class resultClass, final long maxTimeMS, final long maxAwaitTimeMS, final Integer batchSize, final Collation collation, final Bson hint, final String hintString, final BsonValue comment, @@ -231,7 +231,7 @@ public AggregateOperation aggregate(final List pipeline, final long maxTimeMS, + AggregateToCollectionOperation aggregateToCollection(final List pipeline, final long maxTimeMS, final Boolean allowDiskUse, final Boolean bypassDocumentValidation, final Collation collation, final Bson hint, final String hintString, final BsonValue comment, final Bson variables, final AggregationLevel aggregationLevel) { @@ -246,7 +246,7 @@ public AggregateToCollectionOperation aggregateToCollection(final List MapReduceWithInlineResultsOperation mapReduce(final String mapFunction, final String reduceFunction, + MapReduceWithInlineResultsOperation mapReduce(final String mapFunction, final String reduceFunction, final String finalizeFunction, final Class resultClass, final Bson filter, final int limit, final long maxTimeMS, final boolean jsMode, final Bson scope, @@ -301,7 +301,7 @@ public MapReduceWithInlineResultsOperation mapReduce(final St return operation; } - public FindAndDeleteOperation findOneAndDelete(final Bson filter, final FindOneAndDeleteOptions options) { + FindAndDeleteOperation findOneAndDelete(final Bson filter, final FindOneAndDeleteOptions options) { return new FindAndDeleteOperation<>(namespace, writeConcern, retryWrites, getCodec()) .filter(toBsonDocument(filter)) .projection(toBsonDocument(options.getProjection())) @@ -314,7 +314,7 @@ public FindAndDeleteOperation findOneAndDelete(final Bson filter, fin .let(toBsonDocument(options.getLet())); } - public FindAndReplaceOperation findOneAndReplace(final Bson filter, final TDocument replacement, + FindAndReplaceOperation findOneAndReplace(final Bson filter, final TDocument replacement, final FindOneAndReplaceOptions options) { return new FindAndReplaceOperation<>(namespace, writeConcern, retryWrites, getCodec(), documentToBsonDocument(replacement)) @@ -332,7 +332,7 @@ public FindAndReplaceOperation findOneAndReplace(final Bson filter, f .let(toBsonDocument(options.getLet())); } - public FindAndUpdateOperation findOneAndUpdate(final Bson filter, final Bson update, final FindOneAndUpdateOptions options) { + FindAndUpdateOperation findOneAndUpdate(final Bson filter, final Bson update, final FindOneAndUpdateOptions options) { return new FindAndUpdateOperation<>(namespace, writeConcern, retryWrites, getCodec(), toBsonDocument(update)) .filter(toBsonDocument(filter)) @@ -350,7 +350,7 @@ public FindAndUpdateOperation findOneAndUpdate(final Bson filter, fin .let(toBsonDocument(options.getLet())); } - public FindAndUpdateOperation findOneAndUpdate(final Bson filter, final List update, + FindAndUpdateOperation findOneAndUpdate(final Bson filter, final List update, final FindOneAndUpdateOptions options) { return new FindAndUpdateOperation<>(namespace, writeConcern, retryWrites, getCodec(), toBsonDocumentList(update)) @@ -370,53 +370,53 @@ public FindAndUpdateOperation findOneAndUpdate(final Bson filter, fin } - public MixedBulkWriteOperation insertOne(final TDocument document, final InsertOneOptions options) { + MixedBulkWriteOperation insertOne(final TDocument document, final InsertOneOptions options) { return bulkWrite(singletonList(new InsertOneModel<>(document)), new BulkWriteOptions().bypassDocumentValidation(options.getBypassDocumentValidation()).comment(options.getComment())); } - public MixedBulkWriteOperation replaceOne(final Bson filter, final TDocument replacement, final ReplaceOptions options) { + MixedBulkWriteOperation replaceOne(final Bson filter, final TDocument replacement, final ReplaceOptions options) { return bulkWrite(singletonList(new ReplaceOneModel<>(filter, replacement, options)), new BulkWriteOptions().bypassDocumentValidation(options.getBypassDocumentValidation()) .comment(options.getComment()).let(options.getLet())); } - public MixedBulkWriteOperation deleteOne(final Bson filter, final DeleteOptions options) { + MixedBulkWriteOperation deleteOne(final Bson filter, final DeleteOptions options) { return bulkWrite(singletonList(new DeleteOneModel<>(filter, options)), new BulkWriteOptions().comment(options.getComment()).let(options.getLet())); } - public MixedBulkWriteOperation deleteMany(final Bson filter, final DeleteOptions options) { + MixedBulkWriteOperation deleteMany(final Bson filter, final DeleteOptions options) { return bulkWrite(singletonList(new DeleteManyModel<>(filter, options)), new BulkWriteOptions().comment(options.getComment()).let(options.getLet())); } - public MixedBulkWriteOperation updateOne(final Bson filter, final Bson update, final UpdateOptions options) { + MixedBulkWriteOperation updateOne(final Bson filter, final Bson update, final UpdateOptions options) { return bulkWrite(singletonList(new UpdateOneModel<>(filter, update, options)), new BulkWriteOptions().bypassDocumentValidation(options.getBypassDocumentValidation()) .comment(options.getComment()).let(options.getLet())); } - public MixedBulkWriteOperation updateOne(final Bson filter, final List update, final UpdateOptions options) { + MixedBulkWriteOperation updateOne(final Bson filter, final List update, final UpdateOptions options) { return bulkWrite(singletonList(new UpdateOneModel<>(filter, update, options)), new BulkWriteOptions().bypassDocumentValidation(options.getBypassDocumentValidation()) .comment(options.getComment()).let(options.getLet())); } - public MixedBulkWriteOperation updateMany(final Bson filter, final Bson update, final UpdateOptions options) { + MixedBulkWriteOperation updateMany(final Bson filter, final Bson update, final UpdateOptions options) { return bulkWrite(singletonList(new UpdateManyModel<>(filter, update, options)), new BulkWriteOptions().bypassDocumentValidation(options.getBypassDocumentValidation()) .comment(options.getComment()).let(options.getLet())); } - public MixedBulkWriteOperation updateMany(final Bson filter, final List update, final UpdateOptions options) { + MixedBulkWriteOperation updateMany(final Bson filter, final List update, final UpdateOptions options) { return bulkWrite(singletonList(new UpdateManyModel<>(filter, update, options)), new BulkWriteOptions().bypassDocumentValidation(options.getBypassDocumentValidation()) .comment(options.getComment()).let(options.getLet())); } - public MixedBulkWriteOperation insertMany(final List documents, + MixedBulkWriteOperation insertMany(final List documents, final InsertManyOptions options) { notNull("documents", documents); List requests = new ArrayList<>(documents.size()); @@ -435,7 +435,7 @@ public MixedBulkWriteOperation insertMany(final List docume } @SuppressWarnings("unchecked") - public MixedBulkWriteOperation bulkWrite(final List> requests, + MixedBulkWriteOperation bulkWrite(final List> requests, final BulkWriteOptions options) { notNull("requests", requests); List writeRequests = new ArrayList<>(requests.size()); @@ -505,7 +505,7 @@ public MixedBulkWriteOperation bulkWrite(final List indexes, final CreateIndexOptions createIndexOptions) { + CreateIndexesOperation createIndexes(final List indexes, final CreateIndexOptions createIndexOptions) { notNull("indexes", indexes); notNull("createIndexOptions", createIndexOptions); List indexRequests = new ArrayList<>(indexes.size()); @@ -566,17 +566,17 @@ public CreateIndexesOperation createIndexes(final List indexes, fina .commitQuorum(createIndexOptions.getCommitQuorum()); } - public DropIndexOperation dropIndex(final String indexName, final DropIndexOptions dropIndexOptions) { + DropIndexOperation dropIndex(final String indexName, final DropIndexOptions dropIndexOptions) { return new DropIndexOperation(namespace, indexName, writeConcern) .maxTime(dropIndexOptions.getMaxTime(MILLISECONDS), MILLISECONDS); } - public DropIndexOperation dropIndex(final Bson keys, final DropIndexOptions dropIndexOptions) { + DropIndexOperation dropIndex(final Bson keys, final DropIndexOptions dropIndexOptions) { return new DropIndexOperation(namespace, keys.toBsonDocument(BsonDocument.class, codecRegistry), writeConcern) .maxTime(dropIndexOptions.getMaxTime(MILLISECONDS), MILLISECONDS); } - public ListCollectionsOperation listCollections(final String databaseName, final Class resultClass, + ListCollectionsOperation listCollections(final String databaseName, final Class resultClass, final Bson filter, final boolean collectionNamesOnly, final Integer batchSize, final long maxTimeMS, final BsonValue comment) { return new ListCollectionsOperation<>(databaseName, codecRegistry.get(resultClass)) @@ -588,7 +588,7 @@ public ListCollectionsOperation listCollections(final String .comment(comment); } - public ListDatabasesOperation listDatabases(final Class resultClass, final Bson filter, + ListDatabasesOperation listDatabases(final Class resultClass, final Bson filter, final Boolean nameOnly, final long maxTimeMS, final Boolean authorizedDatabasesOnly, final BsonValue comment) { return new ListDatabasesOperation<>(codecRegistry.get(resultClass)).maxTime(maxTimeMS, MILLISECONDS) @@ -599,7 +599,7 @@ public ListDatabasesOperation listDatabases(final Class ListIndexesOperation listIndexes(final Class resultClass, final Integer batchSize, + ListIndexesOperation listIndexes(final Class resultClass, final Integer batchSize, final long maxTimeMS, final BsonValue comment) { return new ListIndexesOperation<>(namespace, codecRegistry.get(resultClass)) .retryReads(retryReads) diff --git a/driver-core/src/main/com/mongodb/internal/operation/SyncOperations.java b/driver-core/src/main/com/mongodb/internal/operation/SyncOperations.java index 9a8fe683b81..10e7187bc50 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/SyncOperations.java +++ b/driver-core/src/main/com/mongodb/internal/operation/SyncOperations.java @@ -49,7 +49,7 @@ import java.util.List; /** - * This class is NOT part of the public API. It may change at any time without notification. + *

This class is not part of the public API and may be removed or changed at any time

*/ public final class SyncOperations { private final Operations operations; diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/AggregatePublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/AggregatePublisherImpl.java index 1be9f7236f4..f9160b030f0 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/AggregatePublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/AggregatePublisherImpl.java @@ -22,7 +22,7 @@ import com.mongodb.internal.async.AsyncBatchCursor; import com.mongodb.internal.client.model.AggregationLevel; import com.mongodb.internal.client.model.FindOptions; -import com.mongodb.internal.operation.AggregateOperation; +import com.mongodb.internal.operation.AsyncExplainableReadOperation; import com.mongodb.internal.operation.AsyncReadOperation; import com.mongodb.lang.Nullable; import com.mongodb.reactivestreams.client.AggregatePublisher; @@ -185,7 +185,7 @@ AsyncReadOperation> asAsyncReadOperation(final int initialBa } } - private AggregateOperation asAggregateOperation(final int initialBatchSize) { + private AsyncExplainableReadOperation> asAggregateOperation(final int initialBatchSize) { return getOperations() .aggregate(pipeline, getDocumentClass(), maxTimeMS, maxAwaitTimeMS, initialBatchSize, collation, hint, hintString, comment, variables, allowDiskUse, aggregationLevel); diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorPublisher.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorPublisher.java index 94949e9ddcb..b784dab03d5 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorPublisher.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorPublisher.java @@ -19,8 +19,8 @@ import com.mongodb.MongoNamespace; import com.mongodb.ReadPreference; import com.mongodb.internal.async.AsyncBatchCursor; +import com.mongodb.internal.operation.AsyncOperations; import com.mongodb.internal.operation.AsyncReadOperation; -import com.mongodb.internal.operation.Operations; import com.mongodb.lang.Nullable; import com.mongodb.reactivestreams.client.ClientSession; import org.bson.codecs.configuration.CodecRegistry; @@ -63,7 +63,7 @@ MongoOperationPublisher getMongoOperationPublisher() { return mongoOperationPublisher; } - Operations getOperations() { + AsyncOperations getOperations() { return mongoOperationPublisher.getOperations(); } diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/FindPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/FindPublisherImpl.java index 28d1787ac11..f1b0f116f63 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/FindPublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/FindPublisherImpl.java @@ -23,7 +23,6 @@ import com.mongodb.internal.client.model.FindOptions; import com.mongodb.internal.operation.AsyncExplainableReadOperation; import com.mongodb.internal.operation.AsyncReadOperation; -import com.mongodb.internal.operation.FindOperation; import com.mongodb.lang.Nullable; import com.mongodb.reactivestreams.client.ClientSession; import com.mongodb.reactivestreams.client.FindPublisher; @@ -220,8 +219,7 @@ private Publisher publishExplain(final Class explainResultClass, @Null @Override AsyncExplainableReadOperation> asAsyncReadOperation(final int initialBatchSize) { - FindOperation operation = getOperations().find(filter, getDocumentClass(), findOptions.withBatchSize(initialBatchSize)); - return operation; + return getOperations().find(filter, getDocumentClass(), findOptions.withBatchSize(initialBatchSize)); } @Override diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java index e72ec83b480..d24b04de1fb 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java @@ -58,6 +58,7 @@ import com.mongodb.client.result.UpdateResult; import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.bulk.WriteRequest; +import com.mongodb.internal.operation.AsyncOperations; import com.mongodb.internal.operation.AsyncReadOperation; import com.mongodb.internal.operation.AsyncWriteOperation; import com.mongodb.internal.operation.CommandReadOperation; @@ -65,7 +66,6 @@ import com.mongodb.internal.operation.CreateViewOperation; import com.mongodb.internal.operation.DropDatabaseOperation; import com.mongodb.internal.operation.IndexHelper; -import com.mongodb.internal.operation.Operations; import com.mongodb.lang.Nullable; import com.mongodb.reactivestreams.client.ClientSession; import org.bson.BsonDocument; @@ -95,7 +95,7 @@ */ public final class MongoOperationPublisher { - private final Operations operations; + private final AsyncOperations operations; private final UuidRepresentation uuidRepresentation; private final AutoEncryptionSettings autoEncryptionSettings; private final OperationExecutor executor; @@ -116,7 +116,7 @@ public final class MongoOperationPublisher { final boolean retryWrites, final boolean retryReads, final UuidRepresentation uuidRepresentation, @Nullable final AutoEncryptionSettings autoEncryptionSettings, final OperationExecutor executor) { - this.operations = new Operations<>(namespace, notNull("documentClass", documentClass), + this.operations = new AsyncOperations<>(namespace, notNull("documentClass", documentClass), notNull("readPreference", readPreference), notNull("codecRegistry", codecRegistry), notNull("readConcern", readConcern), notNull("writeConcern", writeConcern), retryWrites, retryReads); @@ -157,7 +157,7 @@ Class getDocumentClass() { return operations.getDocumentClass(); } - public Operations getOperations() { + public AsyncOperations getOperations() { return operations; } diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java index 36e5a80c9b0..b5f77d39941 100644 --- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java +++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java @@ -28,7 +28,6 @@ import com.mongodb.internal.client.model.FindOptions; import com.mongodb.internal.operation.AsyncReadOperation; import com.mongodb.internal.operation.AsyncWriteOperation; -import com.mongodb.internal.operation.Operations; import com.mongodb.lang.NonNull; import com.mongodb.lang.Nullable; import org.bson.Document; @@ -171,8 +170,6 @@ private static Object checkValueTypes(final Object instance) { Object actual = instance instanceof Optional ? ((Optional) instance).orElse(instance) : instance; if (actual instanceof AsyncReadOperation || actual instanceof AsyncWriteOperation) { return getClassPrivateFieldValues(actual); - } else if (actual instanceof Operations) { - return getClassPrivateFieldValues(actual); } else if (actual.getClass().getSimpleName().equals("ChangeStreamDocumentCodec")) { return getClassGetterValues(actual); } else if (actual instanceof FindOptions) {