diff --git a/src/simperium/channel.js b/src/simperium/channel.js index 4f59d15..4b01468 100644 --- a/src/simperium/channel.js +++ b/src/simperium/channel.js @@ -3,6 +3,8 @@ import { format, inherits } from 'util' import { EventEmitter } from 'events' import { parseMessage, parseVersionMessage, change as change_util } from './util' import { v4 as uuid } from 'uuid' +import NetworkQueue from './queues/network-queue'; +import LocalQueue from './queues/local-queue'; const UNKNOWN_CV = '?'; const CODE_INVALID_VERSION = 405; @@ -262,87 +264,6 @@ internal.indexingComplete = function() { this.emit( 'ready' ) } -/** - * A ghost represents a version of a bucket object as known by Simperium - * - * Generally a client will keep the last known ghost stored locally for efficient - * diffing and patching of Simperium change operations. - * - * @typedef {Object} Ghost - * @property {Number} version - the ghost's version - * @property {String} key - the simperium bucket object id this ghost is for - * @property {Object} data - the data for the given ghost version - */ - -/** - * Callback function used by the ghost store to iterate over existing ghosts - * - * @callback ghostIterator - * @param {Ghost} - the current ghost - */ - -/** - * A GhostStore provides the store mechanism for ghost data that the Channel - * uses to maintain syncing state and producing change operations for - * Bucket objects. - * - * @interface GhostStore - */ - -/** - * Retrieve a Ghost for the given bucket object id - * - * @function - * @name GhostStore#get - * @param {String} id - bucket object id - * @returns {Promise} - the ghost for this object - */ - -/** - * Save a ghost in the store. - * - * @function - * @name GhostStore#put - * @param {String} id - bucket object id - * @param {Number} version - version of ghost data - * @param {Object} data - object literal to save as this ghost's data for this version - * @returns {Promise} - the ghost for this object - */ - -/** - * Delete a Ghost from the store. - * - * @function - * @name GhostStore#remove - * @param {String} id - bucket object id - * @returns {Promise} - the ghost for this object - */ - -/** - * Iterate over existing Ghost objects with the given callback. - * - * @function - * @name GhostStore#eachGhost - * @param {ghostIterator} - function to run against each ghost - */ - -/** - * Get the current change version (cv) that this channel has synced. - * - * @function - * @name GhostStore#getChangeVersion - * @returns {Promise} - the current change version for the bucket - */ - -/** - * Set the current change version. - * - * @function - * @name GhostStore#setChangeVersion - * @returns {Promise} - resolves once the change version is saved - */ - - /** * Maintains syncing state for a Simperium bucket. * @@ -662,199 +583,6 @@ Channel.prototype.onVersion = function( data ) { this.emit( 'version.' + ghost.id + '.' + ghost.version, ghost.data ); }; -function NetworkQueue() { - this.queues = {}; -} - -NetworkQueue.prototype.queueFor = function( id ) { - var queues = this.queues, - queue = queues[id]; - - if ( !queue ) { - queue = new Queue(); - queue.on( 'finish', function() { - delete queues[id]; - } ); - queues[id] = queue; - } - - return queue; -}; - -function Queue() { - this.queue = []; - this.running = false; -} - -inherits( Queue, EventEmitter ); - -// Add a function at the end of the queue -Queue.prototype.add = function( fn ) { - this.queue.push( fn ); - this.start(); - return this; -}; - -Queue.prototype.start = function() { - if ( this.running ) return; - this.running = true; - this.emit( 'start' ); - setImmediate( this.run.bind( this ) ); -} - -Queue.prototype.run = function() { - var fn; - this.running = true; - - if ( this.queue.length === 0 ) { - this.running = false; - this.emit( 'finish' ); - return; - } - - fn = this.queue.shift(); - fn( this.run.bind( this ) ); -} - -function LocalQueue( store ) { - this.store = store; - this.sent = {}; - this.queues = {}; - this.ready = false; -} - -inherits( LocalQueue, EventEmitter ); - -LocalQueue.prototype.start = function() { - var queueId; - this.ready = true; - for ( queueId in this.queues ) { - this.processQueue( queueId ); - } -} - -LocalQueue.prototype.pause = function() { - this.ready = false; -}; - -LocalQueue.prototype.acknowledge = function( change ) { - if ( this.sent[change.id] === change ) { - delete this.sent[change.id]; - } - - this.processQueue( change.id ); -} - -LocalQueue.prototype.queue = function( change ) { - var queue = this.queues[change.id]; - - if ( !queue ) { - queue = []; - this.queues[change.id] = queue; - } - - queue.push( change ); - - this.emit( 'queued', change.id, change, queue ); - - if ( !this.ready ) return; - - this.processQueue( change.id ); -}; - -LocalQueue.prototype.hasChanges = function() { - return Object.keys( this.queues ).length > 0; -}; - -LocalQueue.prototype.dequeueChangesFor = function( id ) { - var changes = [], sent = this.sent[id], queue = this.queues[id]; - - if ( sent ) { - delete this.sent[id]; - changes.push( sent ); - } - - if ( queue ) { - delete this.queues[id]; - changes = changes.concat( queue ); - } - - return changes; -}; - -LocalQueue.prototype.processQueue = function( id ) { - var queue = this.queues[id]; - var compressAndSend = this.compressAndSend.bind( this, id ); - - // there is no queue, don't do anything - if ( !queue ) return; - - // queue is empty, delete it from memory - if ( queue.length === 0 ) { - delete this.queues[id]; - return; - } - - // waiting for a previous sent change to get acknowledged - if ( this.sent[id] ) { - this.emit( 'wait', id ); - return; - } - - this.store.get( id ).then( compressAndSend ); -} - -LocalQueue.prototype.compressAndSend = function( id, ghost ) { - var changes = this.queues[id]; - var change; - var target = ghost.data; - var c; - var type; - - // a change was sent before we could compress and send - if ( this.sent[id] ) { - this.emit( 'wait', id ); - return; - } - - if ( changes.length === 1 ) { - change = changes.shift(); - this.sent[id] = change; - this.emit( 'send', change ); - return; - } - - if ( changes.length > 1 && changes[0].type === change_util.type.REMOVE ) { - change = changes.shift(); - changes.splice( 0, changes.length - 1 ); - this.sent[id] = change; - this.emit( 'send', change ); - } - - while ( changes.length > 0 ) { - c = changes.shift(); - - if ( c.o === change_util.type.REMOVE ) { - changes.unshift( c ); - break; - } - - target = change_util.apply( c.v, target ); - } - - type = target === null ? change_util.type.REMOVE : change_util.type.MODIFY; - change = change_util.buildChange( type, id, target, ghost ); - - this.sent[id] = change; - this.emit( 'send', change ); -} - -LocalQueue.prototype.resendSentChanges = function() { - for ( let ccid in this.sent ) { - this.emit( 'send', this.sent[ccid] ) - } -} - /** * Since revision data is basically immutable we can prevent the * need to refetch it after it has been loaded once. diff --git a/src/simperium/queues/README.md b/src/simperium/queues/README.md new file mode 100644 index 0000000..49acc89 --- /dev/null +++ b/src/simperium/queues/README.md @@ -0,0 +1,25 @@ +# Channel Queues + +These queues were originally in the Channel module. They have been extracted +and flowtyped to improve code quality and clarify the API. + +## LocalQueue + +Each `Channel` instance has a single `LocalQueue` that tracks changes that are sent +are pending to be sent to simperium. + +As bucket objects are updated, the `Channel` will reference this queue to determine +when an object should be sent. It also uses the `LocalQueue` to report if a bucket object +is currently being synced or not. + +## NetworkQueue + +Each `Channel` instance has a single `NetworkQueue`. As changes are received from simperium, +the channel will apply the changes in sequence. Together with the `LocalQueue` the channel +will be able to determine when pending changes in the `LocalQueue` have been accepted or +rejected by the server. + +## Queue + +A generic queue object used by `LocalQueue` and `RemoteQueue` that sequences tasks as +first-in-first-out execution order. diff --git a/src/simperium/queues/local-queue.js b/src/simperium/queues/local-queue.js new file mode 100644 index 0000000..b819400 --- /dev/null +++ b/src/simperium/queues/local-queue.js @@ -0,0 +1,295 @@ +// @flow +import events from 'events'; +import * as change_util from '../util/change'; +import type { BucketOperation } from '../util/change'; + +const { EventEmitter } = events; + +/** + * A ghost represents a version of a bucket object as known by Simperium + * + * Generally a client will keep the last known ghost stored locally for efficient + * diffing and patching of Simperium change operations. + * + * @typedef {Object} Ghost + * @property {Number} version - the ghost's version + * @property {String} key - the simperium bucket object id this ghost is for + * @property {Object} data - the data for the given ghost version + */ +type Ghost = { version: number, key: string, data: {} }; + +/** + * A GhostStore provides the store mechanism for ghost data that the Channel + * uses to maintain syncing state and producing change operations for + * Bucket objects. + * + * @interface GhostStore + */ +interface GhostStore { + + /** + * Callback function used by the ghost store to iterate over existing ghosts + * + * @callback ghostIterator + * @param {Ghost} - the current ghost + */ + + /** + * Retrieve a Ghost for the given bucket object id + * + * @function + * @name GhostStore#get + * @param {String} id - bucket object id + * @returns {Promise} - the ghost for this object + */ + get( id: string ): Promise; + + /** + * Save a ghost in the store. + * + * @function + * @name GhostStore#put + * @param {String} id - bucket object id + * @param {Number} version - version of ghost data + * @param {Object} data - object literal to save as this ghost's data for this version + * @returns {Promise} - the ghost for this object + */ + + /** + * Delete a Ghost from the store. + * + * @function + * @name GhostStore#remove + * @param {String} id - bucket object id + * @returns {Promise} - the ghost for this object + */ + + /** + * Iterate over existing Ghost objects with the given callback. + * + * @function + * @name GhostStore#eachGhost + * @param {ghostIterator} - function to run against each ghost + */ + + /** + * Get the current change version (cv) that this channel has synced. + * + * @function + * @name GhostStore#getChangeVersion + * @returns {Promise} - the current change version for the bucket + */ + + /** + * Set the current change version. + * + * @function + * @name GhostStore#setChangeVersion + * @returns {Promise} - resolves once the change version is saved + */ +} + +export default class LocalQueue extends EventEmitter { + store: GhostStore; + sent: { [objectId: string]: ?BucketOperation }; + queues: { [objectId: string]: ?BucketOperation[] }; + ready: boolean + + /* + * @param {GhostStore} store - the ghost store for retrieving ghost data + */ + constructor( store: GhostStore ) { + super(); + this.store = store; + this.sent = {}; + this.queues = {}; + this.ready = false; + } + + /* + * Start processing any local changes + */ + start() { + this.ready = true; + for ( const queueId in this.queues ) { + this.processQueue( queueId ); + } + } + + /** + * Pause execution of local changes. No local changes will be sent to + * simperium until .start is called. + */ + pause() { + this.ready = false; + }; + + /** + * When a change is acknowledged and it matches the sent change for the + * given bucket operation clear it from the sent queue. + * + * Any pending changes for the bucket object will then be sent. + * + * @param {BucketOperation} change - the operation that is being acknowledged + */ + acknowledge( change: BucketOperation ) { + if ( this.sent[change.id] === change ) { + delete this.sent[change.id]; + } + + this.processQueue( change.id ); + } + + /** + * Queues a on operation that will modify a bucket object on simperium.com. If the + * local queue has been started the queue for the corresponding bucket object will + * be processed and the next change will be sent. + * + * @param {BucketOperation} change - the bucket operation to send to simperium + */ + queue( change: BucketOperation ) { + let queue = this.queues[change.id]; + + if ( !queue ) { + queue = []; + this.queues[change.id] = queue; + } + + queue.push( change ); + + this.emit( 'queued', change.id, change, queue ); + + if ( !this.ready ) return; + + this.processQueue( change.id ); + }; + + /** + * Reports if there are any changes pending for this channel. + * + * @returns {boolean} true if there are any pending changes + */ + hasChanges() { + return Object.keys( this.queues ).length > 0 || + Object.keys( this.sent ).length > 0; + }; + + /** + * Removes pending and sent changes from the queue and returns them + * @param {string} id - bucket object id + * @returns {BucketOperation[]} list of changes removed from the queue + */ + dequeueChangesFor( id: string ) { + let changes: BucketOperation[] = []; + const sent = this.sent[id]; + const queue = this.queues[id]; + + if ( sent ) { + delete this.sent[id]; + changes = changes.concat( sent ); + } + + if ( queue ) { + delete this.queues[id]; + changes = changes.concat( queue ); + } + + return changes; + }; + + /** + * Send the changes queued for the given bucket object + * @emits wait + * @param {string} id - bucket object id + */ + processQueue( id: string ) { + const queue = this.queues[id]; + + // there is no queue, don't do anything + if ( !queue ) return; + + // queue is empty, delete it from memory + if ( queue.length === 0 ) { + delete this.queues[id]; + return; + } + + // waiting for a previous sent change to get acknowledged + if ( this.sent[id] ) { + this.emit( 'wait', id ); + return; + } + + this.store.get( id ).then( ghost => { + this.compressAndSend( id, ghost ); + } ); + } + + /** + * Sends queued changes if any to simperium for the given bucket object id + * + * @param {string} id - bucket object id to send changes for + * @param {*} ghost - the current ghost for the given bucket object + */ + compressAndSend( id: string, ghost: Ghost ) { + const changes = this.queues[id]; + // the starting point of any changes will be the ghost's current data + let modifiedObject = ghost.data; + + // a change was sent before we could compress and send + if ( this.sent[id] ) { + this.emit( 'wait', id ); + return; + } + if ( !changes ) { + return; + } + + // there is a single change, remove it from the bucket + // objects pending queue and send it + if ( changes.length === 1 ) { + const change = changes.shift(); + this.sent[id] = change; + this.emit( 'send', change ); + return; + } + + // there is more than one change but if the firest change is a delete type + // then the following local changes can be discarded + if ( changes.length > 1 && changes[0].type === change_util.type.REMOVE ) { + const change = changes.shift(); + changes.splice( 0, changes.length - 1 ); + this.sent[id] = change; + this.emit( 'send', change ); + return; + } + + while ( changes.length > 0 ) { + const change = changes.shift(); + + if ( change.o === '-' ) { + changes.unshift( change ); + break; + } + + if ( change.o === 'M' ) { + modifiedObject = change_util.apply( change.v, modifiedObject ); + } + } + + const type = modifiedObject === null ? change_util.type.REMOVE : change_util.type.MODIFY; + const change = change_util.buildChange( type, id, modifiedObject, ghost ); + + this.sent[id] = change; + this.emit( 'send', change ); + } + + /** + * Retries sending any previously sent changes that have not been acknowledged by simperium + */ + resendSentChanges() { + for ( let ccid in this.sent ) { + this.emit( 'send', this.sent[ccid] ) + } + } +} diff --git a/src/simperium/queues/network-queue.js b/src/simperium/queues/network-queue.js new file mode 100644 index 0000000..09cbb12 --- /dev/null +++ b/src/simperium/queues/network-queue.js @@ -0,0 +1,33 @@ +// @flow +import Queue from './queue'; + +/** + * Stores a mapping of Queue objects to bucket object ids + */ +export default class NetworkQueue { + queues: { [bucketObjectID: string]: ?Queue }; + + constructor() { + this.queues = {}; + } + + /** + * Retrieve the queue for the given bucket object id + * + * @param {string} id - the bucket object id to retrieve the queue for + * @return {Queue} the queue for the giver bucket object, creates a new queue if none exists + */ + queueFor( id: string ) { + let queue: ?Queue = this.queues[id]; + + if ( !queue ) { + queue = new Queue(); + queue.on( 'finish', () => { + delete this.queues[id]; + } ); + this.queues[id] = queue; + } + + return queue; + } +} diff --git a/src/simperium/queues/queue.js b/src/simperium/queues/queue.js new file mode 100644 index 0000000..5dc380a --- /dev/null +++ b/src/simperium/queues/queue.js @@ -0,0 +1,55 @@ +// @flow +import events from 'events'; + +const { EventEmitter } = events; + +type Task = ( onComplete: () => void ) => void + +export default class Queue extends EventEmitter { + queue: Task[]; + running: boolean + constructor() { + super(); + this.queue = []; + this.running = false; + } + /** + * Add a task to the queue. THe queue will start if it has not been started + * @param {Task} task - the task to execute + * @returns {Queue} the queue instance for chaining + */ + add( task: Task ) { + this.queue.push( task ); + this.start(); + return this; + }; + + /** + * Begins processing tasks if the queue is not already running + * @emits 'start' + */ + start() { + if ( this.running ) return; + this.running = true; + this.emit( 'start' ); + setImmediate( this.run.bind( this ) ); + } + + /** + * Runs the next action on the queue + * @emits finish - when all tasks are completed + * @private + */ + run() { + this.running = true; + + if ( this.queue.length === 0 ) { + this.running = false; + this.emit( 'finish' ); + return; + } + + const task = this.queue.shift(); + task( () => this.run() ); + } +}