Skip to content

Support any number of Document Sequence Sections in CommandMessage#getCommandDocument #1456

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,38 +53,31 @@ final class ByteBufBsonDocument extends BsonDocument {

private final transient ByteBuf byteBuf;

static List<ByteBufBsonDocument> createList(final ByteBufferBsonOutput bsonOutput, final int startPosition) {
List<ByteBuf> duplicateByteBuffers = bsonOutput.getByteBuffers();
CompositeByteBuf outputByteBuf = new CompositeByteBuf(duplicateByteBuffers);
outputByteBuf.position(startPosition);
/**
* Create a list of ByteBufBsonDocument from a buffer positioned at the start of the first document of an OP_MSG Section
* of type Document Sequence (Kind 1).
* <p>
* The provided buffer will be positioned at the end of the section upon normal completion of the method
*/
static List<ByteBufBsonDocument> createList(final ByteBuf outputByteBuf) {
List<ByteBufBsonDocument> documents = new ArrayList<>();
int curDocumentStartPosition = startPosition;
while (outputByteBuf.hasRemaining()) {
int documentSizeInBytes = outputByteBuf.getInt();
ByteBuf slice = outputByteBuf.duplicate();
slice.position(curDocumentStartPosition);
slice.limit(curDocumentStartPosition + documentSizeInBytes);
documents.add(new ByteBufBsonDocument(slice));
curDocumentStartPosition += documentSizeInBytes;
outputByteBuf.position(outputByteBuf.position() + documentSizeInBytes - 4);
}
for (ByteBuf byteBuffer : duplicateByteBuffers) {
byteBuffer.release();
ByteBufBsonDocument curDocument = createOne(outputByteBuf);
documents.add(curDocument);
}
return documents;
}

static ByteBufBsonDocument createOne(final ByteBufferBsonOutput bsonOutput, final int startPosition) {
List<ByteBuf> duplicateByteBuffers = bsonOutput.getByteBuffers();
CompositeByteBuf outputByteBuf = new CompositeByteBuf(duplicateByteBuffers);
outputByteBuf.position(startPosition);
/**
* Create a ByteBufBsonDocument from a buffer positioned at the start of a BSON document.
* The provided buffer will be positioned at the end of the document upon normal completion of the method
*/
static ByteBufBsonDocument createOne(final ByteBuf outputByteBuf) {
int documentStart = outputByteBuf.position();
int documentSizeInBytes = outputByteBuf.getInt();
ByteBuf slice = outputByteBuf.duplicate();
slice.position(startPosition);
slice.limit(startPosition + documentSizeInBytes);
for (ByteBuf byteBuffer : duplicateByteBuffers) {
byteBuffer.release();
}
int documentEnd = documentStart + documentSizeInBytes;
ByteBuf slice = outputByteBuf.duplicate().position(documentStart).limit(documentEnd);
outputByteBuf.position(documentEnd);
return new ByteBufBsonDocument(slice);
}

Expand Down Expand Up @@ -138,10 +131,6 @@ <T> T findInDocument(final Finder<T> finder) {
return finder.notFound();
}

int getSizeInBytes() {
return byteBuf.getInt(byteBuf.position());
}

BsonDocument toBaseBsonDocument() {
ByteBuf duplicateByteBuf = byteBuf.duplicate();
try (BsonBinaryReader bsonReader = new BsonBinaryReader(new ByteBufferBsonInput(duplicateByteBuf))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.mongodb.internal.connection;

import com.mongodb.MongoClientException;
import com.mongodb.MongoInternalException;
import com.mongodb.MongoNamespace;
import com.mongodb.ReadPreference;
import com.mongodb.ServerApi;
Expand All @@ -30,9 +31,12 @@
import org.bson.BsonElement;
import org.bson.BsonInt64;
import org.bson.BsonString;
import org.bson.ByteBuf;
import org.bson.FieldNameValidator;
import org.bson.io.BsonOutput;

import java.io.ByteArrayOutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -46,6 +50,8 @@
import static com.mongodb.connection.ServerType.SHARD_ROUTER;
import static com.mongodb.connection.ServerType.STANDALONE;
import static com.mongodb.internal.connection.BsonWriterHelper.writePayload;
import static com.mongodb.internal.connection.ByteBufBsonDocument.createList;
import static com.mongodb.internal.connection.ByteBufBsonDocument.createOne;
import static com.mongodb.internal.connection.ReadConcernHelper.getReadConcernDocument;
import static com.mongodb.internal.operation.ServerVersionHelper.FOUR_DOT_TWO_WIRE_VERSION;
import static com.mongodb.internal.operation.ServerVersionHelper.FOUR_DOT_ZERO_WIRE_VERSION;
Expand Down Expand Up @@ -107,30 +113,76 @@ public final class CommandMessage extends RequestMessage {
this.serverApi = serverApi;
}

/**
* Create a BsonDocument representing the logical document encoded by an OP_MSG.
* <p>
* The returned document will contain all the fields from the Body (Kind 0) Section, as well as all fields represented by
* OP_MSG Document Sequence (Kind 1) Sections.
*/
BsonDocument getCommandDocument(final ByteBufferBsonOutput bsonOutput) {
ByteBufBsonDocument byteBufBsonDocument = ByteBufBsonDocument.createOne(bsonOutput,
getEncodingMetadata().getFirstDocumentPosition());
BsonDocument commandBsonDocument;

if (containsPayload()) {
commandBsonDocument = byteBufBsonDocument.toBaseBsonDocument();

int payloadStartPosition = getEncodingMetadata().getFirstDocumentPosition()
+ byteBufBsonDocument.getSizeInBytes()
+ 1 // payload type
+ 4 // payload size
+ payload.getPayloadName().getBytes(StandardCharsets.UTF_8).length + 1; // null-terminated UTF-8 payload name
commandBsonDocument.append(payload.getPayloadName(),
new BsonArray(ByteBufBsonDocument.createList(bsonOutput, payloadStartPosition)));
} else {
commandBsonDocument = byteBufBsonDocument;
List<ByteBuf> byteBuffers = bsonOutput.getByteBuffers();
try {
CompositeByteBuf byteBuf = new CompositeByteBuf(byteBuffers);
try {
byteBuf.position(getEncodingMetadata().getFirstDocumentPosition());
ByteBufBsonDocument byteBufBsonDocument = createOne(byteBuf);

// If true, it means there is at least one Kind 1:Document Sequence in the OP_MSG
if (byteBuf.hasRemaining()) {
BsonDocument commandBsonDocument = byteBufBsonDocument.toBaseBsonDocument();

// Each loop iteration processes one Document Sequence
// When there are no more bytes remaining, there are no more Document Sequences
while (byteBuf.hasRemaining()) {
// skip reading the payload type, we know it is 1
byteBuf.position(byteBuf.position() + 1);
int sequenceStart = byteBuf.position();
int sequenceSizeInBytes = byteBuf.getInt();
int sectionEnd = sequenceStart + sequenceSizeInBytes;

String fieldName = getSequenceIdentifier(byteBuf);
// If this assertion fires, it means that the driver has started using document sequences for nested fields. If
// so, this method will need to change in order to append the value to the correct nested document.
assertFalse(fieldName.contains("."));

ByteBuf documentsByteBufSlice = byteBuf.duplicate().limit(sectionEnd);
try {
commandBsonDocument.append(fieldName, new BsonArray(createList(documentsByteBufSlice)));
} finally {
documentsByteBufSlice.release();
}
byteBuf.position(sectionEnd);
}
return commandBsonDocument;
} else {
return byteBufBsonDocument;
}
} finally {
byteBuf.release();
}
} finally {
byteBuffers.forEach(ByteBuf::release);
}

return commandBsonDocument;
}

boolean containsPayload() {
return payload != null;
/**
* Get the field name from a buffer positioned at the start of the document sequence identifier of an OP_MSG Section of type
* Document Sequence (Kind 1).
* <p>
* Upon normal completion of the method, the buffer will be positioned at the start of the first BSON object in the sequence.
*/
private String getSequenceIdentifier(final ByteBuf byteBuf) {
ByteArrayOutputStream sequenceIdentifierBytes = new ByteArrayOutputStream();
byte curByte = byteBuf.get();
while (curByte != 0) {
sequenceIdentifierBytes.write(curByte);
curByte = byteBuf.get();
}
try {
return sequenceIdentifierBytes.toString(StandardCharsets.UTF_8.name());
} catch (UnsupportedEncodingException e) {
throw new MongoInternalException("Unexpected exception", e);
}
}

boolean isResponseExpected() {
Expand Down