|
17 | 17 | package com.mongodb.internal.connection;
|
18 | 18 |
|
19 | 19 | import com.mongodb.MongoClientException;
|
| 20 | +import com.mongodb.MongoInternalException; |
20 | 21 | import com.mongodb.MongoNamespace;
|
21 | 22 | import com.mongodb.ReadPreference;
|
22 | 23 | import com.mongodb.ServerApi;
|
|
31 | 32 | import org.bson.BsonElement;
|
32 | 33 | import org.bson.BsonInt64;
|
33 | 34 | import org.bson.BsonString;
|
| 35 | +import org.bson.ByteBuf; |
34 | 36 | import org.bson.FieldNameValidator;
|
35 | 37 | import org.bson.io.BsonOutput;
|
36 | 38 |
|
| 39 | +import java.io.ByteArrayOutputStream; |
| 40 | +import java.io.UnsupportedEncodingException; |
37 | 41 | import java.nio.charset.StandardCharsets;
|
38 | 42 | import java.util.ArrayList;
|
39 | 43 | import java.util.List;
|
|
47 | 51 | import static com.mongodb.connection.ServerType.SHARD_ROUTER;
|
48 | 52 | import static com.mongodb.connection.ServerType.STANDALONE;
|
49 | 53 | import static com.mongodb.internal.connection.BsonWriterHelper.writePayload;
|
| 54 | +import static com.mongodb.internal.connection.ByteBufBsonDocument.createList; |
| 55 | +import static com.mongodb.internal.connection.ByteBufBsonDocument.createOne; |
50 | 56 | import static com.mongodb.internal.connection.ReadConcernHelper.getReadConcernDocument;
|
51 | 57 | import static com.mongodb.internal.operation.ServerVersionHelper.FOUR_DOT_TWO_WIRE_VERSION;
|
52 | 58 | import static com.mongodb.internal.operation.ServerVersionHelper.FOUR_DOT_ZERO_WIRE_VERSION;
|
@@ -108,30 +114,76 @@ public final class CommandMessage extends RequestMessage {
|
108 | 114 | this.serverApi = serverApi;
|
109 | 115 | }
|
110 | 116 |
|
| 117 | + /** |
| 118 | + * Create a BsonDocument representing the logical document encoded by an OP_MSG. |
| 119 | + * <p> |
| 120 | + * The returned document will contain all the fields from the Body (Kind 0) Section, as well as all fields represented by |
| 121 | + * OP_MSG Document Sequence (Kind 1) Sections. |
| 122 | + */ |
111 | 123 | BsonDocument getCommandDocument(final ByteBufferBsonOutput bsonOutput) {
|
112 |
| - ByteBufBsonDocument byteBufBsonDocument = ByteBufBsonDocument.createOne(bsonOutput, |
113 |
| - getEncodingMetadata().getFirstDocumentPosition()); |
114 |
| - BsonDocument commandBsonDocument; |
115 |
| - |
116 |
| - if (containsPayload()) { |
117 |
| - commandBsonDocument = byteBufBsonDocument.toBaseBsonDocument(); |
118 |
| - |
119 |
| - int payloadStartPosition = getEncodingMetadata().getFirstDocumentPosition() |
120 |
| - + byteBufBsonDocument.getSizeInBytes() |
121 |
| - + 1 // payload type |
122 |
| - + 4 // payload size |
123 |
| - + payload.getPayloadName().getBytes(StandardCharsets.UTF_8).length + 1; // null-terminated UTF-8 payload name |
124 |
| - commandBsonDocument.append(payload.getPayloadName(), |
125 |
| - new BsonArray(ByteBufBsonDocument.createList(bsonOutput, payloadStartPosition))); |
126 |
| - } else { |
127 |
| - commandBsonDocument = byteBufBsonDocument; |
| 124 | + List<ByteBuf> byteBuffers = bsonOutput.getByteBuffers(); |
| 125 | + try { |
| 126 | + CompositeByteBuf byteBuf = new CompositeByteBuf(byteBuffers); |
| 127 | + try { |
| 128 | + byteBuf.position(getEncodingMetadata().getFirstDocumentPosition()); |
| 129 | + ByteBufBsonDocument byteBufBsonDocument = createOne(byteBuf); |
| 130 | + |
| 131 | + // If true, it means there is at least one Kind 1:Document Sequence in the OP_MSG |
| 132 | + if (byteBuf.hasRemaining()) { |
| 133 | + BsonDocument commandBsonDocument = byteBufBsonDocument.toBaseBsonDocument(); |
| 134 | + |
| 135 | + // Each loop iteration processes one Document Sequence |
| 136 | + // When there are no more bytes remaining, there are no more Document Sequences |
| 137 | + while (byteBuf.hasRemaining()) { |
| 138 | + // skip reading the payload type, we know it is 1 |
| 139 | + byteBuf.position(byteBuf.position() + 1); |
| 140 | + int sequenceStart = byteBuf.position(); |
| 141 | + int sequenceSizeInBytes = byteBuf.getInt(); |
| 142 | + int sectionEnd = sequenceStart + sequenceSizeInBytes; |
| 143 | + |
| 144 | + String fieldName = getSequenceIdentifier(byteBuf); |
| 145 | + // If this assertion fires, it means that the driver has started using document sequences for nested fields. If |
| 146 | + // so, this method will need to change in order to append the value to the correct nested document. |
| 147 | + assertFalse(fieldName.contains(".")); |
| 148 | + |
| 149 | + ByteBuf documentsByteBufSlice = byteBuf.duplicate().limit(sectionEnd); |
| 150 | + try { |
| 151 | + commandBsonDocument.append(fieldName, new BsonArray(createList(documentsByteBufSlice))); |
| 152 | + } finally { |
| 153 | + documentsByteBufSlice.release(); |
| 154 | + } |
| 155 | + byteBuf.position(sectionEnd); |
| 156 | + } |
| 157 | + return commandBsonDocument; |
| 158 | + } else { |
| 159 | + return byteBufBsonDocument; |
| 160 | + } |
| 161 | + } finally { |
| 162 | + byteBuf.release(); |
| 163 | + } |
| 164 | + } finally { |
| 165 | + byteBuffers.forEach(ByteBuf::release); |
128 | 166 | }
|
129 |
| - |
130 |
| - return commandBsonDocument; |
131 | 167 | }
|
132 | 168 |
|
133 |
| - boolean containsPayload() { |
134 |
| - return payload != null; |
| 169 | + /** |
| 170 | + * Get the field name from a buffer positioned at the start of the document sequence identifier of an OP_MSG Section of type |
| 171 | + * Document Sequence (Kind 1). |
| 172 | + * <p> |
| 173 | + * Upon normal completion of the method, the buffer will be positioned at the start of the first BSON object in the sequence. |
| 174 | + */ |
| 175 | + private String getSequenceIdentifier(final ByteBuf byteBuf) { |
| 176 | + ByteArrayOutputStream sequenceIdentifierBytes = new ByteArrayOutputStream(); |
| 177 | + byte curByte = byteBuf.get(); |
| 178 | + while (curByte != 0) { |
| 179 | + sequenceIdentifierBytes.write(curByte); |
| 180 | + curByte = byteBuf.get(); |
| 181 | + } |
| 182 | + try { |
| 183 | + return sequenceIdentifierBytes.toString(StandardCharsets.UTF_8.name()); |
| 184 | + } catch (UnsupportedEncodingException e) { |
| 185 | + throw new MongoInternalException("Unexpected exception", e); |
| 186 | + } |
135 | 187 | }
|
136 | 188 |
|
137 | 189 | boolean isResponseExpected() {
|
|
0 commit comments