diff --git a/package.json b/package.json index 8fb5560..619b3a3 100644 --- a/package.json +++ b/package.json @@ -45,7 +45,8 @@ "detect-node": "^2.0.4", "dirty-chai": "^2.0.1", "ipfs": "~0.31.5", - "ipfsd-ctl": "~0.39.1" + "ipfsd-ctl": "~0.39.1", + "sinon": "^7.0.0" }, "contributors": [ "Vasco Santos ", diff --git a/src/index.js b/src/index.js index c8e266a..d95c9a3 100644 --- a/src/index.js +++ b/src/index.js @@ -86,17 +86,28 @@ class DatastorePubsub { const stringifiedTopic = key.toString() - // Subscribe - this._pubsub.subscribe(stringifiedTopic, this._handleSubscription, (err) => { + this._pubsub.ls((err, res) => { if (err) { - const errMsg = `cannot subscribe topic ${stringifiedTopic}` + return callback(err) + } - log.error(errMsg) - return callback(errcode(new Error(errMsg), 'ERR_SUBSCRIBING_TOPIC')) + // If already subscribed, just try to get it + if (res && Array.isArray(res) && res.indexOf(stringifiedTopic) > -1) { + return this._getLocal(key, callback) } - log(`subscribed values for key ${stringifiedTopic}`) - this._getLocal(key, callback) + // Subscribe + this._pubsub.subscribe(stringifiedTopic, this._handleSubscription, (err) => { + if (err) { + const errMsg = `cannot subscribe topic ${stringifiedTopic}` + + log.error(errMsg) + return callback(errcode(new Error(errMsg), 'ERR_SUBSCRIBING_TOPIC')) + } + log(`subscribed values for key ${stringifiedTopic}`) + + this._getLocal(key, callback) + }) }) } diff --git a/test/index.spec.js b/test/index.spec.js index def5448..1a5266d 100644 --- a/test/index.spec.js +++ b/test/index.spec.js @@ -5,6 +5,7 @@ const chai = require('chai') const dirtyChai = require('dirty-chai') const expect = chai.expect chai.use(dirtyChai) +const sinon = require('sinon') const isNode = require('detect-node') const parallel = require('async/parallel') @@ -495,4 +496,20 @@ describe('datastore-pubsub', function () { }) }) }) + + it('should subscribe a topic only once', function (done) { + const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator) + + sinon.spy(pubsubA, 'subscribe') + + dsPubsubA.get(key, (err) => { + expect(err).to.exist() // not locally stored record + dsPubsubA.get(key, (err) => { + expect(err).to.exist() // not locally stored record + expect(pubsubA.subscribe.calledOnce).to.equal(true) + + done() + }) + }) + }) })