diff --git a/lib/index.js b/lib/index.js index 0a5021f21..a71aec70d 100644 --- a/lib/index.js +++ b/lib/index.js @@ -213,6 +213,11 @@ Livedb.prototype.getOps = function(cName, docName, from, to, callback) { this.driver.getOps(c, docName, from, to, function(err, ops) { if (self.sdc) self.sdc.timing('livedb.getOps', Date.now() - start); + + // If we got results, check that we got the right ops. + // Sometimes, if ops have expired from the oplog, there might be some missing. + if (ops && ops.length > 0 && ops[0].v !== from) + return callback('Missing operations'); // Interestingly, this will filter ops for other types as if they were the projected type. This // is a bug, but it shouldn't cause any problems for now. I'll have to revisit this @@ -330,6 +335,14 @@ Livedb.prototype._trySubmit = function(submitData) { self._resumeSubmitting(opData.src); return; } + + // Check if the first op we received wasn't the one we requested. + // This can occur if the ops were dropped from the oplog (e.g. expired). + if (from !== snapshot.v && (ops.length === 0 || ops[0].v !== from)) { + submitData.callback('Missing operations'); + self._resumeSubmitting(opData.src); + return; + } for (var i = 0; i < ops.length; i++) { var op = ops[i]; @@ -342,7 +355,7 @@ Livedb.prototype._trySubmit = function(submitData) { self._resumeSubmitting(opData.src); return; } - + // Bring both the op and the snapshot up to date. At least one of // these two conditionals should be true. if (snapshot.v === op.v) { diff --git a/lib/redisdriver.js b/lib/redisdriver.js index 5c71b265a..525fdceef 100644 --- a/lib/redisdriver.js +++ b/lib/redisdriver.js @@ -172,7 +172,7 @@ RedisDriver.prototype.atomicSubmit = function(cName, docName, opData, options, c // In this case, we should write a no-op ramp to the snapshot // version, followed by a delete & a create to fill in the missing // ops. - throw Error('Missing oplog for ' + cName + ' ' + docName); + return callback('Missing oplog for ' + cName + ' ' + docName); } self._redisSubmitScript(cName, docName, opData, dirtyData, version, callbackWrapper); }); @@ -323,7 +323,8 @@ RedisDriver.prototype._oplogGetOps = function(cName, docName, from, to, callback var self = this; this.oplog.getOps(cName, docName, from, to, function(err, ops) { if (err) return callback(err); - if (ops.length && ops[0].v !== from) throw Error('Oplog is returning incorrect ops'); + if (ops.length && ops[0].v !== from) + return callback('Missing operations'); for (var i = 0; i < ops.length; i++) { ops[i].v = from++; diff --git a/test/oplog.coffee b/test/oplog.coffee index d6b3c39fa..8776c230a 100644 --- a/test/oplog.coffee +++ b/test/oplog.coffee @@ -98,15 +98,10 @@ module.exports = (create) -> check = (error, ops) -> throw new Error error if error assert.deepEqual ops, [] - done() if ++num is 7 + done() if ++num is 2 @db.getOps @cName, @docName, 0, 0, check - @db.getOps @cName, @docName, 0, 1, check - @db.getOps @cName, @docName, 0, 10, check @db.getOps @cName, @docName, 0, null, check - @db.getOps @cName, @docName, 10, 10, check - @db.getOps @cName, @docName, 10, 11, check - @db.getOps @cName, @docName, 10, null, check it 'returns ops', (done) -> num = 0 diff --git a/test/test.coffee b/test/test.coffee index 48f1870c5..1a5138d10 100644 --- a/test/test.coffee +++ b/test/test.coffee @@ -69,6 +69,36 @@ describe 'livedb', -> throw new Error err if err assert.equal m.language, null done() + + it 'errors when oplog is missing all operations', (done) -> @create => + @collection.submit @docName, {v:1, op:['test']}, (err, v) => + throw new Error err if err + + getOps = @client.driver.getOps + @client.driver.getOps = (cName, docName, from, to, fn) -> + fn null, [] + + @collection.submit @docName, {v:1, op:['test']}, (err, v) => + assert.equal err, 'Missing operations' + @client.driver.getOps = getOps + done() + + it 'errors when oplog is missing some operations', (done) -> @create => + @collection.submit @docName, {v:1, op:['test']}, (err, v) => + throw new Error err if err + @collection.submit @docName, {v:2, op:['test 2']}, (err, v) => + throw new Error err if err + + getOps = @client.driver.getOps + @client.driver.getOps = (cName, docName, from, to, fn) => + getOps.call @client.driver, cName, docName, from, to, (err, ops) -> + throw new Error err if err + fn null, ops.slice 1 + + @collection.submit @docName, {v:1, op:['test']}, (err, v) => + assert.equal err, 'Missing operations' + @client.driver.getOps = getOps + done() it 'can modify a document', (done) -> @create => @collection.submit @docName, v:1, op:['hi'], (err, v) => @@ -375,11 +405,22 @@ describe 'livedb', -> assert.deepEqual stripTs(ops), [{create:{type:textType.uri, data:''}, v:0, m:{}, src:''}, {op:['hi'], v:1, m:{}, src:''}] done() - - - it 'errors if ops are missing from the snapshotdb and oplogs' - - + it 'errors if ops are missing from the snapshotdb and oplogs', (done) -> @create => + @collection.submit @docName, {v:1, op:['test']}, (err, v) => + throw new Error err if err + @collection.submit @docName, {v:1, op:['test2']}, (err, v) => + throw new Error err if err + + getOps = @client.driver.getOps + @client.driver.getOps = (cName, docName, from, to, fn) => + getOps.call @client.driver, cName, docName, from, to, (err, ops) -> + throw new Error err if err + fn null, ops.slice 1 + + @collection.getOps @docName, 0, (err, ops) => + assert.equal err, 'Missing operations' + @client.driver.getOps = getOps + done() it 'works with separate clients', (done) -> @create => return done() unless @driver.distributed