Skip to content

Commit b9e1f75

Browse files
authored
Merge pull request #14348 from Automattic/vkarpov15/gh-14331
fix(cursor): make aggregation cursor support `transform` option to match query cursor
2 parents 1f340d4 + a6790a1 commit b9e1f75

File tree

2 files changed

+35
-0
lines changed

2 files changed

+35
-0
lines changed

lib/cursor/aggregationCursor.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,20 @@ util.inherits(AggregationCursor, Readable);
5757
function _init(model, c, agg) {
5858
if (!model.collection.buffer) {
5959
model.hooks.execPre('aggregate', agg, function() {
60+
if (typeof agg.options?.cursor?.transform === 'function') {
61+
c._transforms.push(agg.options.cursor.transform);
62+
}
63+
6064
c.cursor = model.collection.aggregate(agg._pipeline, agg.options || {});
6165
c.emit('cursor', c.cursor);
6266
});
6367
} else {
6468
model.collection.emitter.once('queue', function() {
6569
model.hooks.execPre('aggregate', agg, function() {
70+
if (typeof agg.options?.cursor?.transform === 'function') {
71+
c._transforms.push(agg.options.cursor.transform);
72+
}
73+
6674
c.cursor = model.collection.aggregate(agg._pipeline, agg.options || {});
6775
c.emit('cursor', c.cursor);
6876
});

test/aggregate.test.js

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
const start = require('./common');
88

99
const assert = require('assert');
10+
const stream = require('stream');
1011

1112
const Aggregate = require('../lib/aggregate');
1213

@@ -1215,6 +1216,32 @@ describe('aggregate: ', function() {
12151216
assert.equal(res[1].test, 'a test');
12161217
});
12171218

1219+
it('cursor supports transform option (gh-14331)', async function() {
1220+
const mySchema = new Schema({ name: String });
1221+
const Test = db.model('Test', mySchema);
1222+
1223+
await Test.deleteMany({});
1224+
await Test.create([{ name: 'Apple' }, { name: 'Apple' }]);
1225+
1226+
let resolve;
1227+
const waitForStream = new Promise(innerResolve => {
1228+
resolve = innerResolve;
1229+
});
1230+
const otherStream = new stream.Writable({
1231+
write(chunk, encoding, callback) {
1232+
resolve(chunk.toString());
1233+
callback();
1234+
}
1235+
});
1236+
1237+
await Test.
1238+
aggregate([{ $match: { name: 'Apple' } }]).
1239+
cursor({ transform: JSON.stringify }).
1240+
pipe(otherStream);
1241+
const streamValue = await waitForStream;
1242+
assert.ok(streamValue.includes('"name":"Apple"'), streamValue);
1243+
});
1244+
12181245
describe('Mongo 3.6 options', function() {
12191246
before(async function() {
12201247
await onlyTestAtOrAbove('3.6', this);

0 commit comments

Comments
 (0)