diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncBatchCursor.java b/driver-core/src/main/com/mongodb/internal/async/AsyncBatchCursor.java
index 89260ac7b52..bd8d6c64a3f 100644
--- a/driver-core/src/main/com/mongodb/internal/async/AsyncBatchCursor.java
+++ b/driver-core/src/main/com/mongodb/internal/async/AsyncBatchCursor.java
@@ -19,8 +19,11 @@
 import com.mongodb.internal.operation.BatchCursor;
 
 import java.io.Closeable;
+import java.util.ArrayList;
 import java.util.List;
 
+import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
+
 /**
  * MongoDB returns query results as batches, and this interface provides an asynchronous iterator over those batches.  The first call to
  * the {@code next} method will return the first batch, and subsequent calls will trigger an asynchronous request to get the next batch
@@ -72,4 +75,22 @@ public interface AsyncBatchCursor<T> extends Closeable {
      */
     @Override
     void close();
+
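+    /**
+     * Iterates this cursor until it {@linkplain #isClosed() is closed}, collecting each non-empty batch.
+     *
+     * @param finalCallback the callback invoked with the accumulated batches, or with the failure if iteration fails
+     */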
+    default void exhaust(final SingleResultCallback<List<List<T>>> finalCallback) {
+        List<List<T>> results = new ArrayList<>();
+
+        beginAsync().thenRunDoWhileLoop(iterationCallback -> {
+            beginAsync().<List<T>>thenSupply(c -> {
+                next(c);
+            }).thenConsume((batch, c) -> {
+                if (!batch.isEmpty()) {
+                    results.add(batch);
+                }
+                c.complete(c);
+            }).finish(iterationCallback);
+        }, () -> !this.isClosed()
+        ).<List<List<T>>>thenSupply(c -> {
+            c.complete(results);
+        }).finish(finalCallback);
+    }
 }
diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java
index d4ead3c5b96..e404e2b8152 100644
--- a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java
+++ b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java
@@ -17,9 +17,12 @@
 package com.mongodb.internal.async;
 
 import com.mongodb.internal.TimeoutContext;
+import com.mongodb.internal.async.function.AsyncCallbackLoop;
+import com.mongodb.internal.async.function.LoopState;
 import com.mongodb.internal.async.function.RetryState;
 import com.mongodb.internal.async.function.RetryingAsyncCallbackSupplier;
 
+import java.util.function.BooleanSupplier;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 
@@ -120,49 +123,6 @@ static AsyncRunnable beginAsync() {
         return (c) -> c.complete(c);
     }
 
-    /**
-     * Must be invoked at end of async chain
-     * @param runnable the sync code to invoke (under non-exceptional flow)
-     *                 prior to the callback
-     * @param callback the callback provided by the method the chain is used in
-     */
-    default void thenRunAndFinish(final Runnable runnable, final SingleResultCallback<Void> callback) {
-        this.finish((r, e) -> {
-            if (e != null) {
-                callback.completeExceptionally(e);
-                return;
-            }
-            try {
-                runnable.run();
-            } catch (Throwable t) {
-                callback.completeExceptionally(t);
-                return;
-            }
-            callback.complete(callback);
-        });
-    }
-
-    /**
-     * See {@link #thenRunAndFinish(Runnable, SingleResultCallback)}, but the runnable
-     * will always be executed, including on the exceptional path.
-     * @param runnable the runnable
-     * @param callback the callback
-     */
-    default void thenAlwaysRunAndFinish(final Runnable runnable, final SingleResultCallback<Void> callback) {
-        this.finish((r, e) -> {
-            try {
-                runnable.run();
-            } catch (Throwable t) {
-                if (e != null) {
-                    t.addSuppressed(e);
-                }
-                callback.completeExceptionally(t);
-                return;
-            }
-            callback.onResult(r, e);
-        });
-    }
-
     /**
      * @param runnable The async runnable to run after this runnable
      * @return the composition of this runnable and the runnable, a runnable
@@ -282,4 +242,33 @@ default AsyncRunnable thenRunRetryingWhile(
             ).get(callback);
         });
     }
+
+    /**
+     * This method is equivalent to a do-while loop, where the loop body is executed first and
+     * then the condition is checked to determine whether the loop should continue.
+     *
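+     * <p>
+     * A minimal usage sketch of an async do-while loop (illustrative only; {@code fetchPageAsync} and
+     * {@code hasMorePages} are hypothetical):
+     * <pre>{@code
+     *  beginAsync().thenRunDoWhileLoop(iterationCallback -> {
+     *      fetchPageAsync(iterationCallback); // async loop body; completes the iteration callback
+     *  }, () -> hasMorePages()                // condition checked after each iteration
+     *  ).finish(finalCallback);
+     * }</pre>
+     *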
+     * @param loopBodyRunnable the asynchronous task to be executed in each iteration of the loop
+     * @param whileCheck a condition to check after each iteration; the loop continues as long as this condition returns true
+     * @return the composition of this and the looping branch
+     * @see AsyncCallbackLoop
+     */
+    default AsyncRunnable thenRunDoWhileLoop(final AsyncRunnable loopBodyRunnable, final BooleanSupplier whileCheck) {
+        return thenRun(finalCallback -> {
+            LoopState loopState = new LoopState();
+            new AsyncCallbackLoop(loopState, iterationCallback -> {
+
+                loopBodyRunnable.finish((result, t) -> {
+                    if (t != null) {
+                        iterationCallback.completeExceptionally(t);
+                        return;
+                    }
+                    if (loopState.breakAndCompleteIf(() -> !whileCheck.getAsBoolean(), iterationCallback)) {
+                        return;
+                    }
+                    iterationCallback.complete(iterationCallback);
+                });
+
+            }).run(finalCallback);
+        });
+    }
 }
diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java b/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java
index 77c289c8723..6dd89e4d9b0 100644
--- a/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java
+++ b/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java
@@ -81,6 +81,49 @@ default void finish(final SingleResultCallback<T> callback) {
         }
     }
 
+    /**
+     * Must be invoked at the end of the async chain.
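+     * <p>
+     * A minimal sketch of running sync code before completing the chain (illustrative only;
+     * {@code readValueAsync} and {@code log} are hypothetical):
+     * <pre>{@code
+     *  beginAsync().<Integer>thenSupply(c -> {
+     *      readValueAsync(c);
+     *  }).thenRunAndFinish(() -> log("done"), callback); // runs only on the non-exceptional path
+     * }</pre>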
+     * @param runnable the sync code to invoke (under non-exceptional flow)
+     *                 prior to the callback
+     * @param callback the callback provided by the method the chain is used in
+     */
+    default void thenRunAndFinish(final Runnable runnable, final SingleResultCallback<T> callback) {
+        this.finish((r, e) -> {
+            if (e != null) {
+                callback.completeExceptionally(e);
+                return;
+            }
+            try {
+                runnable.run();
+            } catch (Throwable t) {
+                callback.completeExceptionally(t);
+                return;
+            }
+            callback.onResult(r, null);
+        });
+    }
+
+    /**
+     * See {@link #thenRunAndFinish(Runnable, SingleResultCallback)}, but the runnable
+     * will always be executed, including on the exceptional path.
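+     * <p>
+     * A minimal sketch of the async try/finally pattern (illustrative only; {@code readAsync} and
+     * {@code resource} are hypothetical):
+     * <pre>{@code
+     *  beginAsync().<String>thenSupply(c -> {
+     *      readAsync(resource, c);
+     *  }).thenAlwaysRunAndFinish(resource::close, callback); // cleanup runs on both paths, like finally
+     * }</pre>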
+     * @param runnable the runnable
+     * @param callback the callback
+     */
+    default void thenAlwaysRunAndFinish(final Runnable runnable, final SingleResultCallback<T> callback) {
+        this.finish((r, e) -> {
+            try {
+                runnable.run();
+            } catch (Throwable t) {
+                if (e != null) {
+                    t.addSuppressed(e);
+                }
+                callback.completeExceptionally(t);
+                return;
+            }
+            callback.onResult(r, e);
+        });
+    }
+
     /**
      * @param function The async function to run after this supplier
      * @return the composition of this supplier and the function, a supplier
diff --git a/driver-core/src/main/com/mongodb/internal/async/MutableValue.java b/driver-core/src/main/com/mongodb/internal/async/MutableValue.java
new file mode 100644
index 00000000000..0ee793788ea
--- /dev/null
+++ b/driver-core/src/main/com/mongodb/internal/async/MutableValue.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mongodb.internal.async;
+
+import com.mongodb.annotations.NotThreadSafe;
+import com.mongodb.lang.Nullable;
+
+import static com.mongodb.assertions.Assertions.assertNotNull;
+
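+/**
+ * A mutable holder of a single nullable value.
+ *
+ * <p>This class is not part of the public API and may be removed or changed at any time.</p>
+ */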
+@NotThreadSafe
+public final class MutableValue<T> {
+    private T value;
+
+    public MutableValue(@Nullable final T value) {
+        this.value = value;
+    }
+
+    public MutableValue() {
+        this(null);
+    }
+
+    public T get() {
+        return assertNotNull(value);
+    }
+
+    @Nullable
+    public T getNullable() {
+        return value;
+    }
+
+    public void set(@Nullable final T value) {
+        this.value = value;
+    }
+}
diff --git a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackSupplier.java b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackSupplier.java
index 40bfd34de3d..1d98fb91a83 100644
--- a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackSupplier.java
+++ b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackSupplier.java
@@ -15,7 +15,7 @@
  */
 package com.mongodb.internal.async.function;
 
-import com.mongodb.annotations.NotThreadSafe;
+import com.mongodb.internal.async.MutableValue;
 import com.mongodb.internal.async.SingleResultCallback;
 
 import java.util.function.Supplier;
@@ -68,16 +68,12 @@ public interface AsyncCallbackSupplier<R> {
      * This is a price we have to pay to provide a guarantee similar to that of the {@code finally} block.
      */
     default AsyncCallbackSupplier<R> whenComplete(final Runnable after) {
-        @NotThreadSafe
-        final class MutableBoolean {
-            private boolean value;
-        }
-        MutableBoolean afterExecuted = new MutableBoolean();
+        MutableValue<Boolean> afterExecuted = new MutableValue<>(false);
         Runnable trackableAfter = () -> {
             try {
                 after.run();
             } finally {
-                afterExecuted.value = true;
+                afterExecuted.set(true);
             }
         };
         return callback -> {
@@ -103,7 +99,7 @@ final class MutableBoolean {
                 primaryUnexpectedException = unexpectedException;
                 throw unexpectedException;
             } finally {
-                if (primaryUnexpectedException != null && !afterExecuted.value) {
+                if (primaryUnexpectedException != null && !afterExecuted.get()) {
                     try {
                         trackableAfter.run();
                     } catch (Throwable afterException) {
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java
index 072ae8e0d9f..f158b3944ae 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java
@@ -344,7 +344,7 @@ static <T> CommandReadTransformerAsync<BsonDocument, AsyncBatchCursor<T>> asyncS
     }
 
     static <T> AsyncBatchCursor<T> cursorDocumentToAsyncBatchCursor(final TimeoutMode timeoutMode, final BsonDocument cursorDocument,
-            final int batchSize, final Decoder<T> decoder, final BsonValue comment, final AsyncConnectionSource source,
+            final int batchSize, final Decoder<T> decoder, @Nullable final BsonValue comment, final AsyncConnectionSource source,
             final AsyncConnection connection) {
         return new AsyncCommandBatchCursor<>(timeoutMode, cursorDocument, batchSize, 0, decoder, comment, source, connection);
     }
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java
index 77434bd9781..63a3a64ff98 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java
@@ -44,6 +44,9 @@
 import com.mongodb.client.model.SearchIndexModel;
 import com.mongodb.client.model.UpdateOptions;
 import com.mongodb.client.model.WriteModel;
+import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
+import com.mongodb.client.model.bulk.ClientBulkWriteResult;
+import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
 import com.mongodb.client.model.changestream.FullDocument;
 import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
 import com.mongodb.internal.TimeoutSettings;
@@ -293,6 +296,12 @@ public AsyncWriteOperation<BulkWriteResult> bulkWrite(final List<? extends Write
         return operations.bulkWrite(requests, options);
     }
 
+    public AsyncWriteOperation<ClientBulkWriteResult> clientBulkWriteOperation(
+            final List<? extends ClientNamespacedWriteModel> clientWriteModels,
+            @Nullable final ClientBulkWriteOptions options) {
+        return operations.clientBulkWriteOperation(clientWriteModels, options);
+    }
+
     public <TResult> AsyncReadOperation<TResult> commandRead(final Bson command, final Class<TResult> resultClass) {
         return operations.commandRead(command, resultClass);
     }
diff --git a/driver-core/src/main/com/mongodb/internal/operation/BatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/BatchCursor.java
index 5f86eb1f8fb..1463798ef64 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/BatchCursor.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/BatchCursor.java
@@ -25,6 +25,12 @@
 import java.util.Iterator;
 import java.util.List;
 
+import static java.util.Spliterator.IMMUTABLE;
+import static java.util.Spliterator.ORDERED;
+import static java.util.Spliterators.spliteratorUnknownSize;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.StreamSupport.stream;
+
 /**
- * MongoDB returns query results as batches, and this interface provideds an iterator over those batches.  The first call to
- * the {@code next} method will return the first batch, and subsequent calls will trigger a  request to get the next batch
+ * MongoDB returns query results as batches, and this interface provides an iterator over those batches.  The first call to
+ * the {@code next} method will return the first batch, and subsequent calls will trigger a request to get the next batch
@@ -98,4 +104,9 @@ public interface BatchCursor<T> extends Iterator<List<T>>, Closeable {
     ServerCursor getServerCursor();
 
     ServerAddress getServerAddress();
+
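+    /**
+     * Drains the remaining batches of this cursor into a list, one inner list per batch.
+     *
+     * @return all remaining batches
+     */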
+    default List<List<T>> exhaust() {
+        return stream(spliteratorUnknownSize(this, ORDERED | IMMUTABLE), false)
+                .collect(toList());
+    }
 }
diff --git a/driver-core/src/main/com/mongodb/internal/operation/ClientBulkWriteOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ClientBulkWriteOperation.java
index b48031c06c6..6592fcbaed3 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/ClientBulkWriteOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/ClientBulkWriteOperation.java
@@ -41,7 +41,14 @@
 import com.mongodb.connection.ConnectionDescription;
 import com.mongodb.internal.TimeoutContext;
 import com.mongodb.internal.VisibleForTesting;
+import com.mongodb.internal.async.AsyncBatchCursor;
+import com.mongodb.internal.async.AsyncSupplier;
+import com.mongodb.internal.async.MutableValue;
+import com.mongodb.internal.async.SingleResultCallback;
+import com.mongodb.internal.async.function.AsyncCallbackSupplier;
 import com.mongodb.internal.async.function.RetryState;
+import com.mongodb.internal.binding.AsyncConnectionSource;
+import com.mongodb.internal.binding.AsyncWriteBinding;
 import com.mongodb.internal.binding.ConnectionSource;
 import com.mongodb.internal.binding.WriteBinding;
 import com.mongodb.internal.client.model.bulk.AbstractClientDeleteModel;
@@ -70,6 +77,7 @@
 import com.mongodb.internal.client.model.bulk.ConcreteClientUpdateOptions;
 import com.mongodb.internal.client.model.bulk.ConcreteClientUpdateResult;
 import com.mongodb.internal.client.model.bulk.UnacknowledgedClientBulkWriteResult;
+import com.mongodb.internal.connection.AsyncConnection;
 import com.mongodb.internal.connection.Connection;
 import com.mongodb.internal.connection.DualMessageSequences;
 import com.mongodb.internal.connection.IdHoldingBsonWriter;
@@ -113,8 +121,12 @@
 import static com.mongodb.assertions.Assertions.fail;
 import static com.mongodb.internal.VisibleForTesting.AccessModifier.PACKAGE;
 import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
+import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
 import static com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker.WriteResult.FAIL_LIMIT_EXCEEDED;
 import static com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker.WriteResult.OK_LIMIT_NOT_REACHED;
+import static com.mongodb.internal.operation.AsyncOperationHelper.cursorDocumentToAsyncBatchCursor;
+import static com.mongodb.internal.operation.AsyncOperationHelper.decorateWriteWithRetriesAsync;
+import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncSourceAndConnection;
 import static com.mongodb.internal.operation.BulkWriteBatch.logWriteModelDoesNotSupportRetries;
 import static com.mongodb.internal.operation.CommandOperationHelper.commandWriteConcern;
 import static com.mongodb.internal.operation.CommandOperationHelper.initialRetryState;
@@ -128,22 +140,20 @@
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
 import static java.util.Optional.ofNullable;
-import static java.util.Spliterator.IMMUTABLE;
-import static java.util.Spliterator.ORDERED;
-import static java.util.Spliterators.spliteratorUnknownSize;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
-import static java.util.stream.StreamSupport.stream;
 
 /**
  * This class is not part of the public API and may be removed or changed at any time.
  */
-public final class ClientBulkWriteOperation implements WriteOperation<ClientBulkWriteResult> {
+public final class ClientBulkWriteOperation implements WriteOperation<ClientBulkWriteResult>, AsyncWriteOperation<ClientBulkWriteResult> {
     private static final ConcreteClientBulkWriteOptions EMPTY_OPTIONS = new ConcreteClientBulkWriteOptions();
     private static final String BULK_WRITE_COMMAND_NAME = "bulkWrite";
     private static final EncoderContext DEFAULT_ENCODER_CONTEXT = EncoderContext.builder().build();
     private static final EncoderContext COLLECTIBLE_DOCUMENT_ENCODER_CONTEXT = EncoderContext.builder()
             .isEncodingCollectibleDocument(true).build();
+    private static final int INITIAL_BATCH_MODEL_START_INDEX = 0;
+    private static final int SERVER_DEFAULT_CURSOR_BATCH_SIZE = 0;
 
     private final List<? extends ClientNamespacedWriteModel> models;
     private final ConcreteClientBulkWriteOptions options;
@@ -172,6 +182,7 @@ public ClientBulkWriteResult execute(final WriteBinding binding) throws ClientBu
         WriteConcern effectiveWriteConcern = validateAndGetEffectiveWriteConcern(binding.getOperationContext().getSessionContext());
         ResultAccumulator resultAccumulator = new ResultAccumulator();
         MongoException transformedTopLevelError = null;
+
         try {
             executeAllBatches(effectiveWriteConcern, binding, resultAccumulator);
         } catch (MongoException topLevelError) {
@@ -180,6 +191,24 @@ public ClientBulkWriteResult execute(final WriteBinding binding) throws ClientBu
         return resultAccumulator.build(transformedTopLevelError, effectiveWriteConcern);
     }
 
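+    /**
+     * @see #execute(WriteBinding)
+     */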
+    @Override
+    public void executeAsync(final AsyncWriteBinding binding,
+                             final SingleResultCallback<ClientBulkWriteResult> finalCallback) {
+        WriteConcern effectiveWriteConcern = validateAndGetEffectiveWriteConcern(binding.getOperationContext().getSessionContext());
+        ResultAccumulator resultAccumulator = new ResultAccumulator();
+        MutableValue<MongoException> transformedTopLevelError = new MutableValue<>();
+
+        beginAsync().<Void>thenSupply(c -> {
+            executeAllBatchesAsync(effectiveWriteConcern, binding, resultAccumulator, c);
+        }).onErrorIf(topLevelError -> topLevelError instanceof MongoException, (topLevelError, c) -> {
+            transformedTopLevelError.set(transformWriteException((MongoException) topLevelError));
+            c.complete(c);
+        }).<ClientBulkWriteResult>thenApply((ignored, c) -> {
+            c.complete(resultAccumulator.build(transformedTopLevelError.getNullable(), effectiveWriteConcern));
+        }).finish(finalCallback);
+    }
+
     /**
      * To execute a batch means:
      * <ul>
@@ -187,18 +216,40 @@ public ClientBulkWriteResult execute(final WriteBinding binding) throws ClientBu
      *     <li>consume the cursor, which may involve executing `getMore` commands.</li>
      * </ul>
      *
-     * @throws MongoException When a {@linkplain ClientBulkWriteException#getError() top-level error} happens.
+     * @throws MongoException When a {@linkplain ClientBulkWriteException#getCause() top-level error} happens.
      */
     private void executeAllBatches(
             final WriteConcern effectiveWriteConcern,
             final WriteBinding binding,
             final ResultAccumulator resultAccumulator) throws MongoException {
-        Integer nextBatchStartModelIndex = 0;
+        Integer nextBatchStartModelIndex = INITIAL_BATCH_MODEL_START_INDEX;
+
         do {
             nextBatchStartModelIndex = executeBatch(nextBatchStartModelIndex, effectiveWriteConcern, binding, resultAccumulator);
         } while (nextBatchStartModelIndex != null);
     }
 
+    /**
+     * @see #executeAllBatches(WriteConcern, WriteBinding, ResultAccumulator)
+     */
+    private void executeAllBatchesAsync(
+            final WriteConcern effectiveWriteConcern,
+            final AsyncWriteBinding binding,
+            final ResultAccumulator resultAccumulator,
+            final SingleResultCallback<Void> finalCallback) {
+        MutableValue<Integer> nextBatchStartModelIndex = new MutableValue<>(INITIAL_BATCH_MODEL_START_INDEX);
+
+        beginAsync().thenRunDoWhileLoop(iterationCallback -> {
+            beginAsync().<Integer>thenSupply(c -> {
+                executeBatchAsync(nextBatchStartModelIndex.get(), effectiveWriteConcern, binding, resultAccumulator, c);
+            }).<Void>thenApply((nextBatchStartModelIdx, c) -> {
+                nextBatchStartModelIndex.set(nextBatchStartModelIdx);
+                c.complete(c);
+            }).finish(iterationCallback);
+        }, () -> nextBatchStartModelIndex.getNullable() != null
+        ).finish(finalCallback);
+    }
+
     /**
      * @return The start model index of the next batch, provided that the operation
      * {@linkplain ExhaustiveClientBulkWriteCommandOkResponse#operationMayContinue(ConcreteClientBulkWriteOptions) may continue}
@@ -217,27 +268,30 @@ private Integer executeBatch(
         TimeoutContext timeoutContext = operationContext.getTimeoutContext();
         RetryState retryState = initialRetryState(retryWritesSetting, timeoutContext);
         BatchEncoder batchEncoder = new BatchEncoder();
+
         Supplier<ExhaustiveClientBulkWriteCommandOkResponse> retryingBatchExecutor = decorateWriteWithRetries(
                 retryState, operationContext,
                 // Each batch re-selects a server and re-checks out a connection because this is simpler,
                 // and it is allowed by https://jira.mongodb.org/browse/DRIVERS-2502.
-                // If connection pinning is required, {@code binding} handles that,
+                // If connection pinning is required, `binding` handles that,
                 // and `ClientSession`, `TransactionContext` are aware of that.
-                () -> withSourceAndConnection(binding::getWriteConnectionSource, true, (connectionSource, connection) -> {
-                    ConnectionDescription connectionDescription = connection.getDescription();
-                    boolean effectiveRetryWrites = isRetryableWrite(
-                            retryWritesSetting, effectiveWriteConcern, connectionDescription, sessionContext);
-                    retryState.breakAndThrowIfRetryAnd(() -> !effectiveRetryWrites);
-                    resultAccumulator.onNewServerAddress(connectionDescription.getServerAddress());
-                    retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true)
-                            .attach(AttachmentKeys.commandDescriptionSupplier(), () -> BULK_WRITE_COMMAND_NAME, false);
-                    ClientBulkWriteCommand bulkWriteCommand = createBulkWriteCommand(
-                            retryState, effectiveRetryWrites, effectiveWriteConcern, sessionContext, unexecutedModels, batchEncoder,
-                            () -> retryState.attach(AttachmentKeys.retryableCommandFlag(), true, true));
-                    return executeBulkWriteCommandAndExhaustOkResponse(
-                            retryState, connectionSource, connection, bulkWriteCommand, effectiveWriteConcern, operationContext);
-                })
+                () -> withSourceAndConnection(binding::getWriteConnectionSource, true,
+                        (connectionSource, connection) -> {
+                            ConnectionDescription connectionDescription = connection.getDescription();
+                            boolean effectiveRetryWrites = isRetryableWrite(
+                                    retryWritesSetting, effectiveWriteConcern, connectionDescription, sessionContext);
+                            retryState.breakAndThrowIfRetryAnd(() -> !effectiveRetryWrites);
+                            resultAccumulator.onNewServerAddress(connectionDescription.getServerAddress());
+                            retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true)
+                                    .attach(AttachmentKeys.commandDescriptionSupplier(), () -> BULK_WRITE_COMMAND_NAME, false);
+                            ClientBulkWriteCommand bulkWriteCommand = createBulkWriteCommand(
+                                    retryState, effectiveRetryWrites, effectiveWriteConcern, sessionContext, unexecutedModels, batchEncoder,
+                                    () -> retryState.attach(AttachmentKeys.retryableCommandFlag(), true, true));
+                            return executeBulkWriteCommandAndExhaustOkResponse(
+                                    retryState, connectionSource, connection, bulkWriteCommand, effectiveWriteConcern, operationContext);
+                        })
         );
+
         try {
             ExhaustiveClientBulkWriteCommandOkResponse bulkWriteCommandOkResponse = retryingBatchExecutor.get();
             return resultAccumulator.onBulkWriteCommandOkResponseOrNoResponse(
@@ -248,16 +302,84 @@ private Integer executeBatch(
         } catch (MongoCommandException bulkWriteCommandException) {
             resultAccumulator.onBulkWriteCommandErrorResponse(bulkWriteCommandException);
             throw bulkWriteCommandException;
-        } catch (MongoException e) {
+        } catch (MongoException mongoException) {
             // The server does not have a chance to add "RetryableWriteError" label to `e`,
             // and if it is the last attempt failure, `RetryingSyncSupplier` also may not have a chance
             // to add the label. So we do that explicitly.
-            shouldAttemptToRetryWriteAndAddRetryableLabel(retryState, e);
-            resultAccumulator.onBulkWriteCommandErrorWithoutResponse(e);
-            throw e;
+            shouldAttemptToRetryWriteAndAddRetryableLabel(retryState, mongoException);
+            resultAccumulator.onBulkWriteCommandErrorWithoutResponse(mongoException);
+            throw mongoException;
         }
     }
 
+    /**
+     * @see #executeBatch(int, WriteConcern, WriteBinding, ResultAccumulator)
+     */
+    private void executeBatchAsync(
+            final int batchStartModelIndex,
+            final WriteConcern effectiveWriteConcern,
+            final AsyncWriteBinding binding,
+            final ResultAccumulator resultAccumulator,
+            final SingleResultCallback<Integer> finalCallback) {
+        List<? extends ClientNamespacedWriteModel> unexecutedModels = models.subList(batchStartModelIndex, models.size());
+        assertFalse(unexecutedModels.isEmpty());
+        OperationContext operationContext = binding.getOperationContext();
+        SessionContext sessionContext = operationContext.getSessionContext();
+        TimeoutContext timeoutContext = operationContext.getTimeoutContext();
+        RetryState retryState = initialRetryState(retryWritesSetting, timeoutContext);
+        BatchEncoder batchEncoder = new BatchEncoder();
+
+        AsyncCallbackSupplier<ExhaustiveClientBulkWriteCommandOkResponse> retryingBatchExecutor = decorateWriteWithRetriesAsync(
+                retryState, operationContext,
+                // Each batch re-selects a server and re-checks out a connection because this is simpler,
+                // and it is allowed by https://jira.mongodb.org/browse/DRIVERS-2502.
+                // If connection pinning is required, `binding` handles that,
+                // and `ClientSession`, `TransactionContext` are aware of that.
+                funcCallback -> withAsyncSourceAndConnection(binding::getWriteConnectionSource, true, funcCallback,
+                        (connectionSource, connection, resultCallback) -> {
+                            ConnectionDescription connectionDescription = connection.getDescription();
+                            boolean effectiveRetryWrites = isRetryableWrite(
+                                    retryWritesSetting, effectiveWriteConcern, connectionDescription, sessionContext);
+                            retryState.breakAndThrowIfRetryAnd(() -> !effectiveRetryWrites);
+                            resultAccumulator.onNewServerAddress(connectionDescription.getServerAddress());
+                            retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true)
+                                    .attach(AttachmentKeys.commandDescriptionSupplier(), () -> BULK_WRITE_COMMAND_NAME, false);
+                            ClientBulkWriteCommand bulkWriteCommand = createBulkWriteCommand(
+                                    retryState, effectiveRetryWrites, effectiveWriteConcern, sessionContext, unexecutedModels, batchEncoder,
+                                    () -> retryState.attach(AttachmentKeys.retryableCommandFlag(), true, true));
+                            executeBulkWriteCommandAndExhaustOkResponseAsync(
+                                    retryState, connectionSource, connection, bulkWriteCommand, effectiveWriteConcern, operationContext, resultCallback);
+                        })
+        );
+
+        beginAsync().<ExhaustiveClientBulkWriteCommandOkResponse>thenSupply(callback -> {
+            retryingBatchExecutor.get(callback);
+        }).<Integer>thenApply((bulkWriteCommandOkResponse, callback) -> {
+            callback.complete(resultAccumulator.onBulkWriteCommandOkResponseOrNoResponse(
+                    batchStartModelIndex, bulkWriteCommandOkResponse, batchEncoder.intoEncodedBatchInfo()));
+        }).onErrorIf(throwable -> true, (t, callback) -> {
+            if (t instanceof MongoWriteConcernWithResponseException) {
+                MongoWriteConcernWithResponseException mongoWriteConcernWithOkResponseException = (MongoWriteConcernWithResponseException) t;
+                callback.complete(resultAccumulator.onBulkWriteCommandOkResponseWithWriteConcernError(
+                        batchStartModelIndex, mongoWriteConcernWithOkResponseException, batchEncoder.intoEncodedBatchInfo()));
+            } else if (t instanceof MongoCommandException) {
+                MongoCommandException bulkWriteCommandException = (MongoCommandException) t;
+                resultAccumulator.onBulkWriteCommandErrorResponse(bulkWriteCommandException);
+                callback.completeExceptionally(t);
+            } else if (t instanceof MongoException) {
+                MongoException mongoException = (MongoException) t;
+                // The server does not have a chance to add "RetryableWriteError" label to `mongoException`,
+                // and if it is the last attempt failure, `RetryingAsyncCallbackSupplier` also may not have a chance
+                // to add the label. So we do that explicitly.
+                shouldAttemptToRetryWriteAndAddRetryableLabel(retryState, mongoException);
+                resultAccumulator.onBulkWriteCommandErrorWithoutResponse(mongoException);
+                callback.completeExceptionally(mongoException);
+            } else {
+                callback.completeExceptionally(t);
+            }
+        }).finish(finalCallback);
+    }
+
     /**
      * @throws MongoWriteConcernWithResponseException This internal exception must be handled to avoid it being observed by an application.
      * It {@linkplain MongoWriteConcernWithResponseException#getResponse() bears} the OK response to the {@code bulkWriteCommand},
@@ -287,11 +409,61 @@ private ExhaustiveClientBulkWriteCommandOkResponse executeBulkWriteCommandAndExh
         }
         List<List<BsonDocument>> cursorExhaustBatches = doWithRetriesDisabledForCommand(retryState, "getMore", () ->
                 exhaustBulkWriteCommandOkResponseCursor(connectionSource, connection, bulkWriteCommandOkResponse));
-        ExhaustiveClientBulkWriteCommandOkResponse exhaustiveBulkWriteCommandOkResponse = new ExhaustiveClientBulkWriteCommandOkResponse(
-                bulkWriteCommandOkResponse, cursorExhaustBatches);
+        return createExhaustiveClientBulkWriteCommandOkResponse(
+                bulkWriteCommandOkResponse,
+                cursorExhaustBatches,
+                connection.getDescription());
+    }
+
+    /**
+     * @see #executeBulkWriteCommandAndExhaustOkResponse(RetryState, ConnectionSource, Connection, ClientBulkWriteCommand, WriteConcern, OperationContext)
+     */
+    private void executeBulkWriteCommandAndExhaustOkResponseAsync(
+            final RetryState retryState,
+            final AsyncConnectionSource connectionSource,
+            final AsyncConnection connection,
+            final ClientBulkWriteCommand bulkWriteCommand,
+            final WriteConcern effectiveWriteConcern,
+            final OperationContext operationContext,
+            final SingleResultCallback<ExhaustiveClientBulkWriteCommandOkResponse> finalCallback) {
+        beginAsync().<BsonDocument>thenSupply(callback -> {
+            connection.commandAsync(
+                    "admin",
+                    bulkWriteCommand.getCommandDocument(),
+                    NoOpFieldNameValidator.INSTANCE,
+                    null,
+                    CommandResultDocumentCodec.create(codecRegistry.get(BsonDocument.class), CommandBatchCursorHelper.FIRST_BATCH),
+                    operationContext,
+                    effectiveWriteConcern.isAcknowledged(),
+                    bulkWriteCommand.getOpsAndNsInfo(), callback);
+        }).<ExhaustiveClientBulkWriteCommandOkResponse>thenApply((bulkWriteCommandOkResponse, callback) -> {
+            if (bulkWriteCommandOkResponse == null) {
+                callback.complete((ExhaustiveClientBulkWriteCommandOkResponse) null);
+                return;
+            }
+            beginAsync().<List<List<BsonDocument>>>thenSupply(c -> {
+                doWithRetriesDisabledForCommandAsync(retryState, "getMore", (c1) -> {
+                    exhaustBulkWriteCommandOkResponseCursorAsync(connectionSource, connection, bulkWriteCommandOkResponse, c1);
+                }, c);
+            }).<ExhaustiveClientBulkWriteCommandOkResponse>thenApply((cursorExhaustBatches, c) -> {
+                c.complete(createExhaustiveClientBulkWriteCommandOkResponse(
+                        bulkWriteCommandOkResponse,
+                        cursorExhaustBatches,
+                        connection.getDescription()));
+            }).finish(callback);
+        }).finish(finalCallback);
+    }
+
+    private static ExhaustiveClientBulkWriteCommandOkResponse createExhaustiveClientBulkWriteCommandOkResponse(
+            final BsonDocument bulkWriteCommandOkResponse, final List<List<BsonDocument>> cursorExhaustBatches,
+            final ConnectionDescription connectionDescription) {
+        ExhaustiveClientBulkWriteCommandOkResponse exhaustiveBulkWriteCommandOkResponse =
+                new ExhaustiveClientBulkWriteCommandOkResponse(
+                        bulkWriteCommandOkResponse, cursorExhaustBatches);
+
         // `Connection.command` does not throw `MongoWriteConcernException`, so we have to construct it ourselves
         MongoWriteConcernException writeConcernException = Exceptions.createWriteConcernException(
-                bulkWriteCommandOkResponse, connection.getDescription().getServerAddress());
+                bulkWriteCommandOkResponse, connectionDescription.getServerAddress());
         if (writeConcernException != null) {
             throw new MongoWriteConcernWithResponseException(writeConcernException, exhaustiveBulkWriteCommandOkResponse);
         }
@@ -305,6 +477,7 @@ private <R> R doWithRetriesDisabledForCommand(
         Optional<Boolean> originalRetryableCommandFlag = retryState.attachment(AttachmentKeys.retryableCommandFlag());
         Supplier<String> originalCommandDescriptionSupplier = retryState.attachment(AttachmentKeys.commandDescriptionSupplier())
                 .orElseThrow(Assertions::fail);
+
         try {
             retryState.attach(AttachmentKeys.retryableCommandFlag(), false, true)
                     .attach(AttachmentKeys.commandDescriptionSupplier(), () -> commandDescription, false);
@@ -315,23 +488,63 @@ private <R> R doWithRetriesDisabledForCommand(
         }
     }
 
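+    /**
+     * @see #doWithRetriesDisabledForCommand(RetryState, String, Supplier)
+     */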
+    private <R> void doWithRetriesDisabledForCommandAsync(
+            final RetryState retryState,
+            final String commandDescription,
+            final AsyncSupplier<R> actionWithCommand,
+            final SingleResultCallback<R> finalCallback) {
+        Optional<Boolean> originalRetryableCommandFlag = retryState.attachment(AttachmentKeys.retryableCommandFlag());
+        Supplier<String> originalCommandDescriptionSupplier = retryState.attachment(AttachmentKeys.commandDescriptionSupplier())
+                .orElseThrow(Assertions::fail);
+
+        beginAsync().<R>thenSupply(c -> {
+            retryState.attach(AttachmentKeys.retryableCommandFlag(), false, true)
+                    .attach(AttachmentKeys.commandDescriptionSupplier(), () -> commandDescription, false);
+            actionWithCommand.finish(c);
+        }).thenAlwaysRunAndFinish(() -> {
+            originalRetryableCommandFlag.ifPresent(value -> retryState.attach(AttachmentKeys.retryableCommandFlag(), value, true));
+            retryState.attach(AttachmentKeys.commandDescriptionSupplier(), originalCommandDescriptionSupplier, false);
+        }, finalCallback);
+    }
+
     private List<List<BsonDocument>> exhaustBulkWriteCommandOkResponseCursor(
             final ConnectionSource connectionSource,
             final Connection connection,
             final BsonDocument response) {
-        int serverDefaultCursorBatchSize = 0;
         try (CommandBatchCursor<BsonDocument> cursor = cursorDocumentToBatchCursor(
                 TimeoutMode.CURSOR_LIFETIME,
                 response,
-                serverDefaultCursorBatchSize,
+                SERVER_DEFAULT_CURSOR_BATCH_SIZE,
                 codecRegistry.get(BsonDocument.class),
                 options.getComment().orElse(null),
                 connectionSource,
                 connection)) {
-            return stream(spliteratorUnknownSize(cursor, ORDERED | IMMUTABLE), false).collect(toList());
+            return cursor.exhaust();
         }
     }
 
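+    /**
+     * @see #exhaustBulkWriteCommandOkResponseCursor(ConnectionSource, Connection, BsonDocument)
+     */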
+    private void exhaustBulkWriteCommandOkResponseCursorAsync(final AsyncConnectionSource connectionSource,
+                                                              final AsyncConnection connection,
+                                                              final BsonDocument bulkWriteCommandOkResponse,
+                                                              final SingleResultCallback<List<List<BsonDocument>>> finalCallback) {
+        AsyncBatchCursor<BsonDocument> cursor = cursorDocumentToAsyncBatchCursor(
+                TimeoutMode.CURSOR_LIFETIME,
+                bulkWriteCommandOkResponse,
+                SERVER_DEFAULT_CURSOR_BATCH_SIZE,
+                codecRegistry.get(BsonDocument.class),
+                options.getComment().orElse(null),
+                connectionSource,
+                connection);
+
+        beginAsync().<List<List<BsonDocument>>>thenSupply(callback -> {
+            cursor.exhaust(callback);
+        }).thenAlwaysRunAndFinish(() -> {
+            cursor.close();
+        }, finalCallback);
+    }
+
     private ClientBulkWriteCommand createBulkWriteCommand(
             final RetryState retryState,
             final boolean effectiveRetryWrites,
diff --git a/driver-core/src/main/com/mongodb/internal/time/TimePoint.java b/driver-core/src/main/com/mongodb/internal/time/TimePoint.java
index d0b95970511..811065d13a6 100644
--- a/driver-core/src/main/com/mongodb/internal/time/TimePoint.java
+++ b/driver-core/src/main/com/mongodb/internal/time/TimePoint.java
@@ -28,6 +28,7 @@
 
 import static com.mongodb.assertions.Assertions.assertNotNull;
 import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 /**
@@ -234,7 +235,7 @@ public int hashCode() {
     public String toString() {
         String remainingMs = isInfinite()
                 ? "infinite"
-                : "" + TimeUnit.MILLISECONDS.convert(currentNanos() - assertNotNull(nanos), NANOSECONDS);
+                : "" + remaining(MILLISECONDS);
         return "TimePoint{"
                 + "nanos=" + nanos
                 + ", remainingMs=" + remainingMs
diff --git a/driver-core/src/test/functional/com/mongodb/ClusterFixture.java b/driver-core/src/test/functional/com/mongodb/ClusterFixture.java
index a889856f394..dde9682de8d 100644
--- a/driver-core/src/test/functional/com/mongodb/ClusterFixture.java
+++ b/driver-core/src/test/functional/com/mongodb/ClusterFixture.java
@@ -125,7 +125,7 @@ public final class ClusterFixture {
     private static final String MONGODB_OCSP_SHOULD_SUCCEED = "org.mongodb.test.ocsp.tls.should.succeed";
     private static final String DEFAULT_DATABASE_NAME = "JavaDriverTest";
     private static final int COMMAND_NOT_FOUND_ERROR_CODE = 59;
-    public static final long TIMEOUT = 60L;
+    public static final long TIMEOUT = 120L;
     public static final Duration TIMEOUT_DURATION = Duration.ofSeconds(TIMEOUT);
 
     public static final TimeoutSettings TIMEOUT_SETTINGS = new TimeoutSettings(30_000, 10_000, 0, null, SECONDS.toMillis(5));
diff --git a/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java b/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java
index adce165ee51..3e58712ca9c 100644
--- a/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java
+++ b/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java
@@ -357,9 +357,17 @@ public void replaceOne(final Bson filter, final Bson update, final boolean isUps
     }
 
     public void deleteOne(final Bson filter) {
+        delete(filter, false);
+    }
+
+    public void deleteMany(final Bson filter) {
+        delete(filter, true);
+    }
+
+    private void delete(final Bson filter, final boolean multi) {
         new MixedBulkWriteOperation(namespace,
-                                    singletonList(new DeleteRequest(filter.toBsonDocument(Document.class, registry))),
-                                    true, WriteConcern.ACKNOWLEDGED, false)
+                singletonList(new DeleteRequest(filter.toBsonDocument(Document.class, registry)).multi(multi)),
+                true, WriteConcern.ACKNOWLEDGED, false)
                 .execute(getBinding());
     }
 
diff --git a/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandBatchCursorFunctionalTest.java b/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandBatchCursorFunctionalTest.java
index a272f8b0f67..88dc199ee29 100644
--- a/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandBatchCursorFunctionalTest.java
+++ b/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandBatchCursorFunctionalTest.java
@@ -21,8 +21,10 @@
 import com.mongodb.MongoQueryException;
 import com.mongodb.ReadPreference;
 import com.mongodb.ServerCursor;
+import com.mongodb.async.FutureResultCallback;
 import com.mongodb.client.cursor.TimeoutMode;
 import com.mongodb.client.model.CreateCollectionOptions;
+import com.mongodb.client.model.Filters;
 import com.mongodb.client.model.OperationTest;
 import com.mongodb.internal.binding.AsyncConnectionSource;
 import com.mongodb.internal.connection.AsyncConnection;
@@ -103,6 +105,69 @@ void cleanup() {
         });
     }
 
+    @Test
+    @DisplayName("should exhaust cursor with multiple batches")
+    void shouldExhaustCursorAsyncWithMultipleBatches() {
+        // given
+        BsonDocument commandResult = executeFindCommand(0, 3); // Fetch in batches of size 3
+        cursor = new AsyncCommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER,
+                null, connectionSource, connection);
+
+        // when
+        FutureResultCallback<List<List<Document>>> futureCallback = new FutureResultCallback<>();
+        cursor.exhaust(futureCallback);
+
+        // then
+        List<List<Document>> resultBatches = futureCallback.get(5, TimeUnit.SECONDS);
+
+        assertTrue(cursor.isClosed(), "Expected cursor to be closed.");
+        assertEquals(4, resultBatches.size(), "Expected 4 batches for 10 documents with batch size of 3.");
+
+        int totalDocuments = resultBatches.stream().mapToInt(List::size).sum();
+        assertEquals(10, totalDocuments, "Expected a total of 10 documents.");
+    }
+
+    @Test
+    @DisplayName("should exhaust cursor with closed cursor")
+    void shouldExhaustCursorAsyncWithClosedCursor() {
+        // given
+        BsonDocument commandResult = executeFindCommand(0, 3);
+        cursor = new AsyncCommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER,
+                null, connectionSource, connection);
+
+        cursor.close();
+
+        // when
+        FutureResultCallback<List<List<Document>>> futureCallback = new FutureResultCallback<>();
+        cursor.exhaust(futureCallback);
+
+        // then
+        IllegalStateException illegalStateException = assertThrows(IllegalStateException.class, () -> {
+            futureCallback.get(5, TimeUnit.SECONDS);
+        }, "Expected an exception when operating on a closed cursor.");
+        assertEquals("Cursor has been closed", illegalStateException.getMessage());
+    }
+
+    @Test
+    @DisplayName("should exhaust cursor with empty cursor")
+    void shouldExhaustCursorAsyncWithEmptyCursor() {
+        // given
+        getCollectionHelper().deleteMany(Filters.empty());
+
+        BsonDocument commandResult = executeFindCommand(0, 3); // No documents to fetch
+        cursor = new AsyncCommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER,
+                null, connectionSource, connection);
+
+        // when
+        FutureResultCallback<List<List<Document>>> futureCallback = new FutureResultCallback<>();
+        cursor.exhaust(futureCallback);
+
+        // then
+        List<List<Document>> resultBatches = futureCallback.get(5, TimeUnit.SECONDS);
+        assertTrue(resultBatches.isEmpty(), "Expected no batches for an empty cursor.");
+        assertTrue(cursor.isClosed(), "Expected cursor to be closed.");
+    }
+
     @Test
     @DisplayName("server cursor should not be null")
     void theServerCursorShouldNotBeNull() {
diff --git a/driver-core/src/test/functional/com/mongodb/internal/operation/CommandBatchCursorFunctionalTest.java b/driver-core/src/test/functional/com/mongodb/internal/operation/CommandBatchCursorFunctionalTest.java
index 57caf3bdbfc..d9861c71659 100644
--- a/driver-core/src/test/functional/com/mongodb/internal/operation/CommandBatchCursorFunctionalTest.java
+++ b/driver-core/src/test/functional/com/mongodb/internal/operation/CommandBatchCursorFunctionalTest.java
@@ -22,6 +22,7 @@
 import com.mongodb.ServerCursor;
 import com.mongodb.client.cursor.TimeoutMode;
 import com.mongodb.client.model.CreateCollectionOptions;
+import com.mongodb.client.model.Filters;
 import com.mongodb.client.model.OperationTest;
 import com.mongodb.internal.binding.ConnectionSource;
 import com.mongodb.internal.connection.Connection;
@@ -101,6 +102,55 @@ void cleanup() {
         });
     }
 
+    @Test
+    @DisplayName("should exhaust cursor with multiple batches")
+    void shouldExhaustCursorWithMultipleBatches() {
+        // given
+        BsonDocument commandResult = executeFindCommand(0, 3); // Fetch in batches of size 3
+        cursor = new CommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER,
+                null, connectionSource, connection);
+
+        // when
+        List<List<Document>> result = cursor.exhaust();
+
+        // then
+        assertEquals(4, result.size(), "Expected 4 batches for 10 documents with batch size of 3.");
+
+        int totalDocuments = result.stream().mapToInt(List::size).sum();
+        assertEquals(10, totalDocuments, "Expected a total of 10 documents.");
+    }
+
+    @Test
+    @DisplayName("should exhaust cursor with closed cursor")
+    void shouldExhaustCursorWithClosedCursor() {
+        // given
+        BsonDocument commandResult = executeFindCommand(0, 3);
+        cursor = new CommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER,
+                null, connectionSource, connection);
+        cursor.close();
+
+        // when & then
+        IllegalStateException illegalStateException = assertThrows(IllegalStateException.class, cursor::exhaust);
+        assertEquals("Cursor has been closed", illegalStateException.getMessage());
+    }
+
+    @Test
+    @DisplayName("should exhaust cursor with empty cursor")
+    void shouldExhaustCursorWithEmptyCursor() {
+        // given
+        getCollectionHelper().deleteMany(Filters.empty());
+
+        BsonDocument commandResult = executeFindCommand(0, 3); // No documents to fetch
+        cursor = new CommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER,
+                null, connectionSource, connection);
+
+        // when
+        List<List<Document>> result = cursor.exhaust();
+
+        // then
+        assertTrue(result.isEmpty(), "Expected no batches for an empty cursor.");
+    }
+
     @Test
     @DisplayName("server cursor should not be null")
     void theServerCursorShouldNotBeNull() {
diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java
index 65636e2f842..9a9b7552d3e 100644
--- a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java
+++ b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java
@@ -748,6 +748,26 @@ void testRetryLoop() {
                 });
     }
 
+    @Test
+    void testDoWhileLoop() {
+        assertBehavesSameVariations(67,
+                () -> {
+                    do {
+                        plain(0);
+                        sync(1);
+                    } while (plainTest(2));
+                },
+                (finalCallback) -> {
+                    beginAsync().thenRunDoWhileLoop(
+                          callback -> {
+                              plain(0);
+                              async(1, callback);
+                          },
+                          () -> plainTest(2)
+                    ).finish(finalCallback);
+                });
+    }
+
     @Test
     void testFinallyWithPlainInsideTry() {
         // (in try: normal flow + exception + exception) * (in finally: normal + exception) = 6
@@ -793,6 +813,51 @@ void testFinallyWithPlainOutsideTry() {
                 });
     }
 
+    @Test
+    void testSupplyFinallyWithPlainInsideTry() {
+        assertBehavesSameVariations(6,
+                () -> {
+                    try {
+                        plain(1);
+                        return syncReturns(2);
+                    } finally {
+                        plain(3);
+                    }
+                },
+                (callback) -> {
+                    beginAsync().<Integer>thenSupply(c -> {
+                        plain(1);
+                        asyncReturns(2, c);
+                    }).thenAlwaysRunAndFinish(() -> {
+                        plain(3);
+                    }, callback);
+                });
+    }
+
+    @Test
+    void testSupplyFinallyWithPlainOutsideTry() {
+        assertBehavesSameVariations(5,
+                () -> {
+                    plain(1);
+                    try {
+                        return syncReturns(2);
+                    } finally {
+                        plain(3);
+                    }
+                },
+                (callback) -> {
+                    beginAsync().<Integer>thenSupply(c -> {
+                        plain(1);
+                        beginAsync().<Integer>thenSupply(c2 -> {
+                            asyncReturns(2, c2);
+                        }).thenAlwaysRunAndFinish(() -> {
+                            plain(3);
+                        }, c);
+                    }).finish(callback);
+                });
+    }
+
     @Test
     void testUsedAsLambda() {
         assertBehavesSameVariations(4,
diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTestBase.java b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTestBase.java
index 1229dbcfcad..10a58152d9f 100644
--- a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTestBase.java
+++ b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTestBase.java
@@ -256,8 +256,9 @@ private <T> void assertBehavesSame(final Supplier<T> sync, final Runnable betwee
         await(wasCalledFuture, "Callback should have been called");
 
         // The following code can be used to debug variations:
-//        System.out.println("===VARIATION START");
+//        System.out.println("===VARIATION START: " + invocationTracker.getVariationCount());
 //        System.out.println("sync: " + expectedEvents);
+//        System.out.println("sync size: " + expectedEvents.size());
 //        System.out.println("callback called?: " + wasCalledFuture.isDone());
 //        System.out.println("value -- sync: " + expectedValue + " -- async: " + actualValue.get());
 //        System.out.println("excep -- sync: " + expectedException + " -- async: " + actualException.get());
diff --git a/driver-kotlin-coroutine/src/integration/kotlin/com/mongodb/kotlin/client/coroutine/syncadapter/SyncMongoCluster.kt b/driver-kotlin-coroutine/src/integration/kotlin/com/mongodb/kotlin/client/coroutine/syncadapter/SyncMongoCluster.kt
index 4fcb4a8852a..2c377e41d41 100644
--- a/driver-kotlin-coroutine/src/integration/kotlin/com/mongodb/kotlin/client/coroutine/syncadapter/SyncMongoCluster.kt
+++ b/driver-kotlin-coroutine/src/integration/kotlin/com/mongodb/kotlin/client/coroutine/syncadapter/SyncMongoCluster.kt
@@ -115,24 +115,27 @@ internal open class SyncMongoCluster(open val wrapped: MongoCluster) : JMongoClu
         SyncChangeStreamIterable(wrapped.watch(clientSession.unwrapped(), pipeline, resultClass))
 
     override fun bulkWrite(models: MutableList<out ClientNamespacedWriteModel>): ClientBulkWriteResult {
-        org.junit.jupiter.api.Assumptions.assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement")
-        TODO("BULK-TODO implement")
+        org.junit.jupiter.api.Assumptions.assumeTrue(
+            java.lang.Boolean.parseBoolean(toString()), "BULK-TODO Kotlin implement")
+        TODO("BULK-TODO Kotlin implement")
     }
 
     override fun bulkWrite(
         models: MutableList<out ClientNamespacedWriteModel>,
         options: ClientBulkWriteOptions
     ): ClientBulkWriteResult {
-        org.junit.jupiter.api.Assumptions.assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement")
-        TODO("BULK-TODO implement")
+        org.junit.jupiter.api.Assumptions.assumeTrue(
+            java.lang.Boolean.parseBoolean(toString()), "BULK-TODO Kotlin implement")
+        TODO("BULK-TODO Kotlin implement")
     }
 
     override fun bulkWrite(
         clientSession: ClientSession,
         models: MutableList<out ClientNamespacedWriteModel>
     ): ClientBulkWriteResult {
-        org.junit.jupiter.api.Assumptions.assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement")
-        TODO("BULK-TODO implement")
+        org.junit.jupiter.api.Assumptions.assumeTrue(
+            java.lang.Boolean.parseBoolean(toString()), "BULK-TODO Kotlin implement")
+        TODO("BULK-TODO Kotlin implement")
     }
 
     override fun bulkWrite(
@@ -140,8 +143,9 @@ internal open class SyncMongoCluster(open val wrapped: MongoCluster) : JMongoClu
         models: MutableList<out ClientNamespacedWriteModel>,
         options: ClientBulkWriteOptions
     ): ClientBulkWriteResult {
-        org.junit.jupiter.api.Assumptions.assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement")
-        TODO("BULK-TODO implement")
+        org.junit.jupiter.api.Assumptions.assumeTrue(
+            java.lang.Boolean.parseBoolean(toString()), "BULK-TODO Kotlin implement")
+        TODO("BULK-TODO Kotlin implement")
     }
 
     private fun ClientSession.unwrapped() = (this as SyncClientSession).wrapped
diff --git a/driver-kotlin-sync/src/integration/kotlin/com/mongodb/kotlin/client/syncadapter/SyncMongoCluster.kt b/driver-kotlin-sync/src/integration/kotlin/com/mongodb/kotlin/client/syncadapter/SyncMongoCluster.kt
index b7235d80479..a4ad9bd1418 100644
--- a/driver-kotlin-sync/src/integration/kotlin/com/mongodb/kotlin/client/syncadapter/SyncMongoCluster.kt
+++ b/driver-kotlin-sync/src/integration/kotlin/com/mongodb/kotlin/client/syncadapter/SyncMongoCluster.kt
@@ -114,24 +114,27 @@ internal open class SyncMongoCluster(open val wrapped: MongoCluster) : JMongoClu
         SyncChangeStreamIterable(wrapped.watch(clientSession.unwrapped(), pipeline, resultClass))
 
     override fun bulkWrite(models: MutableList<out ClientNamespacedWriteModel>): ClientBulkWriteResult {
-        org.junit.jupiter.api.Assumptions.assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement")
-        TODO("BULK-TODO implement")
+        org.junit.jupiter.api.Assumptions.assumeTrue(
+            java.lang.Boolean.parseBoolean(toString()), "BULK-TODO Kotlin implement")
+        TODO("BULK-TODO Kotlin implement")
     }
 
     override fun bulkWrite(
         models: MutableList<out ClientNamespacedWriteModel>,
         options: ClientBulkWriteOptions
     ): ClientBulkWriteResult {
-        org.junit.jupiter.api.Assumptions.assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement")
-        TODO("BULK-TODO implement")
+        org.junit.jupiter.api.Assumptions.assumeTrue(
+            java.lang.Boolean.parseBoolean(toString()), "BULK-TODO Kotlin implement")
+        TODO("BULK-TODO Kotlin implement")
     }
 
     override fun bulkWrite(
         clientSession: ClientSession,
         models: MutableList<out ClientNamespacedWriteModel>
     ): ClientBulkWriteResult {
-        org.junit.jupiter.api.Assumptions.assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement")
-        TODO("BULK-TODO implement")
+        org.junit.jupiter.api.Assumptions.assumeTrue(
+            java.lang.Boolean.parseBoolean(toString()), "BULK-TODO Kotlin implement")
+        TODO("BULK-TODO Kotlin implement")
     }
 
     override fun bulkWrite(
@@ -139,8 +142,9 @@ internal open class SyncMongoCluster(open val wrapped: MongoCluster) : JMongoClu
         models: MutableList<out ClientNamespacedWriteModel>,
         options: ClientBulkWriteOptions
     ): ClientBulkWriteResult {
-        org.junit.jupiter.api.Assumptions.assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement")
-        TODO("BULK-TODO implement")
+        org.junit.jupiter.api.Assumptions.assumeTrue(
+            java.lang.Boolean.parseBoolean(toString()), "BULK-TODO Kotlin implement")
+        TODO("BULK-TODO Kotlin implement")
     }
 
     private fun ClientSession.unwrapped() = (this as SyncClientSession).wrapped
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoCluster.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoCluster.java
index ef7c0ddb79d..edcc8f29408 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoCluster.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoCluster.java
@@ -16,7 +16,10 @@
 
 package com.mongodb.reactivestreams.client;
 
+import com.mongodb.ClientBulkWriteException;
 import com.mongodb.ClientSessionOptions;
+import com.mongodb.MongoClientSettings;
+import com.mongodb.MongoException;
 import com.mongodb.MongoNamespace;
 import com.mongodb.ReadConcern;
 import com.mongodb.ReadPreference;
@@ -24,6 +27,11 @@
 import com.mongodb.annotations.Alpha;
 import com.mongodb.annotations.Immutable;
 import com.mongodb.annotations.Reason;
+import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
+import com.mongodb.client.model.bulk.ClientBulkWriteResult;
+import com.mongodb.client.model.bulk.ClientNamespacedDeleteManyModel;
+import com.mongodb.client.model.bulk.ClientNamespacedUpdateManyModel;
+import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
 import com.mongodb.lang.Nullable;
 import org.bson.Document;
 import org.bson.codecs.configuration.CodecRegistry;
@@ -353,4 +361,135 @@ public interface MongoCluster {
      * @mongodb.driver.dochub core/changestreams Change Streams
      */
     <TResult> ChangeStreamPublisher<TResult> watch(ClientSession clientSession, List<? extends Bson> pipeline, Class<TResult> resultClass);
+
+    /**
+     * Executes a client-level bulk write operation.
+     * This method is functionally equivalent to {@link #bulkWrite(List, ClientBulkWriteOptions)}
+     * with the {@linkplain ClientBulkWriteOptions#clientBulkWriteOptions() default options}.
+     * <p>
+     * This operation supports {@linkplain MongoClientSettings#getRetryWrites() retryable writes}.
+     * Depending on the number of {@code models}, encoded size of {@code models}, and the size limits in effect,
+     * executing this operation may require multiple {@code bulkWrite} commands.
+     * Retry eligibility is determined for each {@code bulkWrite} command individually:
+     * a command that contains {@link ClientNamespacedUpdateManyModel} or {@link ClientNamespacedDeleteManyModel} is not retryable.</p>
+     * <p>
+     * This operation is not supported by MongoDB Atlas Serverless instances.</p>
+     *
+     * @param models The {@linkplain ClientNamespacedWriteModel individual write operations}.
+     * @return The {@link Publisher} signalling at most one {@link ClientBulkWriteResult} element if the operation is successful,
+     * or one of the following errors:
+     * <ul>
+     *     <li>
+     *     {@link ClientBulkWriteException} - If and only if the operation is unsuccessful or partially unsuccessful,
+     *     and there is at least one of the following pieces of information to report:
+     *     {@link ClientBulkWriteException#getWriteConcernErrors()}, {@link ClientBulkWriteException#getWriteErrors()},
+     *     {@link ClientBulkWriteException#getPartialResult()}.</li>
+     *     <li>
+     *     {@link MongoException} - Only if the operation is unsuccessful.</li>
+     * </ul>
+     * @since 5.3
+     * @mongodb.server.release 8.0
+     * @mongodb.driver.manual reference/command/bulkWrite/ bulkWrite
+     */
+    Publisher<ClientBulkWriteResult> bulkWrite(List<? extends ClientNamespacedWriteModel> models);
+
+    /**
+     * Executes a client-level bulk write operation.
+     * <p>
+     * This operation supports {@linkplain MongoClientSettings#getRetryWrites() retryable writes}.
+     * Depending on the number of {@code models}, encoded size of {@code models}, and the size limits in effect,
+     * executing this operation may require multiple {@code bulkWrite} commands.
+     * Retry eligibility is determined for each {@code bulkWrite} command individually:
+     * a command that contains {@link ClientNamespacedUpdateManyModel} or {@link ClientNamespacedDeleteManyModel} is not retryable.</p>
+     * <p>
+     * This operation is not supported by MongoDB Atlas Serverless instances.</p>
+     *
+     * @param models The {@linkplain ClientNamespacedWriteModel individual write operations}.
+     * @param options The options.
+     * @return The {@link Publisher} signalling at most one {@link ClientBulkWriteResult} element if the operation is successful,
+     * or one of the following errors:
+     * <ul>
+     *     <li>
+     *     {@link ClientBulkWriteException} - If and only if the operation is unsuccessful or partially unsuccessful,
+     *     and there is at least one of the following pieces of information to report:
+     *     {@link ClientBulkWriteException#getWriteConcernErrors()}, {@link ClientBulkWriteException#getWriteErrors()},
+     *     {@link ClientBulkWriteException#getPartialResult()}.</li>
+     *     <li>
+     *     {@link MongoException} - Only if the operation is unsuccessful.</li>
+     * </ul>
+     * @since 5.3
+     * @mongodb.server.release 8.0
+     * @mongodb.driver.manual reference/command/bulkWrite/ bulkWrite
+     */
+    Publisher<ClientBulkWriteResult> bulkWrite(
+            List<? extends ClientNamespacedWriteModel> models,
+            ClientBulkWriteOptions options);
+
+    /**
+     * Executes a client-level bulk write operation.
+     * This method is functionally equivalent to {@link #bulkWrite(ClientSession, List, ClientBulkWriteOptions)}
+     * with the {@linkplain ClientBulkWriteOptions#clientBulkWriteOptions() default options}.
+     * <p>
+     * This operation supports {@linkplain MongoClientSettings#getRetryWrites() retryable writes}.
+     * Depending on the number of {@code models}, encoded size of {@code models}, and the size limits in effect,
+     * executing this operation may require multiple {@code bulkWrite} commands.
+     * Retry eligibility is determined for each {@code bulkWrite} command individually:
+     * a command that contains {@link ClientNamespacedUpdateManyModel} or {@link ClientNamespacedDeleteManyModel} is not retryable.</p>
+     * <p>
+     * This operation is not supported by MongoDB Atlas Serverless instances.</p>
+     *
+     * @param clientSession The {@linkplain ClientSession client session} with which to associate this operation.
+     * @param models The {@linkplain ClientNamespacedWriteModel individual write operations}.
+     * @return The {@link Publisher} signalling at most one {@link ClientBulkWriteResult} element if the operation is successful,
+     * or one of the following errors:
+     * <ul>
+     *     <li>
+     *     {@link ClientBulkWriteException} - If and only if the operation is unsuccessful or partially unsuccessful,
+     *     and there is at least one of the following pieces of information to report:
+     *     {@link ClientBulkWriteException#getWriteConcernErrors()}, {@link ClientBulkWriteException#getWriteErrors()},
+     *     {@link ClientBulkWriteException#getPartialResult()}.</li>
+     *     <li>
+     *     {@link MongoException} - Only if the operation is unsuccessful.</li>
+     * </ul>
+     * @since 5.3
+     * @mongodb.server.release 8.0
+     * @mongodb.driver.manual reference/command/bulkWrite/ bulkWrite
+     */
+    Publisher<ClientBulkWriteResult> bulkWrite(
+            ClientSession clientSession,
+            List<? extends ClientNamespacedWriteModel> models);
+
+    /**
+     * Executes a client-level bulk write operation.
+     * <p>
+     * This operation supports {@linkplain MongoClientSettings#getRetryWrites() retryable writes}.
+     * Depending on the number of {@code models}, encoded size of {@code models}, and the size limits in effect,
+     * executing this operation may require multiple {@code bulkWrite} commands.
+     * Retry eligibility is determined for each {@code bulkWrite} command individually:
+     * a command that contains {@link ClientNamespacedUpdateManyModel} or {@link ClientNamespacedDeleteManyModel} is not retryable.</p>
+     * <p>
+     * This operation is not supported by MongoDB Atlas Serverless instances.</p>
+     *
+     * @param clientSession The {@linkplain ClientSession client session} with which to associate this operation.
+     * @param models The {@linkplain ClientNamespacedWriteModel individual write operations}.
+     * @param options The options.
+     * @return The {@link Publisher} signalling at most one {@link ClientBulkWriteResult} element if the operation is successful,
+     * or one of the following errors:
+     * <ul>
+     *     <li>
+     *     {@link ClientBulkWriteException} - If and only if the operation is unsuccessful or partially unsuccessful,
+     *     and there is at least one of the following pieces of information to report:
+     *     {@link ClientBulkWriteException#getWriteConcernErrors()}, {@link ClientBulkWriteException#getWriteErrors()},
+     *     {@link ClientBulkWriteException#getPartialResult()}.</li>
+     *     <li>
+     *     {@link MongoException} - Only if the operation is unsuccessful.</li>
+     * </ul>
+     * @since 5.3
+     * @mongodb.server.release 8.0
+     * @mongodb.driver.manual reference/command/bulkWrite/ bulkWrite
+     */
+    Publisher<ClientBulkWriteResult> bulkWrite(
+            ClientSession clientSession,
+            List<? extends ClientNamespacedWriteModel> models,
+            ClientBulkWriteOptions options);
 }
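For reference, a minimal, hypothetical usage sketch of the new API, consuming the single-element `Publisher` with Reactor; the connection string, namespaces, and documents are placeholders, and it assumes the reactive `MongoClient` exposes this `MongoCluster` surface.

```java
import com.mongodb.MongoNamespace;
import com.mongodb.client.model.bulk.ClientBulkWriteResult;
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import org.bson.Document;
import reactor.core.publisher.Mono;

import java.util.Arrays;
import java.util.List;

import static java.util.Objects.requireNonNull;

public final class ReactiveClientBulkWriteSketch {
    public static void main(final String[] args) {
        // Two write models targeting different namespaces in a single client-level bulk write.
        List<ClientNamespacedWriteModel> models = Arrays.asList(
                ClientNamespacedWriteModel.insertOne(
                        new MongoNamespace("test", "people"), new Document("name", "Ada")),
                ClientNamespacedWriteModel.insertOne(
                        new MongoNamespace("test", "orders"), new Document("sku", "x-1")));

        try (MongoClient client = MongoClients.create("mongodb://localhost")) {
            // The Publisher signals at most one ClientBulkWriteResult; blocking here is for brevity only.
            ClientBulkWriteResult result = requireNonNull(Mono.from(client.bulkWrite(models)).block());
            System.out.println("inserted: " + result.getInsertedCount());
        }
    }
}
```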
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClientImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClientImpl.java
index 27a0c9195c3..3d4822eb7e3 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClientImpl.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClientImpl.java
@@ -24,6 +24,9 @@
 import com.mongodb.ReadConcern;
 import com.mongodb.ReadPreference;
 import com.mongodb.WriteConcern;
+import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
+import com.mongodb.client.model.bulk.ClientBulkWriteResult;
+import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
 import com.mongodb.connection.ClusterDescription;
 import com.mongodb.internal.TimeoutSettings;
 import com.mongodb.internal.connection.Cluster;
@@ -229,6 +232,30 @@ public <TResult> ChangeStreamPublisher<TResult> watch(
         return delegate.watch(clientSession, pipeline, resultClass);
     }
 
+    @Override
+    public Publisher<ClientBulkWriteResult> bulkWrite(final List<? extends ClientNamespacedWriteModel> models) {
+        return delegate.bulkWrite(models);
+    }
+
+    @Override
+    public Publisher<ClientBulkWriteResult> bulkWrite(final List<? extends ClientNamespacedWriteModel> models,
+                                                      final ClientBulkWriteOptions options) {
+        return delegate.bulkWrite(models, options);
+    }
+
+    @Override
+    public Publisher<ClientBulkWriteResult> bulkWrite(final ClientSession clientSession,
+                                                      final List<? extends ClientNamespacedWriteModel> models) {
+        return delegate.bulkWrite(clientSession, models);
+    }
+
+    @Override
+    public Publisher<ClientBulkWriteResult> bulkWrite(final ClientSession clientSession,
+                                                      final List<? extends ClientNamespacedWriteModel> models,
+                                                      final ClientBulkWriteOptions options) {
+        return delegate.bulkWrite(clientSession, models, options);
+    }
+
     @Override
     public Publisher<ClientSession> startSession() {
         return delegate.startSession();
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClusterImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClusterImpl.java
index 72bcf53e303..04028ecc684 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClusterImpl.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClusterImpl.java
@@ -20,6 +20,9 @@
 import com.mongodb.ReadConcern;
 import com.mongodb.ReadPreference;
 import com.mongodb.WriteConcern;
+import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
+import com.mongodb.client.model.bulk.ClientBulkWriteResult;
+import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
 import com.mongodb.internal.TimeoutSettings;
 import com.mongodb.internal.client.model.changestream.ChangeStreamLevel;
 import com.mongodb.internal.connection.Cluster;
@@ -42,6 +45,7 @@
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import static com.mongodb.assertions.Assertions.isTrueArgument;
 import static com.mongodb.assertions.Assertions.notNull;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
@@ -237,4 +241,40 @@ public <T> ChangeStreamPublisher<T> watch(final ClientSession clientSession, fin
                 resultClass, pipeline, ChangeStreamLevel.CLIENT);
     }
 
+    @Override
+    public Publisher<ClientBulkWriteResult> bulkWrite(final List<? extends ClientNamespacedWriteModel> clientWriteModels) {
+        notNull("clientWriteModels", clientWriteModels);
+        isTrueArgument("`clientWriteModels` must not be empty", !clientWriteModels.isEmpty());
+        return mongoOperationPublisher.clientBulkWrite(null, clientWriteModels, null);
+    }
+
+    @Override
+    public Publisher<ClientBulkWriteResult> bulkWrite(final List<? extends ClientNamespacedWriteModel> clientWriteModels,
+                                                      final ClientBulkWriteOptions options) {
+        notNull("clientWriteModels", clientWriteModels);
+        isTrueArgument("`clientWriteModels` must not be empty", !clientWriteModels.isEmpty());
+        notNull("options", options);
+        return mongoOperationPublisher.clientBulkWrite(null, clientWriteModels, options);
+    }
+
+    @Override
+    public Publisher<ClientBulkWriteResult> bulkWrite(final ClientSession clientSession,
+                                                      final List<? extends ClientNamespacedWriteModel> clientWriteModels) {
+        notNull("clientSession", clientSession);
+        notNull("clientWriteModels", clientWriteModels);
+        isTrueArgument("`clientWriteModels` must not be empty", !clientWriteModels.isEmpty());
+        return mongoOperationPublisher.clientBulkWrite(clientSession, clientWriteModels, null);
+    }
+
+    @Override
+    public Publisher<ClientBulkWriteResult> bulkWrite(final ClientSession clientSession,
+                                                      final List<? extends ClientNamespacedWriteModel> clientWriteModels,
+                                                      final ClientBulkWriteOptions options) {
+        notNull("clientSession", clientSession);
+        notNull("clientWriteModels", clientWriteModels);
+        isTrueArgument("`clientWriteModels` must not be empty", !clientWriteModels.isEmpty());
+        notNull("options", options);
+        return mongoOperationPublisher.clientBulkWrite(clientSession, clientWriteModels, options);
+    }
+
 }
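The overloads above validate their arguments eagerly, so an invalid call fails with a synchronous exception rather than an `onError` signal. A hypothetical test-style sketch of that behaviour (the `MongoCluster` instance is assumed to be supplied by the caller):

```java
import com.mongodb.reactivestreams.client.MongoCluster;

import java.util.Collections;

import static org.junit.jupiter.api.Assertions.assertThrows;

final class ClientBulkWriteValidationSketch {

    // An empty model list is rejected by isTrueArgument at call time with an
    // IllegalArgumentException, so no bulkWrite command is ever sent to the server.
    static void emptyModelListIsRejectedEagerly(final MongoCluster cluster) {
        assertThrows(IllegalArgumentException.class,
                () -> cluster.bulkWrite(Collections.emptyList()));
    }
}
```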
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java
index 5ccea518cb5..58030f75fa9 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java
@@ -50,6 +50,9 @@
 import com.mongodb.client.model.SearchIndexModel;
 import com.mongodb.client.model.UpdateOptions;
 import com.mongodb.client.model.WriteModel;
+import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
+import com.mongodb.client.model.bulk.ClientBulkWriteResult;
+import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
 import com.mongodb.client.result.DeleteResult;
 import com.mongodb.client.result.InsertManyResult;
 import com.mongodb.client.result.InsertOneResult;
@@ -80,6 +83,7 @@
 import java.util.function.Function;
 import java.util.function.Supplier;
 
+import static com.mongodb.assertions.Assertions.isTrue;
 import static com.mongodb.assertions.Assertions.notNull;
 import static java.util.Collections.singletonList;
 import static org.bson.codecs.configuration.CodecRegistries.withUuidRepresentation;
@@ -91,6 +95,7 @@ public final class MongoOperationPublisher<T> {
 
     private final AsyncOperations<T> operations;
     private final UuidRepresentation uuidRepresentation;
+    @Nullable
     private final AutoEncryptionSettings autoEncryptionSettings;
     private final OperationExecutor executor;
 
@@ -289,6 +294,16 @@ Publisher<BulkWriteResult> bulkWrite(
                 () -> operations.bulkWrite(notNull("requests", requests), notNull("options", options)), clientSession);
     }
 
+    Publisher<ClientBulkWriteResult> clientBulkWrite(
+            @Nullable final ClientSession clientSession,
+            final List<? extends ClientNamespacedWriteModel> clientWriteModels,
+            @Nullable final ClientBulkWriteOptions options) {
+        isTrue("`autoEncryptionSettings` is null, as bulkWrite does not currently support automatic encryption", autoEncryptionSettings == null);
+        return createWriteOperationMono(
+                operations::getTimeoutSettings,
+                () -> operations.clientBulkWriteOperation(clientWriteModels, options), clientSession);
+    }
+
     Publisher<InsertOneResult> insertOne(@Nullable final ClientSession clientSession, final T document, final InsertOneOptions options) {
         return createSingleWriteRequestMono(() -> operations.insertOne(notNull("document", document),
                                                                        notNull("options", options)),
diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java
index fbeffd7b369..75a19536cb7 100644
--- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java
+++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java
@@ -489,13 +489,6 @@ public void testTimeoutMsISHonoredForNnextOperationWhenSeveralGetMoreExecutedInt
         }
     }
 
-    @DisplayName("11. Multi-batch bulkWrites")
-    @Test
-    @Override
-    protected void test11MultiBatchBulkWrites() {
-        assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement");
-    }
-
     private static void assertCommandStartedEventsInOder(final List<String> expectedCommandNames,
                                                          final List<CommandStartedEvent> commandStartedEvents) {
         assertEquals(expectedCommandNames.size(), commandStartedEvents.size(), "Expected: " + expectedCommandNames + ". Actual: "
diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/CrudProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/CrudProseTest.java
index 22bb1a23d77..81d88e6fdb0 100644
--- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/CrudProseTest.java
+++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/CrudProseTest.java
@@ -18,15 +18,6 @@
 import com.mongodb.MongoClientSettings;
 import com.mongodb.client.MongoClient;
 import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
-
-import java.util.function.Supplier;
-
-import static org.junit.jupiter.api.Assumptions.assumeTrue;
 
 /**
  * See
@@ -37,64 +28,4 @@ final class CrudProseTest extends com.mongodb.client.CrudProseTest {
     protected MongoClient createMongoClient(final MongoClientSettings.Builder mongoClientSettingsBuilder) {
         return new SyncMongoClient(MongoClients.create(mongoClientSettingsBuilder.build()));
     }
-
-    @DisplayName("5. MongoClient.bulkWrite collects WriteConcernErrors across batches")
-    @Test
-    @Override
-    protected void testBulkWriteCollectsWriteConcernErrorsAcrossBatches() {
-        assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement");
-    }
-
-    @DisplayName("6. MongoClient.bulkWrite handles individual WriteErrors across batches")
-    @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    @Override
-    protected void testBulkWriteHandlesWriteErrorsAcrossBatches(final boolean ordered) {
-        assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement");
-    }
-
-    @DisplayName("8. MongoClient.bulkWrite handles a cursor requiring getMore within a transaction")
-    @Test
-    @Override
-    protected void testBulkWriteHandlesCursorRequiringGetMoreWithinTransaction() {
-        assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement");
-    }
-
-    @DisplayName("11. MongoClient.bulkWrite batch splits when the addition of a new namespace exceeds the maximum message size")
-    @Test
-    @Override
-    protected void testBulkWriteSplitsWhenExceedingMaxMessageSizeBytesDueToNsInfo() {
-        assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement");
-    }
-
-    @DisplayName("12. MongoClient.bulkWrite returns an error if no operations can be added to ops")
-    @ParameterizedTest
-    @ValueSource(strings = {"document", "namespace"})
-    @Override
-    protected void testBulkWriteSplitsErrorsForTooLargeOpsOrNsInfo(final String tooLarge) {
-        assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement");
-    }
-
-    @DisplayName("13. MongoClient.bulkWrite returns an error if auto-encryption is configured")
-    @Test
-    @Override
-    protected void testBulkWriteErrorsForAutoEncryption() {
-        assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement");
-    }
-
-    @DisplayName("15. MongoClient.bulkWrite with unacknowledged write concern uses w:0 for all batches")
-    @Test
-    protected void testWriteConcernOfAllBatchesWhenUnacknowledgedRequested() {
-        assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement");
-    }
-
-    @ParameterizedTest
-    @MethodSource("insertMustGenerateIdAtMostOnceArgs")
-    @Override
-    protected <TDocument> void insertMustGenerateIdAtMostOnce(
-            final Class<TDocument> documentClass,
-            final boolean expectIdGenerated,
-            final Supplier<TDocument> documentSupplier) {
-        assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement");
-    }
 }
diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCluster.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCluster.java
index 8ded1f38865..fc3cad4b6a7 100644
--- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCluster.java
+++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCluster.java
@@ -21,7 +21,6 @@
 import com.mongodb.ReadConcern;
 import com.mongodb.ReadPreference;
 import com.mongodb.WriteConcern;
-import com.mongodb.assertions.Assertions;
 import com.mongodb.client.ChangeStreamIterable;
 import com.mongodb.client.ClientSession;
 import com.mongodb.client.ListDatabasesIterable;
@@ -29,8 +28,8 @@
 import com.mongodb.client.MongoDatabase;
 import com.mongodb.client.MongoIterable;
 import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
-import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
 import com.mongodb.client.model.bulk.ClientBulkWriteResult;
+import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
 import org.bson.BsonDocument;
 import org.bson.Document;
 import org.bson.codecs.configuration.CodecRegistry;
@@ -286,24 +285,22 @@ public <TResult> ChangeStreamIterable<TResult> watch(final ClientSession clientS
     @Override
     public ClientBulkWriteResult bulkWrite(
             final List<? extends ClientNamespacedWriteModel> clientWriteModels) throws ClientBulkWriteException {
-        org.junit.jupiter.api.Assumptions.assumeTrue(Boolean.parseBoolean(toString()), "BULK-TODO implement");
-        throw Assertions.fail("BULK-TODO implement");
+        return requireNonNull(Mono.from(wrapped.bulkWrite(clientWriteModels)).contextWrite(CONTEXT).block(TIMEOUT_DURATION));
     }
 
     @Override
     public ClientBulkWriteResult bulkWrite(
             final List<? extends ClientNamespacedWriteModel> clientWriteModels,
             final ClientBulkWriteOptions options) throws ClientBulkWriteException {
-        org.junit.jupiter.api.Assumptions.assumeTrue(Boolean.parseBoolean(toString()), "BULK-TODO implement");
-        throw Assertions.fail("BULK-TODO implement");
+        return requireNonNull(Mono.from(wrapped.bulkWrite(clientWriteModels, options)).contextWrite(CONTEXT).block(TIMEOUT_DURATION));
     }
 
     @Override
     public ClientBulkWriteResult bulkWrite(
             final ClientSession clientSession,
             final List<? extends ClientNamespacedWriteModel> clientWriteModels) throws ClientBulkWriteException {
-        org.junit.jupiter.api.Assumptions.assumeTrue(Boolean.parseBoolean(toString()), "BULK-TODO implement");
-        throw Assertions.fail("BULK-TODO implement");
+        return requireNonNull(
+                Mono.from(wrapped.bulkWrite(unwrap(clientSession), clientWriteModels)).contextWrite(CONTEXT).block(TIMEOUT_DURATION));
     }
 
     @Override
@@ -311,8 +308,8 @@ public ClientBulkWriteResult bulkWrite(
             final ClientSession clientSession,
             final List<? extends ClientNamespacedWriteModel> clientWriteModels,
             final ClientBulkWriteOptions options) throws ClientBulkWriteException {
-        org.junit.jupiter.api.Assumptions.assumeTrue(Boolean.parseBoolean(toString()), "BULK-TODO implement");
-        throw Assertions.fail("BULK-TODO implement");
+        return requireNonNull(Mono.from(wrapped.bulkWrite(unwrap(clientSession), clientWriteModels, options)).contextWrite(CONTEXT)
+                .block(TIMEOUT_DURATION));
     }
 
     private com.mongodb.reactivestreams.client.ClientSession unwrap(final ClientSession clientSession) {
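All four adapter methods above follow the same blocking idiom; a hypothetical generic helper showing its shape, where the real adapter's `CONTEXT` and `TIMEOUT_DURATION` play the roles of the `context` and `timeout` arguments:

```java
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

import java.time.Duration;

import static java.util.Objects.requireNonNull;

final class BlockingAdapterSketch {

    // Block on the single-element Publisher with a bounded timeout so a hung reactive
    // operation fails the test instead of hanging the whole suite.
    static <T> T blockSingle(final Publisher<T> publisher, final Context context, final Duration timeout) {
        return requireNonNull(Mono.from(publisher).contextWrite(context).block(timeout));
    }
}
```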
diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/PublisherApiTest.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/PublisherApiTest.java
index 5839a7efd8d..09f77743cde 100644
--- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/PublisherApiTest.java
+++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/PublisherApiTest.java
@@ -51,7 +51,7 @@ public class PublisherApiTest {
     List<DynamicTest> testPublisherApiMatchesSyncApi() {
         return asList(
                 dynamicTest("Client Session Api", () -> assertApis(com.mongodb.client.ClientSession.class, ClientSession.class)),
-// BULK-TODO uncomment dynamicTest("MongoClient Api", () -> assertApis(com.mongodb.client.MongoClient.class, MongoClient.class)),
+                dynamicTest("MongoClient Api", () -> assertApis(com.mongodb.client.MongoClient.class, MongoClient.class)),
                 dynamicTest("MongoDatabase Api", () -> assertApis(com.mongodb.client.MongoDatabase.class, MongoDatabase.class)),
                 dynamicTest("MongoCollection Api", () -> assertApis(com.mongodb.client.MongoCollection.class, MongoCollection.class)),
                 dynamicTest("Aggregate Api", () -> assertApis(AggregateIterable.class, AggregatePublisher.class)),
diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MongoOperationPublisherTest.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MongoOperationPublisherTest.java
index 42d6bb14c5c..1c096748c11 100644
--- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MongoOperationPublisherTest.java
+++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MongoOperationPublisherTest.java
@@ -31,6 +31,7 @@
 
 import java.util.concurrent.TimeUnit;
 
+import static com.mongodb.ClusterFixture.TIMEOUT;
 import static com.mongodb.ClusterFixture.TIMEOUT_SETTINGS_WITH_TIMEOUT;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -113,7 +114,7 @@ public void withReadPreference() {
 
     @Test
     public void withTimeout() {
-        assertEquals(DEFAULT_MOP, DEFAULT_MOP.withTimeout(60_000, TimeUnit.MILLISECONDS));
+        assertEquals(DEFAULT_MOP, DEFAULT_MOP.withTimeout(TIMEOUT, TimeUnit.SECONDS));
         assertEquals(1000, DEFAULT_MOP.withTimeout(1000, TimeUnit.MILLISECONDS).getTimeoutMS());
         assertThrows(IllegalArgumentException.class, () -> DEFAULT_MOP.withTimeout(500, TimeUnit.NANOSECONDS));
     }
diff --git a/driver-scala/src/test/scala/org/mongodb/scala/MongoClientSpec.scala b/driver-scala/src/test/scala/org/mongodb/scala/MongoClientSpec.scala
index 4c721ed8774..d5516b984ae 100644
--- a/driver-scala/src/test/scala/org/mongodb/scala/MongoClientSpec.scala
+++ b/driver-scala/src/test/scala/org/mongodb/scala/MongoClientSpec.scala
@@ -35,7 +35,12 @@ class MongoClientSpec extends BaseSpec with MockitoSugar {
 
     wrapped.foreach((name: String) => {
       val cleanedName = name.stripPrefix("get")
-      assert(local.contains(name) | local.contains(cleanedName.head.toLower + cleanedName.tail), s"Missing: $name")
+
+      // TODO("BULK-TODO remove this if when bulkWrite is implemented and uncomment line 43")
+      if (!cleanedName.contains("bulkWrite")) {
+        assert(local.contains(name) | local.contains(cleanedName.head.toLower + cleanedName.tail), s"Missing: $name")
+      }
+      // assert(local.contains(name) | local.contains(cleanedName.head.toLower + cleanedName.tail), s"Missing: $name")
     })
   }
 
diff --git a/driver-sync/src/test/functional/com/mongodb/client/CrudProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/CrudProseTest.java
index 72e2fdea0e0..101865079e0 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/CrudProseTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/CrudProseTest.java
@@ -32,8 +32,8 @@
 import com.mongodb.client.model.Updates;
 import com.mongodb.client.model.ValidationOptions;
 import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
-import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
 import com.mongodb.client.model.bulk.ClientBulkWriteResult;
+import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
 import com.mongodb.event.CommandStartedEvent;
 import com.mongodb.internal.connection.TestCommandListener;
 import org.bson.BsonArray;
@@ -171,7 +171,7 @@ void testBulkWriteSplitsWhenExceedingMaxWriteBatchSize() {
             int maxWriteBatchSize = droppedDatabase(client).runCommand(new Document("hello", 1)).getInteger("maxWriteBatchSize");
             ClientBulkWriteResult result = client.bulkWrite(nCopies(
                     maxWriteBatchSize + 1,
-                    ClientNamespacedWriteModel.insertOne(NAMESPACE, new Document("a", "b"))));
+                    insertOne(NAMESPACE, new Document("a", "b"))));
             assertEquals(maxWriteBatchSize + 1, result.getInsertedCount());
             List<CommandStartedEvent> startedBulkWriteCommandEvents = commandListener.getCommandStartedEvents("bulkWrite");
             assertEquals(2, startedBulkWriteCommandEvents.size());
@@ -193,7 +193,7 @@ void testBulkWriteSplitsWhenExceedingMaxMessageSizeBytes() {
             Document helloResponse = droppedDatabase(client).runCommand(new Document("hello", 1));
             int maxBsonObjectSize = helloResponse.getInteger("maxBsonObjectSize");
             int maxMessageSizeBytes = helloResponse.getInteger("maxMessageSizeBytes");
-            ClientNamespacedWriteModel model = ClientNamespacedWriteModel.insertOne(
+            ClientNamespacedWriteModel model = insertOne(
                     NAMESPACE,
                     new Document("a", join("", nCopies(maxBsonObjectSize - 500, "b"))));
             int numModels = maxMessageSizeBytes / maxBsonObjectSize + 1;
@@ -227,7 +227,7 @@ protected void testBulkWriteCollectsWriteConcernErrorsAcrossBatches() throws Int
                 .addCommandListener(commandListener));
              FailPoint ignored = FailPoint.enable(failPointDocument, getPrimary())) {
             int maxWriteBatchSize = droppedDatabase(client).runCommand(new Document("hello", 1)).getInteger("maxWriteBatchSize");
-            ClientNamespacedWriteModel model = ClientNamespacedWriteModel.insertOne(NAMESPACE, new Document("a", "b"));
+            ClientNamespacedWriteModel model = insertOne(NAMESPACE, new Document("a", "b"));
             int numModels = maxWriteBatchSize + 1;
             ClientBulkWriteException error = assertThrows(ClientBulkWriteException.class, () ->
                     client.bulkWrite(nCopies(numModels, model)));
@@ -253,7 +253,7 @@ protected void testBulkWriteHandlesWriteErrorsAcrossBatches(final boolean ordere
             Document document = new Document("_id", 1);
             MongoCollection<Document> collection = droppedCollection(client, Document.class);
             collection.insertOne(document);
-            ClientNamespacedWriteModel model = ClientNamespacedWriteModel.insertOne(collection.getNamespace(), document);
+            ClientNamespacedWriteModel model = insertOne(collection.getNamespace(), document);
             int numModels = maxWriteBatchSize + 1;
             ClientBulkWriteException error = assertThrows(ClientBulkWriteException.class, () ->
                     client.bulkWrite(nCopies(numModels, model), clientBulkWriteOptions().ordered(ordered)));
@@ -305,7 +305,8 @@ private void assertBulkWriteHandlesCursorRequiringGetMore(final boolean transact
                                 clientUpdateOptions().upsert(true))),
                         clientBulkWriteOptions().verboseResults(true)
                 );
-                ClientBulkWriteResult result = transaction ? session.withTransaction(action::get) : action.get();
+
+                ClientBulkWriteResult result = transaction ? runInTransaction(session, action) : action.get();
                 assertEquals(2, result.getUpsertedCount());
                 assertEquals(2, result.getVerboseResults().orElseThrow(Assertions::fail).getUpdateResults().size());
                 assertEquals(1, commandListener.getCommandStartedEvents("bulkWrite").size());
@@ -322,7 +323,7 @@ protected void testBulkWriteSplitsWhenExceedingMaxMessageSizeBytesDueToNsInfo()
                 () -> {
                     // Case 1: No batch-splitting required
                     testBulkWriteSplitsWhenExceedingMaxMessageSizeBytesDueToNsInfo((client, models, commandListener) -> {
-                        models.add(ClientNamespacedWriteModel.insertOne(NAMESPACE, new Document("a", "b")));
+                        models.add(insertOne(NAMESPACE, new Document("a", "b")));
                         ClientBulkWriteResult result = client.bulkWrite(models);
                         assertEquals(models.size(), result.getInsertedCount());
                         List<CommandStartedEvent> startedBulkWriteCommandEvents = commandListener.getCommandStartedEvents("bulkWrite");
@@ -339,7 +340,7 @@ protected void testBulkWriteSplitsWhenExceedingMaxMessageSizeBytesDueToNsInfo()
                     // Case 2: Batch-splitting required
                     testBulkWriteSplitsWhenExceedingMaxMessageSizeBytesDueToNsInfo((client, models, commandListener) -> {
                         MongoNamespace namespace = new MongoNamespace(NAMESPACE.getDatabaseName(), join("", nCopies(200, "c")));
-                        models.add(ClientNamespacedWriteModel.insertOne(namespace, new Document("a", "b")));
+                        models.add(insertOne(namespace, new Document("a", "b")));
                         ClientBulkWriteResult result = client.bulkWrite(models);
                         assertEquals(models.size(), result.getInsertedCount());
                         List<CommandStartedEvent> startedBulkWriteCommandEvents = commandListener.getCommandStartedEvents("bulkWrite");
@@ -371,11 +372,11 @@ private void testBulkWriteSplitsWhenExceedingMaxMessageSizeBytesDueToNsInfo(
             int remainderBytes = opsBytes % maxBsonObjectSize;
             List<ClientNamespacedWriteModel> models = new ArrayList<>(nCopies(
                     numModels,
-                    ClientNamespacedWriteModel.insertOne(
+                    insertOne(
                             NAMESPACE,
                             new Document("a", join("", nCopies(maxBsonObjectSize - 57, "b"))))));
             if (remainderBytes >= 217) {
-                models.add(ClientNamespacedWriteModel.insertOne(
+                models.add(insertOne(
                         NAMESPACE,
                         new Document("a", join("", nCopies(remainderBytes - 57, "b")))));
             }
@@ -394,13 +395,13 @@ protected void testBulkWriteSplitsErrorsForTooLargeOpsOrNsInfo(final String tooL
             ClientNamespacedWriteModel model;
             switch (tooLarge) {
                 case "document": {
-                    model = ClientNamespacedWriteModel.insertOne(
+                    model = insertOne(
                             NAMESPACE,
                             new Document("a", join("", nCopies(maxMessageSizeBytes, "b"))));
                     break;
                 }
                 case "namespace": {
-                    model = ClientNamespacedWriteModel.insertOne(
+                    model = insertOne(
                             new MongoNamespace(NAMESPACE.getDatabaseName(), join("", nCopies(maxMessageSizeBytes, "b"))),
                             new Document("a", "b"));
                     break;
@@ -429,8 +430,9 @@ protected void testBulkWriteErrorsForAutoEncryption() {
             assertTrue(
                     assertThrows(
                             IllegalStateException.class,
-                            () -> client.bulkWrite(singletonList(ClientNamespacedWriteModel.insertOne(NAMESPACE, new Document("a", "b")))))
-                    .getMessage().contains("bulkWrite does not currently support automatic encryption"));
+                            () -> client.bulkWrite(singletonList(insertOne(NAMESPACE, new Document("a", "b"))))
+                    ).getMessage().contains("bulkWrite does not currently support automatic encryption")
+            );
         }
     }
 
@@ -447,7 +449,7 @@ protected void testWriteConcernOfAllBatchesWhenUnacknowledgedRequested() {
             Document helloResponse = database.runCommand(new Document("hello", 1));
             int maxBsonObjectSize = helloResponse.getInteger("maxBsonObjectSize");
             int maxMessageSizeBytes = helloResponse.getInteger("maxMessageSizeBytes");
-            ClientNamespacedWriteModel model = ClientNamespacedWriteModel.insertOne(
+            ClientNamespacedWriteModel model = insertOne(
                     NAMESPACE,
                     new Document("a", join("", nCopies(maxBsonObjectSize - 500, "b"))));
             int numModels = maxMessageSizeBytes / maxBsonObjectSize + 1;
@@ -594,4 +596,21 @@ public int getV() {
     private interface TriConsumer<A1, A2, A3> {
         void accept(A1 a1, A2 a2, A3 a3);
     }
+
+    /**
+     * This method is used instead of {@link ClientSession#withTransaction(TransactionBody)}
+     * because the reactive {@code com.mongodb.reactivestreams.client.ClientSession} does not support it.
+     */
+    private static ClientBulkWriteResult runInTransaction(final ClientSession session,
+                                                          final Supplier<ClientBulkWriteResult> action) {
+        session.startTransaction();
+        try {
+            ClientBulkWriteResult result = action.get();
+            session.commitTransaction();
+            return result;
+        } catch (Throwable throwable) {
+            session.abortTransaction();
+            throw throwable;
+        }
+    }
 }
diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java
index 27ebf2a76bd..1b7c3a40716 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java
@@ -21,10 +21,6 @@
 import com.mongodb.MongoNamespace;
 import com.mongodb.ReadPreference;
 import com.mongodb.UnixServerAddress;
-import com.mongodb.client.unified.UnifiedTestModifications.TestDef;
-import com.mongodb.event.TestServerMonitorListener;
-import com.mongodb.internal.logging.LogMessage;
-import com.mongodb.logging.TestLoggingInterceptor;
 import com.mongodb.WriteConcern;
 import com.mongodb.client.ClientSession;
 import com.mongodb.client.MongoClient;
@@ -32,16 +28,20 @@
 import com.mongodb.client.gridfs.GridFSBucket;
 import com.mongodb.client.model.Filters;
 import com.mongodb.client.test.CollectionHelper;
+import com.mongodb.client.unified.UnifiedTestModifications.TestDef;
 import com.mongodb.client.vault.ClientEncryption;
 import com.mongodb.connection.ClusterDescription;
 import com.mongodb.connection.ClusterType;
 import com.mongodb.connection.ServerDescription;
 import com.mongodb.event.CommandEvent;
 import com.mongodb.event.CommandStartedEvent;
+import com.mongodb.event.TestServerMonitorListener;
 import com.mongodb.internal.connection.TestCommandListener;
 import com.mongodb.internal.connection.TestConnectionPoolListener;
+import com.mongodb.internal.logging.LogMessage;
 import com.mongodb.lang.NonNull;
 import com.mongodb.lang.Nullable;
+import com.mongodb.logging.TestLoggingInterceptor;
 import com.mongodb.test.AfterBeforeParameterResolver;
 import org.bson.BsonArray;
 import org.bson.BsonBoolean;
@@ -279,16 +279,7 @@ protected void postSetUp(final TestDef def) {
     @AfterEach
     public void cleanUp() {
         for (FailPoint failPoint : failPoints) {
-            try {
-                // BULK-TODO remove the try-catch block
-                failPoint.disableFailPoint();
-            } catch (Throwable e) {
-                for (Throwable suppressed : e.getSuppressed()) {
-                    if (suppressed instanceof TestAbortedException) {
-                        throw (TestAbortedException) suppressed;
-                    }
-                }
-            }
+            failPoint.disableFailPoint();
         }
         entities.close();
         postCleanUp(testDef);
@@ -414,11 +405,15 @@ private static void assertOperationResult(final UnifiedTestContext context, fina
             final OperationResult result) {
         if (result.getException() instanceof org.opentest4j.TestAbortedException) {
             // BULK-TODO remove
-            throw (org.opentest4j.TestAbortedException) result.getException();
+            if (String.valueOf(result.getException().getMessage()).contains("BULK-TODO Kotlin implement")) {
+                throw (org.opentest4j.TestAbortedException) result.getException();
+            }
         }
         if (result.getException() instanceof org.junit.AssumptionViolatedException) {
             // BULK-TODO remove
-            throw (org.junit.AssumptionViolatedException) result.getException();
+            if (String.valueOf(result.getException().getMessage()).contains("BULK-TODO Kotlin implement")) {
+                throw (org.junit.AssumptionViolatedException) result.getException();
+            }
         }
         context.getAssertionContext().push(ContextElement.ofCompletedOperation(operation, result, operationIndex));
 
diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java
index 3687f156c42..67bf394d6cb 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java
@@ -45,7 +45,6 @@ public static void doSkips(final TestDef def) {
                 .directory("atlas-data-lake-testing");
 
         // change-streams
-
         def.skipNoncompliantReactive("error required from change stream initialization") // TODO reason?
                 .test("change-streams", "change-streams", "Test with document comment - pre 4.4");
         def.skipNoncompliantReactive("event sensitive tests") // TODO reason?
@@ -198,24 +197,11 @@ public static void doSkips(final TestDef def) {
                 .test("retryable-writes", "findOneAndDelete-errorLabels", "FindOneAndDelete succeeds after WriteConcernError ShutdownInProgress")
                 .test("retryable-writes", "findOneAndReplace-errorLabels", "FindOneAndReplace succeeds after WriteConcernError ShutdownInProgress")
                 //.testContains("retryable-writes", "succeeds after retryable writeConcernError")
-                .test("retryable-writes", "client bulkWrite retryable writes", "client bulkWrite with no multi: true operations succeeds after retryable writeConcernError")
                 .test("retryable-writes", "retryable-writes insertOne serverErrors", "InsertOne succeeds after retryable writeConcernError")
                 .test("retryable-writes", "retryable-writes bulkWrite serverErrors", "BulkWrite succeeds after retryable writeConcernError in first batch");
         def.skipJira("https://jira.mongodb.org/browse/JAVA-5341")
                 .when(() -> isDiscoverableReplicaSet() && serverVersionLessThan(4, 4))
                 .test("retryable-writes", "retryable-writes insertOne serverErrors", "RetryableWriteError label is added based on writeConcernError in pre-4.4 mongod response");
-        def.skipJira("https://jira.mongodb.org/browse/JAVA-4586")
-                //.testContains("retryable-writes", "client bulkWrite")
-                .test("retryable-writes", "client bulkWrite retryable writes", "client bulkWrite with no multi: true operations succeeds after retryable top-level error")
-                .test("retryable-writes", "client bulkWrite retryable writes", "client bulkWrite with multi: true operations fails after retryable top-level error")
-                .test("retryable-writes", "client bulkWrite retryable writes", "client bulkWrite with no multi: true operations succeeds after retryable writeConcernError")
-                .test("retryable-writes", "client bulkWrite retryable writes", "client bulkWrite with multi: true operations fails after retryable writeConcernError")
-                .test("retryable-writes", "client bulkWrite retryable writes", "client bulkWrite with retryWrites: false does not retry")
-                .test("retryable-writes", "client bulkWrite retryable writes with client errors", "client bulkWrite with one network error succeeds after retry")
-                .test("retryable-writes", "client bulkWrite retryable writes with client errors", "client bulkWrite with two network errors fails after retry")
-                //.testContains("retryable-writes", "client.clientBulkWrite")
-                .test("retryable-writes", "retryable writes handshake failures", "client.clientBulkWrite succeeds after retryable handshake network error")
-                .test("retryable-writes", "retryable writes handshake failures", "client.clientBulkWrite succeeds after retryable handshake server error (ShutdownInProgress)");
 
         // server-discovery-and-monitoring (SDAM)