Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 34 additions & 6 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ export class BulkWriteResult {
* Create a new BulkWriteResult instance
* @internal
*/
constructor(bulkResult: BulkResult) {
constructor(bulkResult: BulkResult, isOrdered: boolean) {
this.result = bulkResult;
this.insertedCount = this.result.nInserted ?? 0;
this.matchedCount = this.result.nMatched ?? 0;
Expand All @@ -206,6 +206,21 @@ export class BulkWriteResult {
this.upsertedCount = this.result.upserted.length ?? 0;
this.upsertedIds = BulkWriteResult.generateIdMap(this.result.upserted);
this.insertedIds = BulkWriteResult.generateIdMap(this.result.insertedIds);
if (isOrdered && this.result.writeErrors.length !== 0) {
const errIdx = this.result.writeErrors[0].index;
for (const index in this.insertedIds) {
if (Number(index) >= errIdx) {
delete this.insertedIds[index];
}
}
} else if (!isOrdered && this.result.writeErrors.length !== 0) {
for (let i = 0; i < this.result.writeErrors.length; i++) {
const index = this.result.writeErrors[i].index;
if (index in this.insertedIds) {
delete this.insertedIds[index];
}
}
}
Object.defineProperty(this, 'result', { value: this.result, enumerable: false });
}

Expand Down Expand Up @@ -479,7 +494,10 @@ function executeCommands(
callback: Callback<BulkWriteResult>
) {
if (bulkOperation.s.batches.length === 0) {
return callback(undefined, new BulkWriteResult(bulkOperation.s.bulkResult));
return callback(
undefined,
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)
);
}

const batch = bulkOperation.s.batches.shift() as Batch;
Expand All @@ -488,17 +506,26 @@ function executeCommands(
// Error is a driver related error not a bulk op error, return early
if (err && 'message' in err && !(err instanceof MongoWriteConcernError)) {
return callback(
new MongoBulkWriteError(err, new BulkWriteResult(bulkOperation.s.bulkResult))
new MongoBulkWriteError(
err,
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)
)
);
}

if (err instanceof MongoWriteConcernError) {
return handleMongoWriteConcernError(batch, bulkOperation.s.bulkResult, err, callback);
return handleMongoWriteConcernError(
batch,
bulkOperation.s.bulkResult,
bulkOperation.isOrdered,
err,
callback
);
}

// Merge the results together
mergeBatchResults(batch, bulkOperation.s.bulkResult, err, result);
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult);
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered);
if (bulkOperation.handleWriteError(callback, writeResult)) return;

// Execute the next command in line
Expand Down Expand Up @@ -572,6 +599,7 @@ function executeCommands(
function handleMongoWriteConcernError(
batch: Batch,
bulkResult: BulkResult,
isOrdered: boolean,
err: MongoWriteConcernError,
callback: Callback<BulkWriteResult>
) {
Expand All @@ -583,7 +611,7 @@ function handleMongoWriteConcernError(
message: err.result?.writeConcernError.errmsg,
code: err.result?.writeConcernError.result
},
new BulkWriteResult(bulkResult)
new BulkWriteResult(bulkResult, isOrdered)
)
);
}
Expand Down
93 changes: 92 additions & 1 deletion test/integration/crud/bulk.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
type Collection,
Long,
MongoBatchReExecutionError,
MongoBulkWriteError,
type MongoClient,
MongoDriverError,
MongoInvalidArgumentError
Expand All @@ -31,7 +32,6 @@ describe('Bulk', function () {
.createCollection('test')
.catch(() => null); // make ns exist
});

afterEach(async function () {
const cleanup = this.configuration.newClient();
await cleanup
Expand Down Expand Up @@ -107,6 +107,97 @@ describe('Bulk', function () {
});
});

