Skip to content

Commit 91c865b

Browse files
darknosachrinza
authored andcommitted
feat: add transaction support
Signed-off by: Sergey Nosenko <[email protected]> Signed-off-by: Rifa Achrinza <[email protected]>
1 parent baf0759 commit 91c865b

File tree

2 files changed

+363
-30
lines changed

2 files changed

+363
-30
lines changed

lib/mongodb.js

Lines changed: 129 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,13 @@ exports.initialize = function initializeDataSource(dataSource, callback) {
105105

106106
s.safe = s.safe !== false;
107107
s.w = s.w || 1;
108+
s.writeConcern = s.writeConcern || {
109+
w: s.w,
110+
wtimeout: s.wtimeout || null,
111+
j: s.j || null,
112+
journal: s.journal || null,
113+
fsync: s.fsync || null,
114+
};
108115
s.url = s.url || generateMongoDBURL(s);
109116
s.useNewUrlParser = s.useNewUrlParser !== false;
110117
s.useUnifiedTopology = s.useUnifiedTopology !== false;
@@ -251,9 +258,6 @@ MongoDB.prototype.connect = function(callback) {
251258
'acceptableLatencyMS',
252259
'connectWithNoPrimary',
253260
'authSource',
254-
'w',
255-
'wtimeout',
256-
'j',
257261
'forceServerObjectId',
258262
'serializeFunctions',
259263
'ignoreUndefined',
@@ -278,13 +282,13 @@ MongoDB.prototype.connect = function(callback) {
278282
'password',
279283
'authMechanism',
280284
'compression',
281-
'fsync',
282285
'readPreferenceTags',
283286
'numberOfRetries',
284287
'auto_reconnect',
285288
'minSize',
286289
'useNewUrlParser',
287290
'useUnifiedTopology',
291+
'writeConcern',
288292
// Ignored options
289293
'native_parser',
290294
// Legacy options
@@ -293,6 +297,11 @@ MongoDB.prototype.connect = function(callback) {
293297
'replSet',
294298
'mongos',
295299
'db',
300+
'w',
301+
'wtimeout',
302+
'j',
303+
'journal',
304+
'fsync',
296305
];
297306

298307
const lbOptions = Object.keys(self.settings);
@@ -683,7 +692,7 @@ MongoDB.prototype.exists = function(modelName, id, options, callback) {
683692
debug('exists', modelName, id);
684693
}
685694
id = self.coerceId(modelName, id, options);
686-
this.execute(modelName, 'findOne', {_id: id}, function(err, data) {
695+
this.execute(modelName, 'findOne', {_id: id}, buildOptions({}, options), function(err, data) {
687696
if (self.debug) {
688697
debug('exists.callback', modelName, id, err, data);
689698
}
@@ -704,7 +713,7 @@ MongoDB.prototype.find = function find(modelName, id, options, callback) {
704713
}
705714
const idName = self.idName(modelName);
706715
const oid = self.coerceId(modelName, id, options);
707-
this.execute(modelName, 'findOne', {_id: oid}, function(err, data) {
716+
this.execute(modelName, 'findOne', {_id: oid}, buildOptions({}, options), function(err, data) {
708717
if (self.debug) {
709718
debug('find.callback', modelName, id, err, data);
710719
}
@@ -893,7 +902,7 @@ MongoDB.prototype.destroy = function destroy(modelName, id, options, callback) {
893902
debug('delete', modelName, id);
894903
}
895904
id = self.coerceId(modelName, id, options);
896-
this.execute(modelName, 'deleteOne', {_id: id}, function(err, result) {
905+
this.execute(modelName, 'deleteOne', {_id: id}, buildOptions({}, options), function(err, result) {
897906
if (self.debug) {
898907
debug('delete.callback', modelName, id, err, result);
899908
}
@@ -1034,6 +1043,16 @@ MongoDB.prototype.buildWhere = function(modelName, where, options) {
10341043

10351044
query[k] = {$regex: cond};
10361045
} else {
1046+
if (isObjectIDProperty(modelCtor, propDef, cond, options)) {
1047+
if (Array.isArray(cond)) {
1048+
cond = cond.map(function(c) {
1049+
return ObjectID(c);
1050+
});
1051+
} else {
1052+
cond = ObjectID(cond);
1053+
}
1054+
}
1055+
10371056
query[k] = {};
10381057
query[k]['$' + spec] = cond;
10391058
}
@@ -1044,8 +1063,15 @@ MongoDB.prototype.buildWhere = function(modelName, where, options) {
10441063
query[k] = {$type: 10};
10451064
} else {
10461065
if (isObjectIDProperty(modelCtor, propDef, cond, options)) {
1047-
cond = ObjectID(cond);
1066+
if (Array.isArray(cond)) {
1067+
cond = cond.map(function(c) {
1068+
return ObjectID(c);
1069+
});
1070+
} else {
1071+
cond = ObjectID(cond);
1072+
}
10481073
}
1074+
10491075
query[k] = cond;
10501076
}
10511077
}
@@ -1312,8 +1338,17 @@ MongoDB.prototype.convertColumnNames = function(model, data, direction) {
13121338
}
13131339

13141340
if (direction === 'database') {
1315-
data[columnName] = data[propName];
1316-
delete data[propName];
1341+
// Handle data is Array object - in case of fields filter
1342+
if (Array.isArray(data)) {
1343+
const idx = data.indexOf(propName);
1344+
if (idx !== -1) {
1345+
data.push(columnName);
1346+
delete data[idx];
1347+
}
1348+
} else { // Handle data as Object - in case to create / update
1349+
data[columnName] = data[propName];
1350+
delete data[propName];
1351+
}
13171352
}
13181353

13191354
if (direction === 'property') {
@@ -1351,17 +1386,23 @@ MongoDB.prototype.all = function all(modelName, filter, options, callback) {
13511386
if (filter.where) {
13521387
query = self.buildWhere(modelName, filter.where, options);
13531388
}
1354-
let fields = filter.fields;
1389+
// Use Object.assign to avoid change filter.fields
1390+
// which will cause error when create model from data
1391+
let fields = undefined;
1392+
if (typeof filter.fields !== 'undefined') {
1393+
fields = [];
1394+
Object.assign(fields, filter.fields);
1395+
}
13551396

13561397
// Convert custom column names
13571398
fields = self.fromPropertyToDatabaseNames(modelName, fields);
13581399

1400+
options = buildOptions({}, options);
1401+
13591402
if (fields) {
1360-
const findOpts = {projection: fieldsArrayToObj(fields)};
1361-
this.execute(modelName, 'find', query, findOpts, processResponse);
1362-
} else {
1363-
this.execute(modelName, 'find', query, processResponse);
1403+
options.projection = fieldsArrayToObj(fields);
13641404
}
1405+
this.execute(modelName, 'find', query, options, processResponse);
13651406

13661407
function processResponse(err, cursor) {
13671408
if (err) {
@@ -1461,7 +1502,7 @@ MongoDB.prototype.destroyAll = function destroyAll(
14611502
where = self.buildWhere(modelName, where, options);
14621503
if (debug.enabled) debug('destroyAll where %s', util.inspect(where));
14631504

1464-
this.execute(modelName, 'deleteMany', where || {}, function(err, info) {
1505+
this.execute(modelName, 'deleteMany', where || {}, buildOptions({}, options), function(err, info) {
14651506
if (err) return callback && callback(err);
14661507

14671508
if (self.debug) debug('destroyAll.callback', modelName, where, err, info);
@@ -1488,15 +1529,26 @@ MongoDB.prototype.count = function count(modelName, where, options, callback) {
14881529
debug('count', modelName, where);
14891530
}
14901531
where = self.buildWhere(modelName, where, options) || {};
1491-
const method = Object.keys(where).length === 0 ? 'estimatedDocumentCount' : 'countDocuments';
1492-
this.execute(modelName, method, where, function(err, count) {
1493-
if (self.debug) {
1494-
debug('count.callback', modelName, err, count);
1495-
}
1496-
if (callback) {
1497-
callback(err, count);
1498-
}
1499-
});
1532+
options = buildOptions({}, options);
1533+
if (Object.keys(where).length === 0 && !options.session) {
1534+
this.execute(modelName, 'estimatedDocumentCount', function(err, count) {
1535+
if (self.debug) {
1536+
debug('count.callback', modelName, err, count);
1537+
}
1538+
if (callback) {
1539+
callback(err, count);
1540+
}
1541+
});
1542+
} else {
1543+
this.execute(modelName, 'countDocuments', where, options, function(err, count) {
1544+
if (self.debug) {
1545+
debug('count.callback', modelName, err, count);
1546+
}
1547+
if (callback) {
1548+
callback(err, count);
1549+
}
1550+
});
1551+
}
15001552
};
15011553

15021554
/**
@@ -1538,7 +1590,7 @@ MongoDB.prototype.replaceWithOptions = function(modelName, id, data, options, cb
15381590
const idName = self.idName(modelName);
15391591
delete data[idName];
15401592
data = self.toDatabase(modelName, data);
1541-
this.execute(modelName, 'replaceOne', {_id: id}, data, options, function(
1593+
this.execute(modelName, 'replaceOne', {_id: id}, data, buildOptions({}, options), function(
15421594
err,
15431595
info,
15441596
) {
@@ -1735,11 +1787,11 @@ MongoDB.prototype.upsertWithWhere = function upsertWithWhere(
17351787
'findOneAndUpdate',
17361788
where,
17371789
updateData,
1738-
{
1790+
buildOptions({
17391791
upsert: true,
17401792
returnOriginal: false,
17411793
sort: [['_id', 'asc']],
1742-
},
1794+
}, options),
17431795
function(err, result) {
17441796
if (err) return cb && cb(err);
17451797

@@ -2015,6 +2067,48 @@ MongoDB.prototype.ping = function(cb) {
20152067
}
20162068
};
20172069

2070+
MongoDB.prototype.beginTransaction = function(isolationLevel, cb) {
2071+
// TODO: think about how to convert READ_COMMITED, etc. to transactionOptions
2072+
const transactionOptions = {
2073+
readPreference: 'primary',
2074+
readConcern: {level: 'local'},
2075+
writeConcern: {w: 'majority'},
2076+
};
2077+
if (isolationLevel instanceof Object) {
2078+
Object.assign(transactionOptions, isolationLevel || {});
2079+
}
2080+
const session = this.client.startSession();
2081+
session.startTransaction(transactionOptions);
2082+
cb(null, session);
2083+
};
2084+
2085+
MongoDB.prototype.commit = function(tx, cb) {
2086+
tx.commitTransaction(function(err) {
2087+
tx.endSession(null, function(error) {
2088+
if (err) return cb(err);
2089+
if (error) return cb(error);
2090+
cb();
2091+
});
2092+
});
2093+
};
2094+
2095+
MongoDB.prototype.rollback = function(tx, cb) {
2096+
tx.abortTransaction(function(err) {
2097+
tx.endSession(null, function(error) {
2098+
if (err) return cb(err);
2099+
if (error) return cb(error);
2100+
cb();
2101+
});
2102+
});
2103+
};
2104+
2105+
function isInTransation(options) {
2106+
const ops = {};
2107+
if (options && options.transaction && options.transaction.isInTransation)
2108+
ops.session = options.transaction.session;
2109+
return ops;
2110+
}
2111+
20182112
// Case insensitive check if a string looks like "ObjectID"
20192113
function typeIsObjectId(input) {
20202114
if (!input) return false;
@@ -2072,7 +2166,8 @@ function coerceToObjectId(modelCtor, propDef, propValue) {
20722166
function isObjectIDProperty(modelCtor, propDef, value, options) {
20732167
if (!propDef) return false;
20742168

2075-
if (typeof value === 'string' && value.match(ObjectIdValueRegex)) {
2169+
if ((typeof value === 'string' && value.match(ObjectIdValueRegex)) ||
2170+
(Array.isArray(value) && value.every((v) => v.match(ObjectIdValueRegex)))) {
20762171
if (isStoredAsObjectID(propDef)) return true;
20772172
else return !isStrictObjectIDCoercionEnabled(modelCtor, options);
20782173
} else if (value instanceof mongodb.ObjectID) {
@@ -2306,5 +2401,9 @@ function hasDataType(dataType, propertyDef) {
23062401
* @param {*} connectorOptions User specified Options
23072402
*/
23082403
function buildOptions(requiredOptions, connectorOptions) {
2309-
return Object.assign({}, connectorOptions, requiredOptions);
2404+
if (connectorOptions && connectorOptions.transaction && connectorOptions.transaction.isActive()) {
2405+
return Object.assign({session: connectorOptions.transaction.connection}, connectorOptions, requiredOptions);
2406+
} else {
2407+
return Object.assign({}, connectorOptions, requiredOptions);
2408+
}
23102409
}

0 commit comments

Comments
 (0)