|
30 | 30 | import org.bson.BsonElement;
|
31 | 31 | import org.bson.BsonInt64;
|
32 | 32 | import org.bson.BsonString;
|
| 33 | +import org.bson.ByteBuf; |
33 | 34 | import org.bson.FieldNameValidator;
|
34 | 35 | import org.bson.io.BsonOutput;
|
35 | 36 |
|
36 | 37 | import java.nio.charset.StandardCharsets;
|
37 | 38 | import java.util.ArrayList;
|
38 | 39 | import java.util.List;
|
| 40 | +import java.util.stream.IntStream; |
39 | 41 |
|
40 | 42 | import static com.mongodb.ReadPreference.primary;
|
41 | 43 | import static com.mongodb.ReadPreference.primaryPreferred;
|
|
46 | 48 | import static com.mongodb.connection.ServerType.SHARD_ROUTER;
|
47 | 49 | import static com.mongodb.connection.ServerType.STANDALONE;
|
48 | 50 | import static com.mongodb.internal.connection.BsonWriterHelper.writePayload;
|
| 51 | +import static com.mongodb.internal.connection.ByteBufBsonDocument.createList; |
| 52 | +import static com.mongodb.internal.connection.ByteBufBsonDocument.createOne; |
49 | 53 | import static com.mongodb.internal.connection.ReadConcernHelper.getReadConcernDocument;
|
50 | 54 | import static com.mongodb.internal.operation.ServerVersionHelper.FOUR_DOT_TWO_WIRE_VERSION;
|
51 | 55 | import static com.mongodb.internal.operation.ServerVersionHelper.FOUR_DOT_ZERO_WIRE_VERSION;
|
@@ -107,30 +111,49 @@ public final class CommandMessage extends RequestMessage {
|
107 | 111 | this.serverApi = serverApi;
|
108 | 112 | }
|
109 | 113 |
|
| 114 | + // Create a BsonDocument representing the logical document encoded by an OP_MSG. |
| 115 | + // The returned document will contain all the fields, including ones represented by |
| 116 | + // OP_MSG Sections of type Kind 1: Document Sequence |
110 | 117 | BsonDocument getCommandDocument(final ByteBufferBsonOutput bsonOutput) {
|
111 |
| - ByteBufBsonDocument byteBufBsonDocument = ByteBufBsonDocument.createOne(bsonOutput, |
112 |
| - getEncodingMetadata().getFirstDocumentPosition()); |
113 |
| - BsonDocument commandBsonDocument; |
114 |
| - |
115 |
| - if (containsPayload()) { |
116 |
| - commandBsonDocument = byteBufBsonDocument.toBaseBsonDocument(); |
117 |
| - |
118 |
| - int payloadStartPosition = getEncodingMetadata().getFirstDocumentPosition() |
119 |
| - + byteBufBsonDocument.getSizeInBytes() |
120 |
| - + 1 // payload type |
121 |
| - + 4 // payload size |
122 |
| - + payload.getPayloadName().getBytes(StandardCharsets.UTF_8).length + 1; // null-terminated UTF-8 payload name |
123 |
| - commandBsonDocument.append(payload.getPayloadName(), |
124 |
| - new BsonArray(ByteBufBsonDocument.createList(bsonOutput, payloadStartPosition))); |
125 |
| - } else { |
126 |
| - commandBsonDocument = byteBufBsonDocument; |
| 118 | + List<ByteBuf> duplicateByteBuffers = bsonOutput.getByteBuffers(); |
| 119 | + try { |
| 120 | + CompositeByteBuf outputByteBuf = new CompositeByteBuf(duplicateByteBuffers); |
| 121 | + outputByteBuf.position(getEncodingMetadata().getFirstDocumentPosition()); |
| 122 | + ByteBufBsonDocument byteBufBsonDocument = createOne(outputByteBuf); |
| 123 | + |
| 124 | + if (outputByteBuf.hasRemaining()) { |
| 125 | + BsonDocument commandBsonDocument = byteBufBsonDocument.toBaseBsonDocument(); |
| 126 | + |
| 127 | + while (outputByteBuf.hasRemaining()) { |
| 128 | + outputByteBuf.position(outputByteBuf.position() + 1 /* payload type */ + 4 /* payload size */); |
| 129 | + String payloadName = getPayloadName(outputByteBuf); |
| 130 | + assertFalse(payloadName.contains(".")); |
| 131 | + commandBsonDocument.append(payloadName, new BsonArray(createList(outputByteBuf))); |
| 132 | + } |
| 133 | + return commandBsonDocument; |
| 134 | + } else { |
| 135 | + return byteBufBsonDocument; |
| 136 | + } |
| 137 | + } finally { |
| 138 | + duplicateByteBuffers.forEach(ByteBuf::release); |
127 | 139 | }
|
128 |
| - |
129 |
| - return commandBsonDocument; |
130 | 140 | }
|
131 | 141 |
|
132 |
| - boolean containsPayload() { |
133 |
| - return payload != null; |
| 142 | + // Get the field name from a ByteBuf positioned at the start of the document sequence identifier of an OP_MSG Section of type |
| 143 | + // Kind 1: Document Sequence. |
| 144 | + // Upon normal completion of the method, the ByteBuf will be positioned at the start of the first BSON object in the sequence. |
| 145 | + private String getPayloadName(final ByteBuf outputByteBuf) { |
| 146 | + List<Byte> payloadNameBytes = new ArrayList<>(); |
| 147 | + byte curByte = outputByteBuf.get(); |
| 148 | + while (curByte != 0) { |
| 149 | + payloadNameBytes.add(curByte); |
| 150 | + curByte = outputByteBuf.get(); |
| 151 | + } |
| 152 | + |
| 153 | + // Convert List<Byte> to byte[] |
| 154 | + byte[] byteArray = new byte[payloadNameBytes.size()]; |
| 155 | + IntStream.range(0, payloadNameBytes.size()).forEachOrdered(i -> byteArray[i] = payloadNameBytes.get(i)); |
| 156 | + return new String(byteArray, StandardCharsets.UTF_8); |
134 | 157 | }
|
135 | 158 |
|
136 | 159 | boolean isResponseExpected() {
|
|
0 commit comments