describe('BulkWriteResult', function () {
describe('insertedIds', function () {
describe('#insertMany()', function () {
context('BulkWriteResult should not include invalid insert in insertedIds', function () {
/**
 * Runs an insertMany that is expected to reject with a MongoBulkWriteError
 * (duplicate key on a unique index), then verifies the reported
 * insertedCount matches the number of ids in insertedIds — i.e. failed
 * inserts are not present in insertedIds.
 *
 * @param input - documents to insert
 * @param isOrdered - forwarded as the `ordered` option of insertMany
 * @param indices - index specs to create as unique before inserting
 */
async function insertManyTryInvalidIds(input, isOrdered, indices) {
  const db = client.db();
  const col = db.collection('test');
  // `for...of` (not `for...in`, which yields array keys "0", "1", …) so the
  // index SPEC is passed to createIndex, and await each creation so the
  // unique constraint exists before the insert runs.
  for (const index of indices) {
    await col.createIndex(index, { unique: true, sparse: false });
  }
  // Capture the rejection instead of try/catch: the original `expect(false)`
  // is a chai no-op, so a missing error silently passed the test.
  const error = await col.insertMany(input, { ordered: isOrdered }).then(
    () => null,
    caught => caught
  );
  expect(error).to.be.instanceOf(MongoBulkWriteError);
  expect(error.result.insertedCount).to.equal(Object.keys(error.result.insertedIds).length);
}
it('when passed 1 duplicate ID on an index', async function () {
await insertManyTryInvalidIds([{ a: 1 }, { a: 1 }], true, [{ a: 1 }]);
});
it('when, on an ordered insert, passed multiple duplicate IDs on an index', async function () {
await insertManyTryInvalidIds(
[{ a: 1 }, { b: 2 }, { a: 1 }, { a: 1 }, { c: 3 }],
true,
[{ a: 1 }]
);
});
it('when, on an unordered insert, passed multiple duplicate IDs on an index', async function () {
await insertManyTryInvalidIds(
[{ a: 1 }, { b: 2 }, { a: 1 }, { a: 1 }, { c: 3 }],
false,
[{ a: 1 }]
);
});
});
});
describe('#bulkWrite()', function () {
context('BulkWriteResult should not include invalid insert in insertedIds', function () {
/**
 * Runs a bulkWrite that is expected to reject with a MongoBulkWriteError
 * (duplicate key on a unique index), then verifies the reported
 * insertedCount matches the number of ids in insertedIds — i.e. failed
 * inserts are not present in insertedIds.
 *
 * @param input - bulk write operations to execute
 * @param isOrdered - forwarded as the `ordered` option of bulkWrite
 * @param indices - index specs to create as unique before writing
 */
async function bulkWriteTryInvalidIds(input, isOrdered, indices) {
  const db = client.db();
  const col = db.collection('test');
  // `for...of` (not `for...in`, which yields array keys "0", "1", …) so the
  // index SPEC is passed to createIndex, and await each creation so the
  // unique constraint exists before the bulk write runs.
  for (const index of indices) {
    await col.createIndex(index, { unique: true, sparse: false });
  }
  // Capture the rejection instead of try/catch: the original `expect(false)`
  // is a chai no-op, so a missing error silently passed the test.
  const error = await col.bulkWrite(input, { ordered: isOrdered }).then(
    () => null,
    caught => caught
  );
  expect(error).to.be.instanceOf(MongoBulkWriteError);
  expect(error.result.insertedCount).to.equal(Object.keys(error.result.insertedIds).length);
}
it('when passed 1 duplicate ID on an index', async function () {
await bulkWriteTryInvalidIds([{ insertOne: { a: 1 } }, { insertOne: { a: 1 } }], true, [
{ a: 1 }
]);
});
it('when, on an ordered insert, passed multiple duplicate IDs on an index', async function () {
await bulkWriteTryInvalidIds(
[
{ insertOne: { a: 1 } },
{ insertOne: { a: 1 } },
{ insertOne: { a: 1 } },
{ insertOne: { a: 1 } }
],
true,
[{ a: 1 }]
);
});
it('when, on an unordered insert, passed multiple duplicate IDs on an index', async function () {
await bulkWriteTryInvalidIds(
[
{ insertOne: { a: 1 } },
{ insertOne: { a: 1 } },
{ insertOne: { a: 1 } },
{ insertOne: { a: 1 } }
],
false,
[{ a: 1 }]
);
});
});
});
});
});

it('Should correctly execute unordered bulk operation', async function () {
const db = client.db();
const bulk = db
Expand Down